内容简介:Spark Shuffle过程分析:Map阶段处理流程
作者:时延军
默认配置情况下,Spark在Shuffle过程中会使用SortShuffleManager来管理Shuffle过程中需要的基本组件,以及对RDD各个Partition数据的计算。我们可以在Driver和Executor对应的SparkEnv对象创建过程中看到对应的配置,如下代码所示:
1 // Let the user specify short names for shuffle managers 2 val shortShuffleMgrNames = Map( 3 "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, 4 "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) 5 val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") 6 val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) 7 val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
如果需要修改ShuffleManager实现,则只需要修改配置项spark.shuffle.manager即可,默认支持sort和 tungsten-sort,可以指定自己实现的ShuffleManager类。
因为Shuffle过程中需要将Map结果数据输出到文件,所以需要通过注册一个ShuffleHandle来获取到一个ShuffleWriter对象,通过它来控制Map阶段记录数据输出的行为。其中,ShuffleHandle包含了如下基本信息:
- shuffleId:标识Shuffle过程的唯一ID
- numMaps:RDD对应的Partitioner指定的Partition的个数,也就是ShuffleMapTask输出的Partition个数
- dependency:RDD对应的依赖ShuffleDependency
下面我们看下,在SortShuffleManager中是如何注册Shuffle的,代码如下所示:
01 override def registerShuffle[K, V, C]( 02 shuffleId: Int, 03 numMaps: Int, 04 dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { 05 if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { 06 new BypassMergeSortShuffleHandle[K, V]( 07 shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 08 } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { 09 new SerializedShuffleHandle[K, V]( 10 shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 11 } else { 12 new BaseShuffleHandle(shuffleId, numMaps, dependency) 13 } 14 }
上面代码中,对应如下3种ShuffleHandle可以选择,说明如下:
- BypassMergeSortShuffleHandle
如果dependency不需要进行Map Side Combine,并且RDD对应的ShuffleDependency中的Partitioner设置的Partition的数量(这个不要和parent RDD的Partition个数混淆,Partitioner指定了map处理结果的Partition个数,每个Partition数据会在Shuffle过程中全部被拉取而拷贝到下游的某个Executor端)小于等于配置参数spark.shuffle.sort.bypassMergeThreshold的值,则会注册BypassMergeSortShuffleHandle。默认情况下,spark.shuffle.sort.bypassMergeThreshold的取值是200,这种情况下会直接将对RDD的 map处理结果的各个Partition数据写入文件,并最后做一个合并处理。
- SerializedShuffleHandle
如果ShuffleDependency中的Serializer,允许对将要输出数据对象进行 排序 后,再执行序列化写入到文件,则会选择创建一个SerializedShuffleHandle。
- BaseShuffleHandle
除了上面两种ShuffleHandle以后,其他情况都会创建一个BaseShuffleHandle对象,它会以反序列化的格式处理Shuffle输出数据。
Map阶段处理流程分析
Map阶段RDD的计算,对应ShuffleMapTask这个实现类,它最终会在每个Executor上启动运行,每个ShuffleMapTask处理RDD的一个Partition的数据。这个过程的核心处理逻辑,代码如下所示:
1 val manager = SparkEnv.get.shuffleManager 2 writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 3 writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
上面代码中,在调用rdd的iterator()方法时,会根据RDD实现类的compute方法指定的处理逻辑对数据进行处理,当然,如果该Partition对应的数据已经处理过并存储在MemoryStore或DiskStore,直接通过BlockManager获取到对应的Block数据,而无需每次需要时重新计算。然后,write()方法会将已经处理过的Partition数据输出到磁盘文件。
在Spark Shuffle过程中,每个ShuffleMapTask会通过配置的ShuffleManager实现类对应的ShuffleManager对象(实际上是在SparkEnv中创建),根据已经注册的ShuffleHandle,获取到对应的ShuffleWriter对象,然后通过ShuffleWriter对象将Partition数据写入内存或文件。所以,接下来我们可能关心每一种ShuffleHandle对应的ShuffleWriter的行为,可以看到SortShuffleManager中获取到ShuffleWriter的实现代码,如下所示:
01 /** Get a writer for a given partition. Called on executors by map tasks. */ 02 override def getWriter[K, V]( 03 handle: ShuffleHandle, 04 mapId: Int, 05 context: TaskContext): ShuffleWriter[K, V] = { 06 numMapsForShuffle.putIfAbsent( 07 handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) 08 val env = SparkEnv.get 09 handle match { 10 case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => 11 new UnsafeShuffleWriter( 12 env.blockManager, 13 shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 14 context.taskMemoryManager(), 15 unsafeShuffleHandle, 16 mapId, 17 context, 18 env.conf) 19 case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => 20 new BypassMergeSortShuffleWriter( 21 env.blockManager, 22 shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 23 bypassMergeSortHandle, 24 mapId, 25 context, 26 env.conf) 27 case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => 28 new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) 29 } 30 }
我们以最简单的SortShuffleWriter为例进行分析,在SortShuffleManager可以通过getWriter()方法创建一个SortShuffleWriter对象,然后在ShuffleMapTask中调用SortShuffleWriter对象的write()方法处理Map输出的记录数据,write()方法的处理代码,如下所示:
01 /** Write a bunch of records to this task's output */ 02 override def write(records: Iterator[Product2[K, V]]): Unit = { 03 sorter = if (dep.mapSideCombine) { 04 require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") 05 new ExternalSorter[K, V, C]( 06 context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) 07 } else { 08 // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 09 // care whether the keys get sorted in each partition; that will be done on the reduce side 10 // if the operation being run is sortByKey. 11 new ExternalSorter[K, V, V]( 12 context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 13 } 14 sorter.insertAll(records) 15 16 // Don't bother including the time to open the merged output file in the shuffle write time, 17 // because it just opens a single file, so is typically too fast to measure accurately 18 // (see SPARK-3570). 19 val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) 20 val tmp = Utils.tempFileWith(output) 21 val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) 22 val partitionLengths = sorter.writePartitionedFile(blockId, tmp) 23 shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) 24 mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) 25 }
从SortShuffleWriter类中的write()方法可以看到,最终调用了ExeternalSorter的insertAll()方法,实现了Map端RDD某个Partition数据处理并输出到内存或磁盘文件,这也是处理Map阶段输出记录数据最核心、最复杂的过程。我们将其分为两个阶段进行分析:第一阶段是,ExeternalSorter的insertAll()方法处理过程,将记录数据Spill到磁盘文件;第二阶段是,执行完insertAll()方法之后的处理逻辑,创建Shuffle Block数据文件及其索引文件。
内存缓冲写记录数据并Spill到磁盘文件
查看SortShuffleWriter类的write()方法可以看到,在内存中缓存记录数据的数据结构有两种:一种是Buffer,对应的实现类PartitionedPairBuffer,设置mapSideCombine=false时会使用该结构;另一种是Map,对应的实现类是PartitionedAppendOnlyMap,设置mapSideCombine=false时会使用该结构。根据是否指定mapSideCombine选项,分别对应不同的处理流程,我们分别说明如下:
- 设置mapSideCombine=false时
这种情况在Map阶段不进行Combine操作,在内存中缓存记录数据会使用PartitionedPairBuffer这种数据结构来缓存、排序记录数据,它是一个Append-only Buffer,仅支持向Buffer中追加数据键值对记录,PartitionedPairBuffer的结构如下图所示:
默认情况下,PartitionedPairBuffer初始分配的存储容量为capacity = initialCapacity = 64,实际上这个容量是针对key的容量,因为要存储的是键值对记录数据,所以实际存储键值对的容量为2*initialCapacity = 128。PartitionedPairBuffer是一个能够动态扩充容量的Buffer,内部使用一个一维数组来存储键值对,每次扩容结果为当前Buffer容量的2倍,即2*capacity,最大支持存储2^31-1个键值对记录(1073741823个)。
通过上图可以看到,PartitionedPairBuffer存储的键值对记录数据,键是(partition, key)这样一个Tuple,值是对应的数据value,而且curSize是用来跟踪写入Buffer中的记录的,key在Buffer中的索引位置为2*curSize,value的索引位置为2*curSize+1,可见一个键值对的key和value的存储在PartitionedPairBuffer内部的数组中是相邻的。
使用PartitionedPairBuffer缓存键值对记录数据,通过跟踪实际写入到Buffer内的记录数据的字节数来判断,是否需要将Buffer中的数据Spill到磁盘文件,如下代码所示:
01 protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { 02 var shouldSpill = false 03 if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { 04 // Claim up to double our current memory from the shuffle memory pool 05 val amountToRequest = 2 * currentMemory - myMemoryThreshold 06 val granted = acquireMemory(amountToRequest) 07 myMemoryThreshold += granted 08 // If we were granted too little memory to grow further (either tryToAcquire returned 0, 09 // or we already had more memory than myMemoryThreshold), spill the current collection 10 shouldSpill = currentMemory >= myMemoryThreshold 11 } 12 shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold 13 // Actually spill 14 if (shouldSpill) { 15 _spillCount += 1 16 logSpillage(currentMemory) 17 spill(collection) 18 _elementsRead = 0 19 _memoryBytesSpilled += currentMemory 20 releaseMemory() 21 } 22 shouldSpill 23 }
上面elementsRead表示存储到PartitionedPairBuffer中的记录数,currentMemory是对Buffer中的总记录数据大小(字节数)的估算,myMemoryThreshold通过配置项spark.shuffle.spill.initialMemoryThreshold来进行设置的,默认值为5 * 1024 * 1024 = 5M。当满足条件elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold时,会先尝试向MemoryManager申请2 * currentMemory – myMemoryThreshold大小的内存,如果能够申请到,则不进行Spill操作,而是继续向Buffer中存储数据,否则就会调用spill()方法将Buffer中数据输出到磁盘文件。
向PartitionedPairBuffer中写入记录数据,以及满足条件Spill记录数据到磁盘文件,具体处理流程,如下图所示:
为了查看按照怎样的规则进行排序,我们看一下,当不进行Map Side Combine时,创建ExternalSorter对象的代码如下所示:
1 // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 2 // care whether the keys get sorted in each partition; that will be done on the reduce side 3 // if the operation being run is sortByKey. 4 new ExternalSorter[K, V, V]( 5 context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
上面aggregator = None,ordering = None,在对PartitionedPairBuffer中的记录数据Spill到磁盘之前,要使用默认的排序规则进行排序,排序的规则是只对PartitionedPairBuffer中的记录按Partition ID进行升序排序,可以查看WritablePartitionedPairCollection伴生对象类的代码(其中PartitionedPairBuffer类实现了特质WritablePartitionedPairCollection),如下所示:
1 /** 2 * A comparator for (Int, K) pairs that orders them by only their partition ID. 3 */ 4 def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] { 5 override def compare(a: (Int, K), b: (Int, K)): Int = { 6 a._1 - b._1 7 } 8 }
上面图中,引用了SortShuffleWriter.writeBlockFiles这个子序列图,用来生成Block数据文件和索引文件,后面我们会单独说明。通过对RDD进行计算生成一个记录迭代器对象,通过该迭代器迭代出的记录会存储到PartitionedPairBuffer中,当满足Spill条件时,先对PartitionedPairBuffer中记录进行排序,最后Spill到磁盘文件,这个过程中PartitionedPairBuffer中的记录数据的变化情况,如下图所示:
上图中,对内存中PartitionedPairBuffer中的记录按照Partition ID进行排序,并且属于同一个Partition的数据记录在PartitionedPairBuffer内部的data数组中是连续的。排序结束后,在Spill到磁盘文件时,将对应的Partition ID去掉了,只在文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中连续存储键值对数据,但同时在另一个内存数组结构中会保存文件中每个Partition拥有的记录数,这样就能根据Partition的记录数来顺序读取文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中属于同一个Partition的全部记录数据。
ExternalSorter类内部维护了一个SpillFile的ArrayBuffer数组,最终可能会生成多个SpillFile,SpillFile的定义如下所示:
1 private[this] case class SpilledFile( 2 file: File, 3 blockId: BlockId, 4 serializerBatchSizes: Array[Long], 5 elementsPerPartition: Array[Long])
每个SpillFile包含一个blockId,标识Map输出的该临时文件;serializerBatchSizes表示每次批量写入到文件的Object的数量,默认为10000,由配置项spark.shuffle.spill.batchSize来控制;elementsPerPartition表示每个Partition中的Object的数量。调用ExternalSorter的insertAll()方法,最终可能有如下3种情况:
- Map阶段输出记录数较少,没有生成SpillFile,那么所有数据都在Buffer中,直接对Buffer中记录排序并输出到文件
- Map阶段输出记录数较多,生成多个SpillFile,同时Buffer中也有部分记录数据
- Map阶段输出记录数较多,只生成多个SpillFile
有关后续如何对上面3种情况进行处理,可以想见后面对子序列图SortShuffleWriter.writeBlockFiles的说明。
- 设置mapSideCombine=true时
这种情况在Map阶段会执行Combine操作,在Map阶段进行Combine操作能够降低Map阶段数据记录的总数,从而降低Shuffle过程中数据的跨网络拷贝传输。这时,RDD对应的ShuffleDependency需要设置一个Aggregator用来执行Combine操作,可以看下Aggregator类声明,代码如下所示:
01 /** 02 * :: DeveloperApi :: 03 * A set of functions used to aggregate data. 04 * 05 * @param createCombiner function to create the initial value of the aggregation. 06 * @param mergeValue function to merge a new value into the aggregation result. 07 * @param mergeCombiners function to merge outputs from multiple mergeValue function. 08 */ 09 @DeveloperApi 10 case class Aggregator[K, V, C] ( 11 createCombiner: V => C, 12 mergeValue: (C, V) => C, 13 mergeCombiners: (C, C) => C) { 14 ... ... 15 }
由于在Map阶段只用到了构造Aggregator的几个函数参数createCombiner、mergeValue、mergeCombiners,我们对这几个函数详细说明如下:
- createCombiner:进行Aggregation开始时,需要设置初始值。因为在Aggregation过程中使用了类似Map的内存数据结构来管理键值对,每次加入前会先查看Map内存结构中是否存在Key对应的Value,第一次肯定不存在,所以首次将某个Key的Value加入到Map内存结构中时,Key在Map内存结构中第一次有了Value。
- mergeValue:某个Key已经在Map结构中存在Value,后续某次又遇到相同的Key和一个新的Value,这时需要通过该函数,将旧Value和新Value进行合并,根据Key检索能够得到合并后的新Value。
- mergeCombiners:一个Map内存结构中Key和Value是由mergeValue生成的,那么在向Map中插入数据,肯定会遇到Map使用容量达到上限,这时需要将记录数据Spill到磁盘文件,那么多个Spill输出的磁盘文件中可能存在同一个Key,这时需要对多个Spill输出的磁盘文件中的Key的多个Value进行合并,这时需要使用mergeCombiners函数进行处理。
该类中定义了combineValuesByKey、combineValuesByKey、combineCombinersByKey,由于这些函数是在Reduce阶段使用的,所以在这里先不说明,后续文章我们会单独详细来分析。
我们通过下面的序列图来描述,需要进行Map Side Combine时的处理流程,如下所示:
对照上图,我们看一下,当需要进行Map Side Combine时,对应的ExternalSorter类insertAll()方法中的处理逻辑,代码如下所示:
01 val shouldCombine = aggregator.isDefined 02 03 if (shouldCombine) { 04 // Combine values in-memory first using our AppendOnlyMap 05 val mergeValue = aggregator.get.mergeValue 06 val createCombiner = aggregator.get.createCombiner 07 var kv: Product2[K, V] = null 08 val update = (hadValue: Boolean, oldValue: C) => { 09 if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) 10 } 11 while (records.hasNext) { 12 addElementsRead() 13 kv = records.next() 14 map.changeValue((getPartition(kv._1), kv._1), update) 15 maybeSpillCollection(usingMap = true) 16 } 17 }
上面代码中,map是内存数据结构,最重要的是update函数和map的changeValue方法(这里的map对应的实现类是PartitionedAppendOnlyMap)。update函数所做的工作,其实就是对createCombiner和mergeValue这两个函数的使用,第一次遇到一个Key调用createCombiner函数处理,非首次遇到同一个Key对应新的Value调用mergeValue函数进行合并处理。map的changeValue方法主要是将Key和Value在map中存储或者进行修改(对出现的同一个Key的多个Value进行合并,并将合并后的新Value替换旧Value)。
PartitionedAppendOnlyMap是一个经过优化的哈希表,它支持向map中追加数据,以及修改Key对应的Value,但是不支持删除某个Key及其对应的Value。它能够支持的存储容量是0.7 * 2 ^ 29 = 375809638。当达到指定存储容量或者指定限制,就会将map中记录数据Spill到磁盘文件,这个过程和前面的类似,不再累述。
创建Shuffle Block数据文件及其索引文件
无论是使用PartitionedPairBuffer,还是使用PartitionedAppendOnlyMap,当需要容量满足Spill条件时,都会将该内存结构(buffer/map)中记录数据Spill到磁盘文件,所以Spill到磁盘文件的格式是相同的。对于后续Block数据文件和索引文件的生成逻辑也是相同,如下图所示:
假设,我们生成的Shuffle Block文件对应各个参数为:shuffleId=2901,mapId=11825,reduceId=0,这里reduceId是一个NOOP_REDUCE_ID,表示与DiskStore进行磁盘I/O交互操作,而DiskStore期望对应一个(map, reduce)对,但是对于排序的Shuffle输出,通常Reducer拉取数据后只生成一个文件(Reduce文件),所以这里默认reduceId为0。经过上图的处理流程,可以生成一个.data文件,也就是Block数据文件;一个.index文件,也就是包含了各个Partition在数据文件中的偏移位置的索引文件。这个过程生成的文件,示例如下所示:
1 shuffle_2901_11825_0.data 2 shuffle_2901_11825_0.index
这样,对于每个RDD的多个Partition进行处理后,都会生成对应的数据文件和索引文件,后续在Reduce端就可以读取这些Block文件,这些记录数据在文件中都是经过分区(Partitioned)的。
End.
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- MyBatis执行流程的各阶段介绍
- Spark Shuffle过程分析:Map阶段处理流程
- 金色观察丨以太坊2.0从“零阶段”到“一阶段”比想象中简单?
- 技术人员发展四阶段
- 软件需求阶段—质量全面管控
- Nginx执行阶段详细解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Data Structures and Algorithm Analysis in Java
Mark A. Weiss / Pearson / 2011-11-18 / GBP 129.99
Data Structures and Algorithm Analysis in Java is an “advanced algorithms” book that fits between traditional CS2 and Algorithms Analysis courses. In the old ACM Curriculum Guidelines, this course wa......一起来看看 《Data Structures and Algorithm Analysis in Java》 这本书的介绍吧!