内容简介:对 Spark 中 DAGScheduler 阶段划分的一次探索
首发个人公众号 spark技术分享 , 同步个人网站 coolplayer.net ,未经本人同意,禁止一切转载
几个概念
-
Narrow Dependency指的是 child RDD只依赖于parent RDD(s)固定数量的partition。
-
Wide Dependency指的是child RDD的每一个partition都依赖于parent RDD(s)所有partition。
下文中 窄依赖 就是指 Narrow Dependency, 宽依赖就是指 Wide Dependency, 宽依赖也称为 shuffle Dependency, 下文图中标识 S 打头,例如 S_X。
根据宽依赖和窄依赖, 整个job,会划分为不同的stage, 像是用篱笆隔开了一样, 如果中间有宽依赖,就用刀切一刀, 前后划分为两个 stage。
stage 分为两种, ResultStage 和 ShuffleMapStage, spark job 中产生结果最后一个阶段生成的stage 是ResultStage , 中间阶段生成的stage 是 ShuffleMapStage
属于 ResultStage 的Task都是 ResultTask , 属于 ShuffleMapStage 的Task都是 ShuffleMapTask
依赖链生成过程
spark 中 DAGScheduler 进行阶段划分主要使用以下几个函数, 在调用过程中使用了递归的思想。函数调用关系如下:
creatResultStage getMissingParentStages | | | | v | getOrCreateParentStages <-------------|--------| | 获取所有的shuffle依赖 | | v | | getOrCreateShuffleMapStage<-----------| | | 当前以及所有存在的上游stage都要创建 | | | v | createShuffleMapStage--------------------------|尝试创建上游stage,然后创建自己
我们先看几个函数实现
-
creatResultStage 创建一个 ResultStage, 整个调用的起始点, 会在 finalRDD 上调用 creatResultStage, 加入我们的依赖链条是
A <------------s---------, \ B <--s-- C <--s-- D <--n---`-- E
这样的,就会在 E 这个 RDD 上调用 creatResultStage, 会先创建所有父亲stage, 调用 getOrCreateParentStages, 返回 List[stage]作为父 stage后创建自己 的 stage 作为 ResultStage。
-
getOrCreateParentStages
这个函数调用 getShuffleDependencies 获取到所有的直接 Shuffle Dependecy, 然后遍历去调用getOrCreateShuffleMapStage 创建直接父亲 stage。
举例: getOrCreateParentStages(E), 会获取 E的所有直接 宽依赖 List(S_A, S_C), 然后 对 S_A 和 S_C 调用 getOrCreateShuffleMapStage。
-
getShuffleDependencies
获取所有的直接 getShuffleDependencies , getShuffleDependencies(rdd), 一直追溯rdd的依赖直到依赖类型为ShuffleDenpendency,如果碰到 NarrowDependency 就继续往前追溯,这个方法实现了广度遍历的过程。而且它只返回rdd的直属父shuffle依赖,祖先shuffle依赖不返回。
举个例子
A <------------s---------, \ B <--s-- C <--s-- D <--n---`-- E
如果是 getShuffleDependencies(E), 则 返回的是 List(S_A, S_C), 追溯 rdd_E 的依赖, 碰到 S_A 为 ShuffleDenpendency, 加入集合, 碰到 N_D 为 NarrowDependency, 则继续追溯 D 的依赖, 碰到 S_C 为 ShuffleDenpendency, 加入集合, 最终返回 List(S_A, S_C)。
过程如下:
-
getOrCreateShuffleMapStage
创建好的 stage 都会 把 (shuffleId, stage)映射放入 shuffleIdToMapStage, getOrCreateShuffleMapStage 函数中判断如果映射已经在 shuffleIdToMapStage 中了, 直接取出对应的 stage 返回, 如果没有, 则会调用 getMissingAncestorShuffleDependencies, 获取当前 ShuffleDenpendency 的rdd的所有祖先 ShuffleDenpendency, 遍历调用 createShuffleMapStage 创建 上游的 stage, 然后调用 createShuffleMapStage 创建自己的 stage
举例:
A <------------s---------, \ B <--s-- C <--s-- D <--n---`-- E
getOrCreateShuffleMapStage(S_C), 因为 stage 第一次创建,shuffleIdToMapStage 中不存在映射, 所以需要对 rdd_C 调用 getMissingAncestorShuffleDependencies, 获取所有祖先宽依赖 list(S_B), 然后遍历 list(S_B), 对 S_B 调用 createShuffleMapStage, 返回后, 再对自己 S_C 调用 createShuffleMapStage。
-
getMissingAncestorShuffleDependencies
获取一个 rdd的所有祖先 ShuffleDenpendency, 返回一个 祖先 ShuffleDenpendency 的栈, 注意这里栈数据排放的顺序保证了要保证, 直接父亲宽依赖在栈中总是放在后代宽依赖的上面, 直接父亲宽依赖会被先取出创建 stage, 然后映射放入 shuffleIdToMapStage中, 保证了创建后代stage的时候, 后代 stage 总是可以直接从 shuffleIdToMapStage中拿到直接父亲的stage。
例子:
A <--s---C<---s---E<----s------, \ B <--s-- D <--s-- F <--s---H--n--`--I
getMissingAncestorShuffleDependencies(I) 最终会返回 stack(S_A, S_C, S_B, S_D, S_E, S_F, S_I), 注意栈的顺序, S_A位于栈顶。
过程如下:
-
getMissingParentStages, 获取所有的直接父亲 stage
整个流程
假如我们有一个 spark job 依赖关系如下,
我们对上图抽象一下, 本质如下:
E <-------n------, \ C <--n---D---n-----F--s---, \ A <-------s------ B <--n----`-- G
还记得函数调用关系么, 我这里再贴一下, 方便对照
creatResultStage getMissingParentStages | | | | v | getOrCreateParentStages <-------------|--------| | 获取所有的shuffle依赖 | | v | | getOrCreateShuffleMapStage<-----------| | | 当前以及所有存在的上游stage都要创建 | | | v | createShuffleMapStage--------------------------|尝试创建上游stage,然后创建自己
我们对着调用关系串起来整个流程
最终结果
-
对 G 调用 creatResultStage,通过getOrCreateParentStages 获取所有的 parents:List[stage], 作为所有直接父亲stage,创建本身的 ResultStage。 getOrCreateParentStages会先创建上游 stage1 stage2, 然后创建自己的 stage3
-
getOrCreateParentStages 会调用 getShuffleDependencies 获得 rdd_G 所有直接宽依赖 HashSet(S_F, S_A), 然后遍历集合,对 S_F 和 S_A 调用 getOrCreateShuffleMapStage,
-
对 S_A 调用 getOrCreateShuffleMapStage, shuffleIdToMapStage 中获取判断为None, 对 rdd_A 调用 getMissingAncestorShuffleDependencies, 返回为空, 对 S_A 调用 createShuffleMapStage, 由于rdd_A 没有parent stage 直接就创建 stage1 返回。
-
对 S_F 调用 getOrCreateShuffleMapStage, shuffleIdToMapStage 中获取判断为None, 对 rdd_F 调用 getMissingAncestorShuffleDependencies, 返回为空, 对 S_F 调用 createShuffleMapStage, 由于rdd_F 没有parent stage 直接就创建 stage2 返回。
-
把 List(stage1,stage2) 作为 stage3 的 parents stages 创建 stage3
stage | parents | |
---|---|---|
S_A | stage1 | List() |
S_F | stage2 | List() |
- | ResultStage3 | List(stage1, stage2) |
ios 用户赞赏通道:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 如何划分限界上下文
- 如何给 Hadoop 集群划分角色
- 整数划分--思考问题背后的数学原理
- 【LeetCode】贪心算法--划分字母区间(763)
- JVM笔记-运行时内存区域划分
- Python列表推导式一则:等价类划分
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。