Spark写Parquet源码分析

栏目: 编程工具 · 发布时间: 5年前

内容简介:Dataset中的DataFrameWriter中的可以看到,结果其实就是调用这个save方法,跟踪到后面,会发现最后调用到DataSource这个类里:

Dataset中的 write() 方法:

/**
   * Interface for saving the content of the non-streaming Dataset out into external storage.
   *
   * @group basic
   * @since 1.6.0
   */
  def write: DataFrameWriter[T] = {
    if (isStreaming) {
      logicalPlan.failAnalysis(
        "'write' can not be called on streaming Dataset/DataFrame")
    }
    new DataFrameWriter[T](this)
  }

DataFrameWriter中的 parquet() 方法:

def parquet(path: String): Unit = {
  format("parquet").save(path)
}

可以看到,结果其实就是调用这个save方法,跟踪到后面,会发现最后调用到DataSource这个类里:

/**
 * Writes the given [[DataFrame]] out to this [[DataSource]].
 */
def write(mode: SaveMode, data: DataFrame): Unit = {
  if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
    throw new AnalysisException("Cannot save interval data type into external storage.")
  }
  providingClass.newInstance() match {
    case dataSource: CreatableRelationProvider =>
      dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
    case format: FileFormat =>
      writeInFileFormat(format, mode, data)
    case _ =>
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}

In DataSource

可以看到,写Parquet会在 FileFormat 这个case里, FileFormat 是个接口类,parquet的继承类是 ParquetFileFormat 。我们继续看 writeInFileFormat() 这个方法:

/**
 * Writes the given [[DataFrame]] out in this [[FileFormat]].
 */
private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
  ...
  ...
  val plan =
      InsertIntoHadoopFsRelationCommand(
        outputPath = outputPath,
        staticPartitionKeys = Map.empty,
        customPartitionLocations = Map.empty,
        partitionColumns = columns,
        bucketSpec = bucketSpec,
        fileFormat = format,
        refreshFunction = _ => Unit, // No existing table needs to be refreshed.
        options = options,
        query = data.logicalPlan,
        mode = mode,
        catalogTable = catalogTable)
    sparkSession.sessionState.executePlan(plan).toRdd
}

调用到了 InsertIntoHadoopFsRelationCommand 这个类,在其继承的 run() 方法里,有这样一段代码:

FileFormatWriter.write(
        sparkSession = sparkSession,
        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
        fileFormat = fileFormat,
        committer = committer,
        outputSpec = FileFormatWriter.OutputSpec(
          qualifiedOutputPath.toString, customPartitionLocations),
        hadoopConf = hadoopConf,
        partitionColumns = partitionColumns,
        bucketSpec = bucketSpec,
        refreshFunction = refreshFunction,
        options = options)

跳转到这个write方法:

def write(
      sparkSession: SparkSession,
      queryExecution: QueryExecution,
      fileFormat: FileFormat,
      committer: FileCommitProtocol,
      outputSpec: OutputSpec,
      hadoopConf: Configuration,
      partitionColumns: Seq[Attribute],
      bucketSpec: Option[BucketSpec],
      refreshFunction: (Seq[TablePartitionSpec]) => Unit,
      options: Map[String, String]): Unit = {
    ...
    ...
    val outputWriterFactory =
      fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
}

由于这个fileFormat是 ParquetFileFormat 的实例,所以我们直接看 ParquetFileFormat.prepareWrite 方法:

override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    new OutputWriterFactory {
      // This OutputWriterFactory instance is deserialized when writing Parquet files on the
      // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
      // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
      // initialized.
      private val parquetLogRedirector = ParquetLogRedirector.INSTANCE

        override def newInstance(
          path: String,
          dataSchema: StructType,
          context: TaskAttemptContext): OutputWriter = {
        new ParquetOutputWriter(path, context)
      }

      override def getFileExtension(context: TaskAttemptContext): String = {
        CodecConfig.from(context).getCodec.getExtension + ".parquet"
      }
    }
}

可以看到,返回的OutputWriterFactory实例是 ParquetOutputWriter ,在 ParquetOutputWriter 里又调用到RecordWriter的实例 ParquetRecordWriter<InternalRow> ,其中的writeSupport类实例是Spark自己写的 ParquetWriteSupport 类,该类主要将Spark的InternalRow类里的字段分别使用Parquet的columnWriter去写。

如何做到的,看 ParquetWriteSupport 类中的 consumeField() 方法:

private def consumeField(field: String, index: Int)(f: => Unit): Unit = {
    recordConsumer.startField(field, index)
    f
    recordConsumer.endField(field, index)
  }

recordConsumer的实例在这里是 MessageColumnIORecordConsumer 类的对象。该类是Parquet的类。

同时,在 ParquetOutputWriter 类里, 的方法有着以下代码:

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
  extends OutputWriter {

  private val recordWriter: RecordWriter[Void, InternalRow] = {
    new ParquetOutputFormat[InternalRow]() {
      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
        new Path(path)
      }
    }.getRecordWriter(context)
  }

  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")

  override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)

  override def close(): Unit = recordWriter.close(context)
}

可以看到,实际的writer是 ParquetOutputFormat 类,该类也是Parquet的类,从这里我们开始进入到Parquet源代码。

Parquet层面

Parquet的写,最小到page层面,每个压缩也是在page层面。具体做法是,缓存在JVM中,当到达一个阈值后,flush到File中。接下来看看如何在代码中实现。

In ParquetOutputFormat

上文讲到Parquet-Hadoop模块的 ParquetOutputFormat 类。查看该类的getRecordWriter类代码:

public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
        throws IOException, InterruptedException {
    final WriteSupport<T> writeSupport = getWriteSupport(conf);

    CodecFactory codecFactory = new CodecFactory(conf);
    long blockSize = getLongBlockSize(conf);
    if (INFO) LOG.info("Parquet block size to " + blockSize);
    int pageSize = getPageSize(conf);
    if (INFO) LOG.info("Parquet page size to " + pageSize);
    int dictionaryPageSize = getDictionaryPageSize(conf);
    if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
    boolean enableDictionary = getEnableDictionary(conf);
    if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
    boolean validating = getValidation(conf);
    if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
    WriterVersion writerVersion = getWriterVersion(conf);
    if (INFO) LOG.info("Writer version is: " + writerVersion);
    int maxPaddingSize = getMaxPaddingSize(conf);
    if (INFO) LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");

    WriteContext init = writeSupport.init(conf);
    ParquetFileWriter w = new ParquetFileWriter(
        conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
    w.start();

    float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
        MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
    long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
        MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
    if (memoryManager == null) {
      memoryManager = new MemoryManager(maxLoad, minAllocation);
    } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
      LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
          "be reset by the new value: " + maxLoad);
    }

    return new ParquetRecordWriter<T>(
        w,
        writeSupport,
        init.getSchema(),
        init.getExtraMetaData(),
        blockSize, pageSize,
        codecFactory.getCompressor(codec, pageSize),
        dictionaryPageSize,
        enableDictionary,
        validating,
        writerVersion,
        memoryManager);
  }

可以看到,从conf里获取里blockSize(其实就是rowGroupSize,Parquet代码里把row group称作parquet block),pageSize等参数。然后初始化ParquetFileWriter,最后返回ParquetRecordWriter。其中的writeSupport就是spark自己写的 ParquetWriteSupport

In ParquetRecordWriter

public ParquetRecordWriter(
      ParquetFileWriter w,
      WriteSupport<T> writeSupport,
      MessageType schema,
      Map<String, String> extraMetaData,
      long blockSize, int pageSize,
      BytesCompressor compressor,
      int dictionaryPageSize,
      boolean enableDictionary,
      boolean validating,
      WriterVersion writerVersion,
      MemoryManager memoryManager) {
    internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
        extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
        validating, writerVersion);
    this.memoryManager = checkNotNull(memoryManager, "memoryManager");
    memoryManager.addWriter(internalWriter, blockSize);
}
@Override
public void write(Void key, T value) throws IOException, InterruptedException {
    internalWriter.write(value);
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
    internalWriter.close();
    if (memoryManager != null) {
      memoryManager.removeWriter(internalWriter);
    }
}

可以看到,在该类里new了一个InternalParquetRecordWriter的对象。internalWriter和writeSupport这也是MR的一贯写法。

In InternalParquetRecordWriter

public InternalParquetRecordWriter(
    ParquetFileWriter parquetFileWriter,
    WriteSupport<T> writeSupport,
    MessageType schema,
    Map<String, String> extraMetaData,
    long rowGroupSize,
    int pageSize,
    BytesCompressor compressor,
    int dictionaryPageSize,
    boolean enableDictionary,
    boolean validating,
    WriterVersion writerVersion) {
  this.parquetFileWriter = parquetFileWriter;
  this.writeSupport = checkNotNull(writeSupport, "writeSupport");
  this.schema = schema;
  this.extraMetaData = extraMetaData;
  this.rowGroupSize = rowGroupSize;
  this.rowGroupSizeThreshold = rowGroupSize;
  this.nextRowGroupSize = rowGroupSizeThreshold;
  this.pageSize = pageSize;
  this.compressor = compressor;
  this.validating = validating;
  this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary);
  initStore();
}

private void initStore() {
  pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
  columnStore = parquetProperties.newColumnWriteStore(
      schema,
      pageStore,
      pageSize);
  MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
  writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
}

在初始化的时候,会初始化三个对象:pageStore,columnStore,MessageColumnIO。这三个对象是干嘛用的呢?

首先是pageStore,其是 ColumnChunkPageWriteStore 的实例,是pageWriter和column的集合,看其主要成员以及构造方法:

private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
private final MessageType schema;

public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
    this.schema = schema;
    for (ColumnDescriptor path : schema.getColumns()) {
      writers.put(path,  new ColumnChunkPageWriter(path, compressor, pageSize));
    }
}

可以看到,writers这个map是其最核心的成员,其key是每个列,value是实际的pageWriter。这个 ColumnChunkPageWriter 是内部类,它继承了PageWriter接口。在后面需要讲到的ColumnWriter中,实际使用的pageWriter对象就是 ColumnChunkPageWriter 对象。

然后是columnStore,通过查看 parquetProperties.newColumnWriteStore() 方法:

public ColumnWriteStore newColumnWriteStore(
      MessageType schema,
      PageWriteStore pageStore,
      int pageSize) {
    switch (writerVersion) {
    case PARQUET_1_0:
      return new ColumnWriteStoreV1(
          pageStore,
          pageSize,
          dictionaryPageSizeThreshold,
          enableDictionary, writerVersion);
    case PARQUET_2_0:
      return new ColumnWriteStoreV2(
          schema,
          pageStore,
          pageSize,
          new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
    default:
      throw new IllegalArgumentException("unknown version " + writerVersion);
    }
}

Spark采用的是V1.0的writer,所以case会走入到 ColumnWriteStoreV1 ,该对象里传入了刚刚讲到的pageStore。那我们来看看 ColumnWriteStoreV1 里的内容:

private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();

public ColumnWriter getColumnWriter(ColumnDescriptor path) {
  ColumnWriterV1 column = columns.get(path);
  if (column == null) {
    column = newMemColumn(path);
    columns.put(path, column);
  }
  return column;
}

private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
  PageWriter pageWriter = pageWriteStore.getPageWriter(path);
  return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
}

可以看到,与pageStore类似,columnStore同样也使用了一个map,key是列,value是 ColumnWriterV1 的对象,获取一个columnWriter时,先去map里找,如果没有,就调用 newMemColumn() 方法,从pageStore的map里找到对应的pageWriter,赋给new的ColumnWriterV1,并且加入到columnStore的map中。

最后是MessageColumnIO,它的核心成员是 List<PrimitiveColumnIO> leaves ,会保存每列的基本信息,如列名等。

writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore)); 这句非常关键, columnIO.getRecordWriter(columnStore) 会返回一个 MessageColumnIORecordConsumer 类的对象,该类在上文Spark部分最后有提到,其有一个成员是 ColumnWriter[] ,也就是说该类会保存所有的实际的writer,实际使用也就是我们上文提到的 ColumnWriterV1 们。

In RecordConsumer & ColumnWriter

回到 ParquetRecordWriter 来,其overwrite的写方法,就是调用 InternalParquetRecordWriter 的写方法,然后 InternalParquetRecordWriter 的写方法又调用writeSupport的写方法,在Spark的 ParquetWriteSupport 对象中,会发现最后调用的是 recordConsumeraddX 方法(如 addBoolean() , addInteger() ),而 recordConsumer 实际对象是 MessageColumnIORecordConsumer ,那么我们就先看一下 addInteger()

@Override
public void addInteger(int value) {
  if (DEBUG) log("addInt(" + value + ")");
  emptyField = false;
  getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());

  setRepetitionLevel();
  if (DEBUG) printState();
}

可以看到,在获取实际的ColumnWriter后进行写,我们知道这里用的是 ColumnWriterV1 ,遂进入到该类查看:

@Override
public void write(int value, int repetitionLevel, int definitionLevel) {
  if (DEBUG) log(value, repetitionLevel, definitionLevel);
  repetitionLevelColumn.writeInteger(repetitionLevel);
  definitionLeveolumn.writeInteger(definitionLevel);
  dataColumn.writeInteger(value);
  updateStatistics(value);
  accountForValueWritten();
}

写入到都是缓存,关键在最后一句, accountForValueWritten() ,查看该方法:

/**
 * Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
 *
 * We measure the memory used when we reach the mid point toward our estimated count.
 * We then update the estimate and flush the page if we reached the threshold.
 *
 * That way we check the memory size log2(n) times.
 *
 */
private void accountForValueWritten() {
  ++ valueCount;
  if (valueCount > valueCountForNextSizeCheck) {
    // not checking the memory used for every value
    long memSize = repetitionLevelColumn.getBufferedSize()
        + definitionLevelColumn.getBufferedSize()
        + dataColumn.getBufferedSize();
    if (memSize > pageSizeThreshold) {
      // we will write the current page and check again the size at the predicted middle of next page
      valueCountForNextSizeCheck = valueCount / 2;
      writePage();
    } else {
      // not reached the threshold, will check again midway
      valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
    }
  }
}

这个方法实际就是在写入每个值以后判断是否到达阈值然后调用 writePage() ,查看该方法:

private void writePage() {
  if (DEBUG) LOG.debug("write page");
  try {
    pageWriter.writePage(
        concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
        valueCount,
        statistics,
        repetitionLevelColumn.getEncoding(),
        definitionLevelColumn.getEncoding(),
        dataColumn.getEncoding());
  } catch (IOException e) {
    throw new ParquetEncodingException("could not write page for " + path, e);
  }
  repetitionLevelColumn.reset();
  definitionLevelColumn.reset();
  dataColumn.reset();
  valueCount = 0;
  resetStatistics();
}

可以看到是调用pageWriter来写page,上文已经讲到过,实际使用的是 ColumnChunkPageWriter ,查看其 writePage()

@Override
public void writePage(BytesInput bytes,
                      int valueCount,
                      Statistics statistics,
                      Encoding rlEncoding,
                      Encoding dlEncoding,
                      Encoding valuesEncoding) throws IOException {
  long uncompressedSize = bytes.size();
  if (uncompressedSize > Integer.MAX_VALUE) {
    throw new ParquetEncodingException(
        "Cannot write page larger than Integer.MAX_VALUE bytes: " +
        uncompressedSize);
  }
  BytesInput compressedBytes = compressor.compress(bytes);
  long compressedSize = compressedBytes.size();
  if (compressedSize > Integer.MAX_VALUE) {
    throw new ParquetEncodingException(
        "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
        + compressedSize);
  }
  tempOutputStream.reset();
  parquetMetadataConverter.writeDataPageHeader(
      (int)uncompressedSize,
      (int)compressedSize,
      valueCount,
      statistics,
      rlEncoding,
      dlEncoding,
      valuesEncoding,
      tempOutputStream);
  this.uncompressedLength += uncompressedSize;
  this.compressedLength += compressedSize;
  this.totalValueCount += valueCount;
  this.pageCount += 1;
  this.totalStatistics.mergeStatistics(statistics);
  // by concatenating before collecting instead of collecting twice,
  // we only allocate one buffer to copy into instead of multiple.
  buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
  encodings.add(rlEncoding);
  encodings.add(dlEncoding);
  encodings.add(valuesEncoding);
}

这里可以清楚看到,在压缩page的byte以后,先写page头,再写page内容。这个时候还是在缓存里的。

In ParquetFileWriter

回到 InternalParquetRecordWriter ,该 write() 方法与pageWriter类似,最后一句都有一个check的方法:

public void write(T value) throws IOException, InterruptedException {
  writeSupport.write(value);
  ++ recordCount;
  checkBlockSizeReached();
}

不同的是,该check是check是否要写入到文件中:

private void checkBlockSizeReached() throws IOException {
  if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
    long memSize = columnStore.getBufferedSize();
    long recordSize = memSize / recordCount;
    // flush the row group if it is within ~2 records of the limit
    // it is much better to be slightly under size than to be over at all
    if (memSize > (nextRowGroupSize - 2 * recordSize)) {
      LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, nextRowGroupSize, recordCount));
      flushRowGroupToStore();
      initStore();
      recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
      this.lastRowGroupEndPos = parquetFileWriter.getPos();
    } else {
      recordCountForNextMemCheck = min(
          max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
          recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
          );
      if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
    }
  }
}

private void flushRowGroupToStore()
    throws IOException {
  LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
  if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
    LOG.warn("Too much memory used: " + columnStore.memUsageString());
  }

  if (recordCount > 0) {
    parquetFileWriter.startBlock(recordCount);
    columnStore.flush();
    pageStore.flushToFileWriter(parquetFileWriter);
    recordCount = 0;
    parquetFileWriter.endBlock();
    this.nextRowGroupSize = Math.min(
        parquetFileWriter.getNextRowGroupSize(),
        rowGroupSizeThreshold);
  }

  columnStore = null;
  pageStore = null;
}

可以看到,调用了 ParquetFileWriter 写到文件中:

/**
 * writes a number of pages at once
 * @param bytes bytes to be written including page headers
 * @param uncompressedTotalPageSize total uncompressed size (without page headers)
 * @param compressedTotalPageSize total compressed size (without page headers)
 * @throws IOException
 */
 void writeDataPages(BytesInput bytes,
                     long uncompressedTotalPageSize,
                     long compressedTotalPageSize,
                     Statistics totalStats,
                     List<Encoding> encodings) throws IOException {
  state = state.write();
  if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
  long headersSize = bytes.size() - compressedTotalPageSize;
  this.uncompressedLength += uncompressedTotalPageSize + headersSize;
  this.compressedLength += compressedTotalPageSize + headersSize;
  if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
  bytes.writeAllTo(out);
  currentEncodings.addAll(encodings);
  currentStatistics = totalStats;
}

close

当所有的rowgroup中的page都写完,最后要写footer部分,这时候我们看到之前 InternalParquetRecordWriter 中的close方法:

public void close() throws IOException, InterruptedException {
  flushRowGroupToStore();
  parquetFileWriter.end(extraMetaData);
}

会调用 parquetFileWriterend() 方法进行最终的footer写,这里就不再赘述。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Ruby on Rails社区网站开发

Ruby on Rails社区网站开发

布拉德伯纳 / 柳靖 / 2008-10 / 55.00元

《Ruby on Rails社区网站开发》全面探讨创建完整社区网站的开发过程。首先介绍开发一个内容简单的管理系统,之后逐渐添加新特性,以创建更完整的、使用Ruby on Rails 的Web 2.0 社区网站。还给出了开发和测试中的一些建议和提示,同时指导如何使网站更生动以及维护得更好。《Ruby on Rails社区网站开发》也探讨了如何与Flickr 、Google Maps 等其他平台集成,......一起来看看 《Ruby on Rails社区网站开发》 这本书的介绍吧!

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具