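This blog series distills and shares cases drawn from real commercial environments, and offers Spark source-code walkthroughs together with practical guidance for commercial use. Please keep following the series. Copyright notice: this series on Spark source-code walkthroughs and commercial practice belongs to the author (秦凯新). Reposting is prohibited; you are welcome to learn from it.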
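- Spark in Commercial Practice: Spark's built-in RPC communication mechanism and the RpcEnv infrastructure
- Spark in Commercial Practice: the Spark event listener bus flow
- Spark in Commercial Practice: the underlying architecture of the Spark storage system
- Spark in Commercial Practice: execution flow of Spark's multiple underlying MessageLoop threads
- Spark in Commercial Practice: the Stage-division algorithm and optimal task scheduling in Spark's two-level scheduling system
- Spark in Commercial Practice: Spark delay scheduling of tasks and the scheduling Pool architecture
- Spark in Commercial Practice: AppendOnlyMap, the task-level structure for caching, aggregation, and sorting
- Spark in Commercial Practice: design of the ExternalSorter in the Spark shuffle process
- Spark in Commercial Practice: design of the ShuffleExternalSorter in the Spark shuffle process
- Spark in Commercial Practice: design of SortShuffleWriter, the Spark ShuffleManager in-memory buffer
- Spark in Commercial Practice: design of UnsafeShuffleWriter, the Spark ShuffleManager in-memory buffer
- Spark in Commercial Practice: design of BypassMergeSortShuffleWriter, the Spark ShuffleManager in-memory buffer
- Spark in Commercial Practice: an in-depth look at the Spark shuffle aggregated fetch and read path (Reduce Task)
- Spark in Commercial Practice: the StreamingContext startup flow and the DStream template source code
- Spark in Commercial Practice: the data-receiving flow of ReceiverTracker and BlockGenerator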
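1 Starting from ShuffleManager
I have used this diagram several times already; please bear with me, since every post in this group is on the same topic, shuffle. The English comments in the source are already quite detailed, so here is only a brief summary: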
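- ShuffleManager currently has only one implementation, SortShuffleManager.
- SortShuffleManager relies on ShuffleWriter to do its work. Through the contract that ShuffleWriter defines, the intermediate results of a map task are persisted to disk in the agreed format.
- ShuffleWriter (not SortShuffleManager itself) has three implementations: UnsafeShuffleWriter, SortShuffleWriter, and BypassMergeSortShuffleWriter.
- SortShuffleManager also relies on ShuffleHandle and its subclasses, whose main job is to pass shuffle information to tasks. One subclass (SerializedShuffleHandle) marks the serialized shuffle path; another (BypassMergeSortShuffleHandle) marks the path that bypasses merging and sorting.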
The official English description is as follows:
     * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
     * driver and on each executor, based on the spark.shuffle.manager setting. The driver
     * registers shuffles with it, and executors (or tasks running locally in the driver) can ask
     * to read and write data.
     *
     * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
     * boolean isDriver as parameters.
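To make the "when to bypass merging and sorting" decision concrete, here is a minimal sketch. It assumes the rule Spark applies when registering a shuffle: the bypass path is chosen only when there is no map-side combine and the number of reduce partitions does not exceed spark.shuffle.sort.bypassMergeThreshold (default 200). The class and method below are hypothetical stand-ins written for illustration, not Spark's own classes.

```java
// Hypothetical, self-contained model of the bypass decision; not Spark's actual code.
final class BypassDecision {
    // Default value of spark.shuffle.sort.bypassMergeThreshold.
    static final int DEFAULT_BYPASS_MERGE_THRESHOLD = 200;

    /**
     * Returns true when the bypass path may be used: no map-side combine is
     * requested and the partition count is at most the threshold.
     */
    static boolean shouldBypassMergeSort(boolean mapSideCombine, int numPartitions, int threshold) {
        if (mapSideCombine) {
            // Map-side aggregation needs an in-memory aggregator, so bypass is ruled out.
            return false;
        }
        return numPartitions <= threshold;
    }

    public static void main(String[] args) {
        int t = DEFAULT_BYPASS_MERGE_THRESHOLD;
        System.out.println(shouldBypassMergeSort(false, 100, t)); // true
        System.out.println(shouldBypassMergeSort(false, 500, t)); // false
        System.out.println(shouldBypassMergeSort(true, 100, t));  // false
    }
}
```

The threshold exists because the bypass path opens one file per reduce partition, which only pays off when the partition count is small.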
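2 The Showdown: BypassMergeSortShuffleWriter
Judging by the name, this one is a born opportunist: a ShuffleWriter that skips merging and sorting altogether. Let's call it the Opportunist.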
     * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
     * writes incoming records to separate files, one file per reduce partition, then concatenates these
     * per-partition files to form a single output file, regions of which are served to reducers.
     * Records are not buffered in memory. It writes output in a format
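3 The Showdown: Member Fields
BypassMergeSortShuffleWriter plays in cheat mode: there is no sorter at all, and it takes care of everything itself. "I'm the toughest, leave it all to me" is its inner voice, heh.
3.1 The fields of BypassMergeSortShuffleWriter: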
- partitionWriters => declared as an array, private DiskBlockObjectWriter[] partitionWriters; each DiskBlockObjectWriter handles the data of one partition.
- private final int fileBufferSize => the file buffer size, configured through spark.shuffle.file.buffer; the default is 32 KB.
- private final boolean transferToEnabled => whether to copy from file stream to file stream using NIO (transferTo), configured through spark.file.transferTo; the default is true.
- private final int numPartitions => the number of partitions.
- private final BlockManager blockManager
- private final Partitioner partitioner => the partitioner, which computes the partition for each key.
- private final ShuffleWriteMetrics writeMetrics
- private final int shuffleId
- private final int mapId => the identifier of the map task.
- private final Serializer serializer
- private final IndexShuffleBlockResolver shuffleBlockResolver
- private FileSegment[] partitionWriterSegments => an array of FileSegment; each DiskBlockObjectWriter corresponds to one partition and therefore to one file segment.
- @Nullable private MapStatus mapStatus
- private long[] partitionLengths
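Of the fields above, partitioner is the one that decides which DiskBlockObjectWriter receives a record. As a rough illustration, the sketch below mimics the behavior of Spark's default HashPartitioner (a null key goes to partition 0, otherwise a non-negative modulo of the key's hash code). The class name is hypothetical, and a job may of course use a different Partitioner.

```java
// Hypothetical stand-in for a hash-based Partitioner; for illustration only.
final class HashPartitionSketch {
    /** Maps a key to a partition id in the range [0, numPartitions). */
    static int getPartition(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        int raw = key.hashCode() % numPartitions;
        // Java's % can be negative for negative hash codes, so shift into range.
        return raw < 0 ? raw + numPartitions : raw;
    }

    public static void main(String[] args) {
        System.out.println(getPartition(10, 4));   // 2
        System.out.println(getPartition(-1, 4));   // 3
        System.out.println(getPartition(null, 4)); // 0
    }
}
```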
4 The core method of BypassMergeSortShuffleWriter: write
First, take a look at the code:
    public void write(Iterator<Product2<K, V>> records) throws IOException {
      assert (partitionWriters == null);
      if (!records.hasNext()) {
        partitionLengths = new long[numPartitions];
        shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
        mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
        return;
      }
      final SerializerInstance serInstance = serializer.newInstance();
      final long openStartTime = System.nanoTime();
      // Key point: one DiskBlockObjectWriter and one FileSegment per partition.
      partitionWriters = new DiskBlockObjectWriter[numPartitions];
      partitionWriterSegments = new FileSegment[numPartitions];
      // Key point: the segments are written partition by partition.
      for (int i = 0; i < numPartitions; i++) {
        final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
          blockManager.diskBlockManager().createTempShuffleBlock();
        final File file = tempShuffleBlockIdPlusFile._2();
        final BlockId blockId = tempShuffleBlockIdPlusFile._1();
        // Key point: obtain the DiskBlockObjectWriter for this partition.
        partitionWriters[i] =
          blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
      }
      // Creating the file to write to and creating a disk writer both involve interacting with
      // the disk, and can take a long time in aggregate when we open many files, so should be
      // included in the shuffle write time.
      writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

      while (records.hasNext()) {
        final Product2<K, V> record = records.next();
        final K key = record._1();
        partitionWriters[partitioner.getPartition(key)].write(key, record._2());
      }

      for (int i = 0; i < numPartitions; i++) {
        final DiskBlockObjectWriter writer = partitionWriters[i];
        // Commit the writes: this partition's temporary file is flushed to disk.
        partitionWriterSegments[i] = writer.commitAndGet();
        writer.close();
      }

      // The final data file for this map task's output (not the temporary files).
      File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
      File tmp = Utils.tempFileWith(output);
      try {
        // Merge the per-partition files into a single file.
        partitionLengths = writePartitionedFile(tmp);
        // Write the index file and commit.
        shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
      } finally {
        if (tmp.exists() && !tmp.delete()) {
          logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
        }
      }
      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    }
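The heart of write is the "one writer per partition" routing. The following stand-alone sketch reproduces only that idea with plain JDK classes, under several simplifying assumptions: records are String key/value pairs, each record is serialized as a UTF-8 text line, and the partition is a non-negative modulo of the key's hash code. PerPartitionWriteSketch and writePerPartition are hypothetical names; Spark's real path goes through DiskBlockObjectWriter and a Serializer.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-partition write phase; not Spark's implementation.
final class PerPartitionWriteSketch {
    /** Writes each record to the temp file of its partition and returns those files. */
    static File[] writePerPartition(Iterable<Map.Entry<String, String>> records,
                                    int numPartitions, int bufferSize) throws IOException {
        File[] files = new File[numPartitions];
        OutputStream[] writers = new OutputStream[numPartitions];
        try {
            // Open one buffered writer per reduce partition up front.
            for (int i = 0; i < numPartitions; i++) {
                files[i] = File.createTempFile("temp_shuffle_" + i + "_", ".data");
                files[i].deleteOnExit();
                writers[i] = new BufferedOutputStream(new FileOutputStream(files[i]), bufferSize);
            }
            // Route every record straight to its partition's writer; nothing is sorted.
            for (Map.Entry<String, String> record : records) {
                int partition = Math.floorMod(record.getKey().hashCode(), numPartitions);
                String line = record.getKey() + "=" + record.getValue() + "\n";
                writers[partition].write(line.getBytes(StandardCharsets.UTF_8));
            }
        } finally {
            // Closing flushes the buffers so the temp files are complete on disk.
            for (OutputStream writer : writers) {
                if (writer != null) {
                    writer.close();
                }
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        List<Map.Entry<String, String>> records = new ArrayList<>();
        records.add(new AbstractMap.SimpleEntry<>("a", "1"));
        records.add(new AbstractMap.SimpleEntry<>("b", "2"));
        records.add(new AbstractMap.SimpleEntry<>("a", "3"));
        File[] files = writePerPartition(records, 2, 32 * 1024);
        for (int i = 0; i < files.length; i++) {
            System.out.println("partition " + i + ": " + files[i].length() + " bytes");
        }
    }
}
```

Because every partition holds an open file and a buffer for the whole task, the memory and file-handle cost grows linearly with the partition count, which is why this path is reserved for shuffles with few partitions.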
5 The core method of BypassMergeSortShuffleWriter: writePartitionedFile
This method merges the per-partition files into the final block file.
    // Concatenate all of the per-partition files into a single combined file.
    private long[] writePartitionedFile(File outputFile) throws IOException {
      // Track location of the partition starts in the output file
      final long[] lengths = new long[numPartitions];
      if (partitionWriters == null) {
        // We were passed an empty iterator
        return lengths;
      }
      final FileOutputStream out = new FileOutputStream(outputFile, true);
      final long writeStartTime = System.nanoTime();
      boolean threwException = true;
      try {
        for (int i = 0; i < numPartitions; i++) {
          final File file = partitionWriterSegments[i].file();
          if (file.exists()) {
            final FileInputStream in = new FileInputStream(file);
            boolean copyThrewException = true;
            try {
              lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
              copyThrewException = false;
            } finally {
              Closeables.close(in, copyThrewException);
            }
            if (!file.delete()) {
              logger.error("Unable to delete file for partition {}", i);
            }
          }
        }
        threwException = false;
      } finally {
        Closeables.close(out, threwException);
        writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
      }
      partitionWriters = null;
      return lengths;
    }
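For readers who want to try the concatenation step in isolation, here is a small sketch using only the JDK. It assumes the transferTo-enabled case and copies each partition file with FileChannel.transferTo, recording how many bytes each partition contributed. ConcatSketch and concatenate are hypothetical names, and unlike the listing above the output is opened in truncate mode rather than append mode to keep the example simple.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

// Hypothetical sketch of merging per-partition files; not Spark's implementation.
final class ConcatSketch {
    /**
     * Appends the partition files to output in partition-id order and returns
     * the byte length contributed by each partition (0 for a missing file).
     */
    static long[] concatenate(File[] partitionFiles, File output) throws IOException {
        long[] lengths = new long[partitionFiles.length];
        try (FileOutputStream fos = new FileOutputStream(output);
             FileChannel out = fos.getChannel()) {
            for (int i = 0; i < partitionFiles.length; i++) {
                File file = partitionFiles[i];
                if (file == null || !file.exists()) {
                    continue;
                }
                try (FileInputStream fis = new FileInputStream(file);
                     FileChannel in = fis.getChannel()) {
                    long size = in.size();
                    long position = 0;
                    // transferTo may copy fewer bytes than requested, so loop until done.
                    while (position < size) {
                        position += in.transferTo(position, size - position, out);
                    }
                    lengths[i] = size;
                }
                // The temporary per-partition file is no longer needed once copied.
                if (!file.delete()) {
                    System.err.println("Unable to delete file for partition " + i);
                }
            }
        }
        return lengths;
    }

    public static void main(String[] args) throws IOException {
        File a = File.createTempFile("part_0_", ".data");
        File b = File.createTempFile("part_1_", ".data");
        File merged = File.createTempFile("merged_", ".data");
        merged.deleteOnExit();
        try (FileOutputStream o = new FileOutputStream(a)) { o.write(new byte[] {1, 2}); }
        try (FileOutputStream o = new FileOutputStream(b)) { o.write(new byte[] {3, 4, 5}); }
        long[] lengths = concatenate(new File[] {a, b}, merged);
        System.out.println(lengths[0] + ", " + lengths[1] + ", total " + merged.length());
    }
}
```

The returned lengths array plays the same role as partitionLengths in the original code: it is all the information needed to build the index later.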
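6 The core shuffle write flow of BypassMergeSortShuffleWriter
- Create a DiskBlockObjectWriter for each partition, keyed by partition ID, and write each record to the writer of its partition.
- Append the per-partition files, in ascending partition-ID order, to the final shuffle data file.
- Finally, build the index over the map task's output through writeIndexFileAndCommit.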
Enough said: this diagram is superbly drawn. If its original author sees this post, please leave me a message.
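The last step of the flow, building the index, boils down to turning partition lengths into cumulative offsets: the index holds one more entry than there are partitions, starting at 0, so reducer r reads the byte range from offsets[r] up to offsets[r + 1]. The sketch below shows just that arithmetic; IndexSketch and its methods are hypothetical names, and the real writeIndexFileAndCommit additionally writes the offsets to an index file and commits the data file.

```java
import java.util.Arrays;

// Hypothetical sketch of the index arithmetic; not Spark's implementation.
final class IndexSketch {
    /** Converts per-partition lengths into numPartitions + 1 cumulative offsets. */
    static long[] offsetsFromLengths(long[] lengths) {
        long[] offsets = new long[lengths.length + 1];
        for (int i = 0; i < lengths.length; i++) {
            offsets[i + 1] = offsets[i] + lengths[i];
        }
        return offsets;
    }

    /** Returns {start, length} of the region that the given reduce partition reads. */
    static long[] segmentFor(long[] offsets, int reduceId) {
        long start = offsets[reduceId];
        return new long[] {start, offsets[reduceId + 1] - start};
    }

    public static void main(String[] args) {
        long[] offsets = offsetsFromLengths(new long[] {4, 0, 8});
        System.out.println(Arrays.toString(offsets));                // [0, 4, 4, 12]
        System.out.println(Arrays.toString(segmentFor(offsets, 2))); // [4, 8]
    }
}
```

An empty partition simply yields two equal consecutive offsets, so no special case is needed on the read side.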