The write() method in Dataset:
/**
 * Interface for saving the content of the non-streaming Dataset out into external storage.
 *
 * @group basic
 * @since 1.6.0
 */
def write: DataFrameWriter[T] = {
  if (isStreaming) {
    logicalPlan.failAnalysis(
      "'write' can not be called on streaming Dataset/DataFrame")
  }
  new DataFrameWriter[T](this)
}
The parquet() method in DataFrameWriter:
def parquet(path: String): Unit = {
  format("parquet").save(path)
}
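For context, here is a minimal usage sketch of the entry point we are about to trace. The output path, app name and local master are just example values, not anything from the source being analyzed:

import org.apache.spark.sql.SparkSession

object ParquetWriteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("parquet-write-demo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // write returns a DataFrameWriter; parquet(path) is just format("parquet").save(path)
    Seq((1, "a"), (2, "b")).toDF("id", "name")
      .write
      .mode("overwrite")
      .parquet("/tmp/parquet_write_demo")   // example output path

    spark.stop()
  }
}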
As you can see, this ends up calling save(). Tracing further down, we eventually land in the DataSource class:
/**
 * Writes the given [[DataFrame]] out to this [[DataSource]].
 */
def write(mode: SaveMode, data: DataFrame): Unit = {
  if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
    throw new AnalysisException("Cannot save interval data type into external storage.")
  }

  providingClass.newInstance() match {
    case dataSource: CreatableRelationProvider =>
      dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
    case format: FileFormat =>
      writeInFileFormat(format, mode, data)
    case _ =>
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}
In DataSource
As you can see, writing Parquet falls into the FileFormat case. FileFormat is a trait (interface), and the Parquet implementation is ParquetFileFormat. Let's continue with the writeInFileFormat() method:
/**
 * Writes the given [[DataFrame]] out in this [[FileFormat]].
 */
private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
  ...
  ...
  val plan =
    InsertIntoHadoopFsRelationCommand(
      outputPath = outputPath,
      staticPartitionKeys = Map.empty,
      customPartitionLocations = Map.empty,
      partitionColumns = columns,
      bucketSpec = bucketSpec,
      fileFormat = format,
      refreshFunction = _ => Unit, // No existing table needs to be refreshed.
      options = options,
      query = data.logicalPlan,
      mode = mode,
      catalogTable = catalogTable)
  sparkSession.sessionState.executePlan(plan).toRdd
}
This builds an InsertIntoHadoopFsRelationCommand. In that command's overridden run() method there is the following code:
FileFormatWriter.write(
  sparkSession = sparkSession,
  queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
  fileFormat = fileFormat,
  committer = committer,
  outputSpec = FileFormatWriter.OutputSpec(
    qualifiedOutputPath.toString, customPartitionLocations),
  hadoopConf = hadoopConf,
  partitionColumns = partitionColumns,
  bucketSpec = bucketSpec,
  refreshFunction = refreshFunction,
  options = options)
Jumping into this write method:
def write(
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    fileFormat: FileFormat,
    committer: FileCommitProtocol,
    outputSpec: OutputSpec,
    hadoopConf: Configuration,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    refreshFunction: (Seq[TablePartitionSpec]) => Unit,
    options: Map[String, String]): Unit = {
  ...
  ...
  val outputWriterFactory =
    fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
}
Since fileFormat here is a ParquetFileFormat instance, let's go straight to ParquetFileFormat.prepareWrite:
override def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType): OutputWriterFactory = {
  new OutputWriterFactory {
    // This OutputWriterFactory instance is deserialized when writing Parquet files on the
    // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
    // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
    // initialized.
    private val parquetLogRedirector = ParquetLogRedirector.INSTANCE

    override def newInstance(
        path: String,
        dataSchema: StructType,
        context: TaskAttemptContext): OutputWriter = {
      new ParquetOutputWriter(path, context)
    }

    override def getFileExtension(context: TaskAttemptContext): String = {
      CodecConfig.from(context).getCodec.getExtension + ".parquet"
    }
  }
}
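A side note on getFileExtension above: the compression codec chosen for the write shows up in the part-file names. A small sketch of what that looks like from a spark-shell style session, assuming spark is the active SparkSession and df is any DataFrame (both assumptions, not from the source above):

// Choosing gzip; the written part files then end in ".gz.parquet",
// matching CodecConfig.from(context).getCodec.getExtension + ".parquet".
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
df.write.mode("overwrite").parquet("/tmp/parquet_gzip_demo")   // example path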
As you can see, the returned OutputWriterFactory creates ParquetOutputWriter instances. ParquetOutputWriter in turn delegates to a RecordWriter instance, ParquetRecordWriter<InternalRow>, whose writeSupport is Spark's own ParquetWriteSupport class. That class takes the fields of Spark's InternalRow and writes each of them through Parquet's column writers. To see how, look at the consumeField() method in ParquetWriteSupport:
private def consumeField(field: String, index: Int)(f: => Unit): Unit = {
  recordConsumer.startField(field, index)
  f
  recordConsumer.endField(field, index)
}
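To make the call pattern concrete, here is a toy, self-contained sketch (not Spark's or Parquet's actual classes) of how a record's fields are emitted through a consumer, with every value bracketed by startField/endField exactly as consumeField() does:

// A toy consumer with the same call shape as Parquet's RecordConsumer,
// used only to show the startField / value / endField bracketing.
object ConsumeFieldDemo {
  trait ToyRecordConsumer {
    def startMessage(): Unit
    def endMessage(): Unit
    def startField(name: String, index: Int): Unit
    def endField(name: String, index: Int): Unit
    def addInteger(v: Int): Unit
  }

  class PrintingConsumer extends ToyRecordConsumer {
    def startMessage(): Unit = println("startMessage")
    def endMessage(): Unit = println("endMessage")
    def startField(name: String, index: Int): Unit = println(s"startField($name, $index)")
    def endField(name: String, index: Int): Unit = println(s"endField($name, $index)")
    def addInteger(v: Int): Unit = println(s"addInteger($v)")
  }

  // Mirrors consumeField(field, index)(f): bracket the value write with start/endField.
  def consumeField(c: ToyRecordConsumer, field: String, index: Int)(f: => Unit): Unit = {
    c.startField(field, index)
    f
    c.endField(field, index)
  }

  def main(args: Array[String]): Unit = {
    val c = new PrintingConsumer
    c.startMessage()
    consumeField(c, "id", 0)(c.addInteger(42))
    consumeField(c, "age", 1)(c.addInteger(30))
    c.endMessage()
  }
}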
Here recordConsumer is an instance of MessageColumnIORecordConsumer, which is a Parquet class. Meanwhile, the write path in the ParquetOutputWriter class looks like this:
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
  extends OutputWriter {

  private val recordWriter: RecordWriter[Void, InternalRow] = {
    new ParquetOutputFormat[InternalRow]() {
      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
        new Path(path)
      }
    }.getRecordWriter(context)
  }

  override def write(row: Row): Unit =
    throw new UnsupportedOperationException("call writeInternal")

  override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)

  override def close(): Unit = recordWriter.close(context)
}
As you can see, the actual writing is delegated to ParquetOutputFormat, which is also a Parquet class. From here on we are in the Parquet source code.
The Parquet layer
Parquet writes at page granularity, and compression is also applied per page. Concretely, data is buffered in the JVM and flushed to the file once a threshold is reached. Let's see how the code does this.
In ParquetOutputFormat
As mentioned above, we are now in the ParquetOutputFormat class of the parquet-hadoop module. Look at its getRecordWriter method:
public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
    throws IOException, InterruptedException {
  final WriteSupport<T> writeSupport = getWriteSupport(conf);

  CodecFactory codecFactory = new CodecFactory(conf);
  long blockSize = getLongBlockSize(conf);
  if (INFO) LOG.info("Parquet block size to " + blockSize);
  int pageSize = getPageSize(conf);
  if (INFO) LOG.info("Parquet page size to " + pageSize);
  int dictionaryPageSize = getDictionaryPageSize(conf);
  if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
  boolean enableDictionary = getEnableDictionary(conf);
  if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
  boolean validating = getValidation(conf);
  if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
  WriterVersion writerVersion = getWriterVersion(conf);
  if (INFO) LOG.info("Writer version is: " + writerVersion);
  int maxPaddingSize = getMaxPaddingSize(conf);
  if (INFO) LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");

  WriteContext init = writeSupport.init(conf);
  ParquetFileWriter w = new ParquetFileWriter(
      conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
  w.start();

  float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
      MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
  long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
      MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
  if (memoryManager == null) {
    memoryManager = new MemoryManager(maxLoad, minAllocation);
  } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
    LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not "
        + "be reset by the new value: " + maxLoad);
  }

  return new ParquetRecordWriter<T>(
      w,
      writeSupport,
      init.getSchema(),
      init.getExtraMetaData(),
      blockSize, pageSize,
      codecFactory.getCompressor(codec, pageSize),
      dictionaryPageSize,
      enableDictionary,
      validating,
      writerVersion,
      memoryManager);
}
As you can see, it reads blockSize (really the row group size; the Parquet code calls a row group a "parquet block"), pageSize and other parameters from the conf, then initializes a ParquetFileWriter and finally returns a ParquetRecordWriter. The writeSupport here is Spark's own ParquetWriteSupport.
In ParquetRecordWriter
public ParquetRecordWriter(
    ParquetFileWriter w,
    WriteSupport<T> writeSupport,
    MessageType schema,
    Map<String, String> extraMetaData,
    long blockSize, int pageSize,
    BytesCompressor compressor,
    int dictionaryPageSize,
    boolean enableDictionary,
    boolean validating,
    WriterVersion writerVersion,
    MemoryManager memoryManager) {
  internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
      extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
      validating, writerVersion);
  this.memoryManager = checkNotNull(memoryManager, "memoryManager");
  memoryManager.addWriter(internalWriter, blockSize);
}

@Override
public void write(Void key, T value) throws IOException, InterruptedException {
  internalWriter.write(value);
}

@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
  internalWriter.close();
  if (memoryManager != null) {
    memoryManager.removeWriter(internalWriter);
  }
}
As you can see, this class creates an InternalParquetRecordWriter and delegates to it. The internalWriter plus writeSupport split is the usual MapReduce pattern.
In InternalParquetRecordWriter
public InternalParquetRecordWriter(
    ParquetFileWriter parquetFileWriter,
    WriteSupport<T> writeSupport,
    MessageType schema,
    Map<String, String> extraMetaData,
    long rowGroupSize,
    int pageSize,
    BytesCompressor compressor,
    int dictionaryPageSize,
    boolean enableDictionary,
    boolean validating,
    WriterVersion writerVersion) {
  this.parquetFileWriter = parquetFileWriter;
  this.writeSupport = checkNotNull(writeSupport, "writeSupport");
  this.schema = schema;
  this.extraMetaData = extraMetaData;
  this.rowGroupSize = rowGroupSize;
  this.rowGroupSizeThreshold = rowGroupSize;
  this.nextRowGroupSize = rowGroupSizeThreshold;
  this.pageSize = pageSize;
  this.compressor = compressor;
  this.validating = validating;
  this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary);
  initStore();
}

private void initStore() {
  pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
  columnStore = parquetProperties.newColumnWriteStore(schema, pageStore, pageSize);
  MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
  writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
}
During initialization, three objects are created: pageStore, columnStore, and a MessageColumnIO. What are they for?
First, pageStore is an instance of ColumnChunkPageWriteStore, which maps columns to their page writers. Its main member and constructor:
private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers =
    new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
private final MessageType schema;

public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
  this.schema = schema;
  for (ColumnDescriptor path : schema.getColumns()) {
    writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize));
  }
}
As you can see, the writers map is its core member: the key is a column and the value is the actual page writer. ColumnChunkPageWriter is an inner class implementing the PageWriter interface, and it is the pageWriter object actually used by the ColumnWriter discussed below.
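The ColumnDescriptor keys of that map are the leaf columns of the Parquet schema. A small sketch, assuming a recent org.apache.parquet parquet-column artifact on the classpath and a made-up schema similar to what Spark would derive from a DataFrame, printing the descriptors a store like this creates one page writer for:

import org.apache.parquet.schema.MessageTypeParser
import scala.collection.JavaConverters._

object ColumnDescriptorDemo {
  def main(args: Array[String]): Unit = {
    // A hypothetical schema, similar in shape to what Spark derives from a DataFrame.
    val schema = MessageTypeParser.parseMessageType(
      """message spark_schema {
        |  required int32 id;
        |  optional binary name (UTF8);
        |}""".stripMargin)

    // One ColumnChunkPageWriter (and later one ColumnWriterV1) per leaf column.
    schema.getColumns.asScala.foreach { col =>
      println(col.getPath.mkString(".") +
        s" maxDef=${col.getMaxDefinitionLevel} maxRep=${col.getMaxRepetitionLevel}")
    }
  }
}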
Next is columnStore. Look at the parquetProperties.newColumnWriteStore() method:
public ColumnWriteStore newColumnWriteStore(
    MessageType schema,
    PageWriteStore pageStore,
    int pageSize) {
  switch (writerVersion) {
  case PARQUET_1_0:
    return new ColumnWriteStoreV1(
        pageStore,
        pageSize,
        dictionaryPageSizeThreshold,
        enableDictionary, writerVersion);
  case PARQUET_2_0:
    return new ColumnWriteStoreV2(
        schema,
        pageStore,
        pageSize,
        new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
  default:
    throw new IllegalArgumentException("unknown version " + writerVersion);
  }
}
Spark uses the v1 writer, so the switch goes to ColumnWriteStoreV1, which is constructed with the pageStore we just saw. Let's look inside ColumnWriteStoreV1:
private final Map<ColumnDescriptor, ColumnWriterV1> columns =
    new TreeMap<ColumnDescriptor, ColumnWriterV1>();

public ColumnWriter getColumnWriter(ColumnDescriptor path) {
  ColumnWriterV1 column = columns.get(path);
  if (column == null) {
    column = newMemColumn(path);
    columns.put(path, column);
  }
  return column;
}

private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
  PageWriter pageWriter = pageWriteStore.getPageWriter(path);
  return new ColumnWriterV1(path, pageWriter, pageSizeThreshold,
      dictionaryPageSizeThreshold, enableDictionary, writerVersion);
}
Like pageStore, columnStore also keeps a map whose key is a column and whose value is a ColumnWriterV1. When a column writer is requested, it first looks in the map; if it is missing, newMemColumn() fetches the matching pageWriter from the pageStore, hands it to a new ColumnWriterV1, and puts that writer into columnStore's map.
Finally, MessageColumnIO. Its core member is List<PrimitiveColumnIO> leaves, which holds basic information about every column, such as the column name.
writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));

This line is crucial: columnIO.getRecordWriter(columnStore) returns a MessageColumnIORecordConsumer, the class mentioned at the end of the Spark section above. It holds a ColumnWriter[] member, which means it keeps references to all of the actual writers — in practice the ColumnWriterV1 instances described above.
In RecordConsumer & ColumnWriter
Back to ParquetRecordWriter: its overridden write method calls InternalParquetRecordWriter's write, which in turn calls writeSupport's write. Following Spark's ParquetWriteSupport, you will find the calls eventually land on the recordConsumer's addX methods (e.g. addBoolean(), addInteger()), and the recordConsumer here is a MessageColumnIORecordConsumer. So let's first look at addInteger():
@Override
public void addInteger(int value) {
  if (DEBUG) log("addInt(" + value + ")");
  emptyField = false;
  getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());

  setRepetitionLevel();
  if (DEBUG) printState();
}
After fetching the actual ColumnWriter it writes the value. We know this is a ColumnWriterV1, so let's open that class:
@Override
public void write(int value, int repetitionLevel, int definitionLevel) {
  if (DEBUG) log(value, repetitionLevel, definitionLevel);
  repetitionLevelColumn.writeInteger(repetitionLevel);
  definitionLevelColumn.writeInteger(definitionLevel);
  dataColumn.writeInteger(value);
  updateStatistics(value);
  accountForValueWritten();
}
All of these writes go to in-memory buffers; the key is the last call, accountForValueWritten():
/**
 * Counts how many values have been written and checks the memory usage to flush the page
 * when we reach the page threshold.
 *
 * We measure the memory used when we reach the mid point toward our estimated count.
 * We then update the estimate and flush the page if we reached the threshold.
 *
 * That way we check the memory size log2(n) times.
 */
private void accountForValueWritten() {
  ++ valueCount;
  if (valueCount > valueCountForNextSizeCheck) {
    // not checking the memory used for every value
    long memSize = repetitionLevelColumn.getBufferedSize()
        + definitionLevelColumn.getBufferedSize()
        + dataColumn.getBufferedSize();
    if (memSize > pageSizeThreshold) {
      // we will write the current page and check again the size at the predicted middle of next page
      valueCountForNextSizeCheck = valueCount / 2;
      writePage();
    } else {
      // not reached the threshold, will check again midway
      valueCountForNextSizeCheck =
          (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
    }
  }
}
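To see the "check the memory roughly log2(n) times" idea in numbers, here is a tiny sketch that replays the estimation formula from the else branch above; the buffered sizes and counts are made up for illustration:

object PageSizeCheckDemo {
  def main(args: Array[String]): Unit = {
    val pageSizeThreshold = 1024 * 1024  // 1 MB, like the default parquet.page.size
    val valueCount        = 100          // values written when the check fires (made up)
    val memSize           = 40L * 1024   // 40 KB currently buffered (made up)

    // Same arithmetic as the "not reached the threshold" branch:
    // estimate how many values fit in a page and schedule the next check about halfway there.
    val next = (valueCount + (valueCount.toFloat * pageSizeThreshold / memSize)).toInt / 2 + 1
    println(s"buffered $memSize bytes after $valueCount values -> next size check at $next values")
    // Prints 1331: roughly 2560 values are estimated to fill the page, so check near the midpoint.
  }
}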
This method simply checks, after each value is written, whether the page threshold has been reached and then calls writePage():
private void writePage() {
  if (DEBUG) LOG.debug("write page");
  try {
    pageWriter.writePage(
        concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
        valueCount,
        statistics,
        repetitionLevelColumn.getEncoding(),
        definitionLevelColumn.getEncoding(),
        dataColumn.getEncoding());
  } catch (IOException e) {
    throw new ParquetEncodingException("could not write page for " + path, e);
  }
  repetitionLevelColumn.reset();
  definitionLevelColumn.reset();
  dataColumn.reset();
  valueCount = 0;
  resetStatistics();
}
As you can see, it asks the pageWriter to write the page. As discussed above, the actual pageWriter is a ColumnChunkPageWriter; here is its writePage():
@Override
public void writePage(BytesInput bytes,
                      int valueCount,
                      Statistics statistics,
                      Encoding rlEncoding,
                      Encoding dlEncoding,
                      Encoding valuesEncoding) throws IOException {
  long uncompressedSize = bytes.size();
  if (uncompressedSize > Integer.MAX_VALUE) {
    throw new ParquetEncodingException(
        "Cannot write page larger than Integer.MAX_VALUE bytes: " + uncompressedSize);
  }
  BytesInput compressedBytes = compressor.compress(bytes);
  long compressedSize = compressedBytes.size();
  if (compressedSize > Integer.MAX_VALUE) {
    throw new ParquetEncodingException(
        "Cannot write compressed page larger than Integer.MAX_VALUE bytes: " + compressedSize);
  }
  tempOutputStream.reset();
  parquetMetadataConverter.writeDataPageHeader(
      (int)uncompressedSize,
      (int)compressedSize,
      valueCount,
      statistics,
      rlEncoding,
      dlEncoding,
      valuesEncoding,
      tempOutputStream);
  this.uncompressedLength += uncompressedSize;
  this.compressedLength += compressedSize;
  this.totalValueCount += valueCount;
  this.pageCount += 1;
  this.totalStatistics.mergeStatistics(statistics);
  // by concatenating before collecting instead of collecting twice,
  // we only allocate one buffer to copy into instead of multiple.
  buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
  encodings.add(rlEncoding);
  encodings.add(dlEncoding);
  encodings.add(valuesEncoding);
}
Here you can see clearly that after compressing the page bytes, the page header is written first and then the page body. At this point everything is still buffered in memory.
In ParquetFileWriter
Back in InternalParquetRecordWriter: like the page writer, its write() method ends with a check:
public void write(T value) throws IOException, InterruptedException {
  writeSupport.write(value);
  ++ recordCount;
  checkBlockSizeReached();
}
The difference is that this check decides whether the buffered row group should be flushed to the file:
private void checkBlockSizeReached() throws IOException {
  if (recordCount >= recordCountForNextMemCheck) {
    // checking the memory size is relatively expensive, so let's not do it for every record.
    long memSize = columnStore.getBufferedSize();
    long recordSize = memSize / recordCount;
    // flush the row group if it is within ~2 records of the limit
    // it is much better to be slightly under size than to be over at all
    if (memSize > (nextRowGroupSize - 2 * recordSize)) {
      LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.",
          memSize, nextRowGroupSize, recordCount));
      flushRowGroupToStore();
      initStore();
      recordCountForNextMemCheck =
          min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
      this.lastRowGroupEndPos = parquetFileWriter.getPos();
    } else {
      recordCountForNextMemCheck = min(
          max(MINIMUM_RECORD_COUNT_FOR_CHECK,
              (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
          recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
          );
      if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ",
          recordCount, recordCountForNextMemCheck));
    }
  }
}

private void flushRowGroupToStore() throws IOException {
  LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d",
      columnStore.getAllocatedSize()));
  if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
    LOG.warn("Too much memory used: " + columnStore.memUsageString());
  }
  if (recordCount > 0) {
    parquetFileWriter.startBlock(recordCount);
    columnStore.flush();
    pageStore.flushToFileWriter(parquetFileWriter);
    recordCount = 0;
    parquetFileWriter.endBlock();
    this.nextRowGroupSize = Math.min(
        parquetFileWriter.getNextRowGroupSize(),
        rowGroupSizeThreshold);
  }
  columnStore = null;
  pageStore = null;
}
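The row-group check follows the same pattern as the page check, just against nextRowGroupSize and with a safety margin of roughly two records. A short worked sketch of that decision with made-up numbers:

object RowGroupCheckDemo {
  def main(args: Array[String]): Unit = {
    val nextRowGroupSize = 128L * 1024 * 1024   // 128 MB row group target
    val recordCount      = 1000000L             // records buffered so far (made up)
    val memSize          = 128L * 1024 * 1024   // bytes buffered in the column store (made up)

    val recordSize = memSize / recordCount      // average bytes per record so far
    // Flush slightly early: being a bit under the target is better than going over.
    val shouldFlush = memSize > (nextRowGroupSize - 2 * recordSize)
    println(s"avg record ~$recordSize bytes, buffered $memSize bytes -> flush row group: $shouldFlush")
  }
}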
As you can see, ParquetFileWriter is then used to write to the file:
/**
 * writes a number of pages at once
 * @param bytes bytes to be written including page headers
 * @param uncompressedTotalPageSize total uncompressed size (without page headers)
 * @param compressedTotalPageSize total compressed size (without page headers)
 * @throws IOException
 */
void writeDataPages(BytesInput bytes,
                    long uncompressedTotalPageSize,
                    long compressedTotalPageSize,
                    Statistics totalStats,
                    List<Encoding> encodings) throws IOException {
  state = state.write();
  if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
  long headersSize = bytes.size() - compressedTotalPageSize;
  this.uncompressedLength += uncompressedTotalPageSize + headersSize;
  this.compressedLength += compressedTotalPageSize + headersSize;
  if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
  bytes.writeAllTo(out);
  currentEncodings.addAll(encodings);
  currentStatistics = totalStats;
}
close
Once all pages of all row groups have been written, the footer still has to be written. That happens in the close method of InternalParquetRecordWriter we saw earlier:
public void close() throws IOException, InterruptedException {
  flushRowGroupToStore();
  parquetFileWriter.end(extraMetaData);
}
It calls parquetFileWriter's end() method, which writes the final footer; we won't go into that here.