内容简介:Spark 大会上,所有的演讲嘉宾都认为 shuffle 是最影响性能的地方,但是又无可奈何。之前去百度面试 hadoop 的时候,也被问到了这个问题,直接回答了不知道。这篇文章主要是沿着下面几个问题来开展:1、shuffle 过程的划分?
Spark 大会上,所有的演讲嘉宾都认为 shuffle 是最影响性能的地方,但是又无可奈何。之前去百度面试 hadoop 的时候,也被问到了这个问题,直接回答了不知道。
这篇文章主要是沿着下面几个问题来开展:
1、shuffle 过程的划分?
2、shuffle 的中间结果如何存储?
3、shuffle 的数据如何拉取过来?
Shuffle 过程的划分
Spark 的操作模型是基于 RDD 的,当调用 RDD 的 reduceByKey、groupByKey 等类似的操作的时候,就需要有 shuffle 了。再拿出 reduceByKey 这个来讲。
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = { reduceByKey(new HashPartitioner(numPartitions), func) } 复制代码
reduceByKey 的时候,我们可以手动设定 reduce 的个数,如果不指定的话,就可能不受控制了。
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse for (r <- bySize if r.partitioner.isDefined) { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism) } else { new HashPartitioner(bySize.head.partitions.size) } } 复制代码
如果不指定 reduce 个数的话,就按默认的走:
1、如果自定义了分区函数 partitioner 的话,就按你的分区函数来走。
2、如果没有定义,那么如果设置了 spark.default.parallelism,就使用哈希的分区方式,reduce 个数就是设置的这个值。
3、如果这个也没设置,那就按照输入数据的分片的数量来设定。如果是 hadoop 的输入数据的话,这个就多了,大家可要小心啊。
设定完之后,它会做三件事情,也就是之前讲的 3 次 RDD 转换。
//map端先按照key合并一次 val combined = self.mapPartitionsWithContext((context, iter) => { aggregator.combineValuesByKey(iter, context) }, preservesPartitioning = true) //reduce抓取数据 val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializer) //合并数据,执行reduce计算 partitioned.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true) 复制代码
View Code
1、在第一个 MapPartitionsRDD 这里先做一次 map 端的聚合操作。
2、ShuffledRDD 主要是做从这个抓取数据的工作。
3、第二个 MapPartitionsRDD 把抓取过来的数据再次进行聚合操作。
4、步骤 1 和步骤 3 都会涉及到 spill 的过程。
怎么做的聚合操作,回去看 RDD 那章。
Shuffle 的中间结果如何存储
作业提交的时候,DAGScheduler 会把 Shuffle 的过程切分成 map 和 reduce 两个 Stage(之前一直被我叫做 shuffle 前和 shuffle 后),具体的切分的位置在上图的虚线处。
map 端的任务会作为一个 ShuffleMapTask 提交,最后在 TaskRunner 里面调用了它的 runTask 方法。
override def runTask(context: TaskContext): MapStatus = { val numOutputSplits = dep.partitioner.numPartitions metrics = Some(context.taskMetrics) val blockManager = SparkEnv.get.blockManager val shuffleBlockManager = blockManager.shuffleBlockManager var shuffle: ShuffleWriterGroup = null var success = false try { // serializer为空的情况调用默认的JavaSerializer,也可以通过spark.serializer来设置成别的 val ser = Serializer.getSerializer(dep.serializer) // 实例化Writer,Writer的数量=numOutputSplits=前面我们说的那个reduce的数量 shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser) // 遍历rdd的元素,按照key计算出来它所在的bucketId,然后通过bucketId找到相应的Writer写入 for (elem <- rdd.iterator(split, context)) { val pair = elem.asInstanceOf[Product2[Any, Any]] val bucketId = dep.partitioner.getPartition(pair._1) shuffle.writers(bucketId).write(pair) } // 提交写入操作. 计算每个bucket block的大小 var totalBytes = 0L var totalTime = 0L val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter => writer.commit() writer.close() val size = writer.fileSegment().length totalBytes += size totalTime += writer.timeWriting() MapOutputTracker.compressSize(size) } // 更新 shuffle 监控参数. val shuffleMetrics = new ShuffleWriteMetrics shuffleMetrics.shuffleBytesWritten = totalBytes shuffleMetrics.shuffleWriteTime = totalTime metrics.get.shuffleWriteMetrics = Some(shuffleMetrics) success = true new MapStatus(blockManager.blockManagerId, compressedSizes) } catch { case e: Exception => // 出错了,取消之前的操作,关闭writer if (shuffle != null && shuffle.writers != null) { for (writer <- shuffle.writers) { writer.revertPartialWrites() writer.close() } } throw e } finally { // 关闭writer if (shuffle != null && shuffle.writers != null) { try { shuffle.releaseWriters(success) } catch { case e: Exception => logError("Failed to release shuffle writers", e) } } // 执行注册的回调函数,一般是做清理工作 context.executeOnCompleteCallbacks() } } 复制代码
遍历每一个记录,通过它的 key 来确定它的 bucketId,再通过这个 bucket 的 writer 写入数据。
下面我们看看 ShuffleBlockManager 的 forMapTask 方法吧。
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup { shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets)) private val shuffleState = shuffleStates(shuffleId) private var fileGroup: ShuffleFileGroup = null val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) { fileGroup = getUnusedFileGroup() Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) // 从已有的文件组里选文件,一个bucket一个文件,即要发送到同一个reduce的数据写入到同一个文件 blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize) } } else { Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => // 按照blockId来生成文件,文件数为map数*reduce数 val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) val blockFile = blockManager.diskBlockManager.getFile(blockId) if (blockFile.exists) { if (blockFile.delete()) { logInfo(s"Removed existing shuffle file $blockFile") } else { logWarning(s"Failed to remove existing shuffle file $blockFile") } } blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize) } } 复制代码
1、map 的中间结果是写入到本地硬盘的,而不是内存。
2、默认是一个 Executor 的中间结果文件是 M*R(M=map 数量,R=reduce 的数量),设置了 spark.shuffle.consolidateFiles 为 true 之后是 R 个文件,根据 bucketId 把要分到同一个 reduce 的结果写入到一个文件中。
3、consolidateFiles 采用的是一个 reduce 一个文件,它还记录了每个 map 的写入起始位置,所以查找的时候先通过 reduceId 查找到哪个文件,再通过 mapId 查找索引当中的起始位置 offset,长度 length=(mapId + 1).offset -(mapId).offset,这样就可以确定一个 FileSegment(file, offset, length)。
4、Finally,存储结束之后, 返回了一个 new MapStatus(blockManager.blockManagerId, compressedSizes),把 blockManagerId 和 block 的大小都一起返回。
个人想法,shuffle 这块和 hadoop 的机制差别不大,tez 这样的引擎会赶上 spark 的速度呢?还是让我们拭目以待吧!
Shuffle 的数据如何拉取过来
ShuffleMapTask 结束之后,最后走到 DAGScheduler 的 handleTaskCompletion 方法当中(关于中间的过程,请看《图解作业生命周期》)。
case smt: ShuffleMapTask => val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId) } else { stage.addOutputLoc(smt.partitionId, status) } if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) { markStageAsFinished(stage) if (stage.shuffleDep.isDefined) { // 真的map过程才会有这个依赖,reduce过程None mapOutputTracker.registerMapOutputs( stage.shuffleDep.get.shuffleId, stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray, changeEpoch = true) } clearCacheLocs() if (stage.outputLocs.exists(_ == Nil)) { // 一些任务失败了,需要重新提交stage submitStage(stage) } else { // 提交下一批任务 } } 复制代码
1、把结果添加到 Stage 的 outputLocs 数组里,它是按照数据的分区 Id 来存储映射关系的 partitionId->MapStaus。
2、stage 结束之后,通过 mapOutputTracker 的 registerMapOutputs 方法,把此次 shuffle 的结果 outputLocs 记录到 mapOutputTracker 里面。
这个 stage 结束之后,就到 ShuffleRDD 运行了,我们看一下它的 compute 函数。
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser) 复制代码
它是通过 ShuffleFetch 的 fetch 方法来抓取的,具体实现在 BlockStoreShuffleFetcher 里面。
override def fetch[T]( shuffleId: Int, reduceId: Int, context: TaskContext, serializer: Serializer) : Iterator[T] = { val blockManager = SparkEnv.get.blockManager val startTime = System.currentTimeMillis // mapOutputTracker也分Master和Worker,Worker向Master请求获取reduce相关的MapStatus,主要是(BlockManagerId和size) val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) // 一个BlockManagerId对应多个文件的大小 val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]] for (((address, size), index) <- statuses.zipWithIndex) { splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size)) } // 构造BlockManagerId 和 BlockId的映射关系,想不到ShffleBlockId的mapId,居然是1,2,3,4的序列... val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map { case (address, splits) => (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2))) } // 名为updateBlock,实际是检验函数,每个Block都对应着一个Iterator接口,如果该接口为空,则应该报错 def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = { val blockId = blockPair._1 val blockOption = blockPair._2 blockOption match { case Some(block) => { block.asInstanceOf[Iterator[T]] } case None => { blockId match { case ShuffleBlockId(shufId, mapId, _) => val address = statuses(mapId.toInt)._1 throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null) case _ => throw new SparkException("Failed to get block " + blockId + ", which is not a shuffle block") } } } } // 从blockManager获取reduce所需要的全部block,并添加校验函数 val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) val itr = blockFetcherItr.flatMap(unpackBlock) val completionIter = CompletionIterator[T, Iterator[T]](itr, { // CompelteIterator迭代结束之后,会执行以下这部分代码,提交它记录的各种参数 val shuffleMetrics = new ShuffleReadMetrics shuffleMetrics.shuffleFinishTime = System.currentTimeMillis shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics) }) new InterruptibleIterator[T](context, completionIter) } } 复制代码
1、MapOutputTrackerWorker 向 MapOutputTrackerMaster 获取 shuffle 相关的 map 结果信息。
2、把 map 结果信息构造成 BlockManagerId --> Array(BlockId, size) 的映射关系。
3、通过 BlockManager 的 getMultiple 批量拉取 block。
4、返回一个可遍历的 Iterator 接口,并更新相关的监控参数。
我们继续看 getMultiple 方法。
def getMultiple( blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], serializer: Serializer): BlockFetcherIterator = { val iter = if (conf.getBoolean("spark.shuffle.use.netty", false)) { new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer) } else { new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer) } iter.initialize() iter } 复制代码
分两种情况处理,分别是 netty 的和 Basic 的,Basic 的就不讲了,就是通过 ConnectionManager 去指定的 BlockManager 那里获取数据,上一章刚好说了。
我们讲一下 Netty 的吧,这个是需要设置的才能启用的,不知道性能会不会好一些呢?
看 NettyBlockFetcherIterator 的 initialize 方法,再看 BasicBlockFetcherIterator 的 initialize 方法,发现 Basic 的不能同时抓取超过 48Mb 的数据。
override def initialize() { // 分开本地请求和远程请求,返回远程的FetchRequest val remoteRequests = splitLocalRemoteBlocks() // 抓取顺序随机 for (request <- Utils.randomize(remoteRequests)) { fetchRequestsSync.put(request) } // 默认是开6个线程去进行抓取 copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))// 读取本地的block getLocalBlocks() } 复制代码
在 NettyBlockFetcherIterator 的 sendRequest 方法里面,发现它是通过 ShuffleCopier 来试下的。
val cpier = new ShuffleCopier(blockManager.conf) cpier.getBlocks(cmId, req.blocks, putResult) 复制代码
这块接下来就是 netty 的客户端调用的方法了,我对这个不了解。在服务端的处理是在 DiskBlockManager 内部启动了一个 ShuffleSender 的服务,最终的业务处理逻辑是在 FileServerHandler。
它是通过 getBlockLocation 返回一个 FileSegment,下面这段代码是 ShuffleBlockManager 的 getBlockLocation 方法。
def getBlockLocation(id: ShuffleBlockId): FileSegment = { // Search all file groups associated with this shuffle. val shuffleState = shuffleStates(id.shuffleId) for (fileGroup <- shuffleState.allFileGroups) { val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId) if (segment.isDefined) { return segment.get } } throw new IllegalStateException("Failed to find shuffle block: " + id) } 复制代码
先通过 shuffleId 找到 ShuffleState,再通过 reduceId 找到文件,最后通过 mapId 确定它的文件分片的位置。但是这里有个疑问了,如果启用了 consolidateFiles,一个 reduce 的所需数据都在一个文件里,是不是就可以把整个文件一起返回呢,而不是通过 N 个 map 来多次读取?还是害怕一次发送一个大文件容易失败?这就不得而知了。
到这里整个过程就讲完了。可以看得出来 Shuffle 这块还是做了一些优化的,但是这些参数并没有启用,有需要的朋友可以自己启用一下试试效果。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
指数型组织
萨利姆•伊斯梅尔 (Salim Ismail)、迈克尔•马隆 (Michael S. Malone)、尤里•范吉斯特 (Yuri van Geest) / 苏健 / 浙江人民出版社 / 2015-8-1 / CNY 69.90
《指数型组织》是一本指数级时代企业行动手册。作者奇点大学创始执行理事萨利姆·伊斯梅尔归纳了指数型组织的11个强大属性,并提出了建立指数型组织的12个关键步骤。通过自己创建的一套“指数商”测试题,伊斯梅尔还测量出了指数型组织世界100强。 为什么小米、海尔和阿里巴巴能进入“指数型组织世界100强”名单?“独角兽”Uber、Airbnb、谷歌等知名企业是如何指数化自己的组织的? 未......一起来看看 《指数型组织》 这本书的介绍吧!