对 Spark 中 DAGScheduler 阶段划分的一次探索

栏目: 服务器 · 发布时间: 6年前

内容简介:对 Spark 中 DAGScheduler 阶段划分的一次探索

首发个人公众号  spark技术分享 ,  同步个人网站  coolplayer.net ,未经本人同意,禁止一切转载

几个概念

对 Spark 中 DAGScheduler 阶段划分的一次探索

  • Narrow Dependency指的是 child RDD只依赖于parent RDD(s)固定数量的partition。

  • Wide Dependency指的是child RDD的每一个partition都依赖于parent RDD(s)所有partition。

下文中 窄依赖 就是指 Narrow Dependency, 宽依赖就是指 Wide Dependency,  宽依赖也称为 shuffle Dependency, 下文图中标识 S 打头,例如 S_X。

根据宽依赖和窄依赖, 整个job,会划分为不同的stage, 像是用篱笆隔开了一样, 如果中间有宽依赖,就用刀切一刀, 前后划分为两个 stage。

stage 分为两种, ResultStage 和 ShuffleMapStage, spark job 中产生结果最后一个阶段生成的stage 是ResultStage , 中间阶段生成的stage 是 ShuffleMapStage

属于 ResultStage 的Task都是 ResultTask , 属于 ShuffleMapStage 的Task都是 ShuffleMapTask

依赖链生成过程

spark 中 DAGScheduler 进行阶段划分主要使用以下几个函数,  在调用过程中使用了递归的思想。函数调用关系如下:

creatResultStage               getMissingParentStages
           |                          |
           |                          |
           v                          | 
getOrCreateParentStages <-------------|--------|
          |  获取所有的shuffle依赖      |        |
          v                           |        |
getOrCreateShuffleMapStage<-----------|        |
         |  当前以及所有存在的上游stage都要创建     |                                         
         |                                     |
         v                                     | 
createShuffleMapStage--------------------------|尝试创建上游stage,然后创建自己

我们先看几个函数实现

  • creatResultStage  创建一个 ResultStage,  整个调用的起始点, 会在 finalRDD 上调用 creatResultStage,  加入我们的依赖链条是

A <------------s---------,
                          \
B <--s-- C <--s-- D <--n---`-- E

这样的,就会在 E 这个 RDD 上调用 creatResultStage, 会先创建所有父亲stage, 调用 getOrCreateParentStages, 返回 List[stage]作为父 stage后创建自己 的 stage 作为 ResultStage。

  • getOrCreateParentStages

这个函数调用 getShuffleDependencies 获取到所有的直接 Shuffle Dependecy,  然后遍历去调用getOrCreateShuffleMapStage 创建直接父亲 stage。

举例: getOrCreateParentStages(E), 会获取 E的所有直接 宽依赖 List(S_A, S_C), 然后 对 S_A 和 S_C 调用 getOrCreateShuffleMapStage。

  • getShuffleDependencies

获取所有的直接 getShuffleDependencies ,  getShuffleDependencies(rdd), 一直追溯rdd的依赖直到依赖类型为ShuffleDenpendency,如果碰到 NarrowDependency 就继续往前追溯,这个方法实现了广度遍历的过程。而且它只返回rdd的直属父shuffle依赖,祖先shuffle依赖不返回。

举个例子

A <------------s---------,
                          \
B <--s-- C <--s-- D <--n---`-- E

如果是  getShuffleDependencies(E), 则 返回的是 List(S_A, S_C),  追溯 rdd_E 的依赖, 碰到 S_A 为 ShuffleDenpendency, 加入集合, 碰到 N_D 为 NarrowDependency, 则继续追溯 D 的依赖, 碰到 S_C 为 ShuffleDenpendency, 加入集合, 最终返回 List(S_A, S_C)。

过程如下:

对 Spark 中 DAGScheduler 阶段划分的一次探索

  • getOrCreateShuffleMapStage

创建好的 stage 都会 把 (shuffleId, stage)映射放入 shuffleIdToMapStage, getOrCreateShuffleMapStage 函数中判断如果映射已经在  shuffleIdToMapStage 中了, 直接取出对应的 stage 返回, 如果没有, 则会调用  getMissingAncestorShuffleDependencies, 获取当前 ShuffleDenpendency 的rdd的所有祖先 ShuffleDenpendency, 遍历调用 createShuffleMapStage 创建 上游的 stage,  然后调用 createShuffleMapStage 创建自己的 stage

举例:

A <------------s---------,
                          \
B <--s-- C <--s-- D <--n---`-- E

getOrCreateShuffleMapStage(S_C),  因为 stage 第一次创建,shuffleIdToMapStage 中不存在映射, 所以需要对 rdd_C 调用 getMissingAncestorShuffleDependencies, 获取所有祖先宽依赖 list(S_B),  然后遍历 list(S_B), 对 S_B 调用 createShuffleMapStage, 返回后, 再对自己 S_C 调用 createShuffleMapStage。

  • getMissingAncestorShuffleDependencies

获取一个 rdd的所有祖先 ShuffleDenpendency,  返回一个 祖先 ShuffleDenpendency 的栈, 注意这里栈数据排放的顺序保证了要保证,  直接父亲宽依赖在栈中总是放在后代宽依赖的上面, 直接父亲宽依赖会被先取出创建 stage, 然后映射放入 shuffleIdToMapStage中, 保证了创建后代stage的时候, 后代 stage 总是可以直接从 shuffleIdToMapStage中拿到直接父亲的stage。

例子:

   A <--s---C<---s---E<----s------,
                                    \ 
    B <--s-- D <--s-- F <--s---H--n--`--I

getMissingAncestorShuffleDependencies(I) 最终会返回  stack(S_A, S_C, S_B, S_D, S_E, S_F, S_I), 注意栈的顺序, S_A位于栈顶。

对 Spark 中 DAGScheduler 阶段划分的一次探索

过程如下:

  • getMissingParentStages,  获取所有的直接父亲 stage

整个流程

假如我们有一个 spark job 依赖关系如下,

对 Spark 中 DAGScheduler 阶段划分的一次探索

我们对上图抽象一下, 本质如下:

E <-------n------,
                  \
C <--n---D---n-----F--s---,
                           \
A <-------s------ B <--n----`-- G

还记得函数调用关系么, 我这里再贴一下, 方便对照

creatResultStage               getMissingParentStages
           |                          |
           |                          |
           v                          | 
getOrCreateParentStages <-------------|--------|
          |  获取所有的shuffle依赖      |        |
          v                           |        |
getOrCreateShuffleMapStage<-----------|        |
         |  当前以及所有存在的上游stage都要创建     |                                         
         |                                     |
         v                                     | 
createShuffleMapStage--------------------------|尝试创建上游stage,然后创建自己

我们对着调用关系串起来整个流程

最终结果

  • 对 G 调用 creatResultStage,通过getOrCreateParentStages 获取所有的 parents:List[stage],  作为所有直接父亲stage,创建本身的 ResultStage。 getOrCreateParentStages会先创建上游 stage1 stage2,  然后创建自己的 stage3

  • getOrCreateParentStages 会调用 getShuffleDependencies 获得 rdd_G 所有直接宽依赖 HashSet(S_F, S_A), 然后遍历集合,对  S_F 和 S_A 调用 getOrCreateShuffleMapStage,

  • 对 S_A 调用 getOrCreateShuffleMapStage, shuffleIdToMapStage 中获取判断为None, 对 rdd_A 调用 getMissingAncestorShuffleDependencies, 返回为空, 对 S_A 调用 createShuffleMapStage,  由于rdd_A 没有parent stage 直接就创建 stage1 返回。

  • 对 S_F 调用 getOrCreateShuffleMapStage, shuffleIdToMapStage 中获取判断为None, 对 rdd_F 调用 getMissingAncestorShuffleDependencies, 返回为空, 对 S_F 调用 createShuffleMapStage,  由于rdd_F 没有parent stage 直接就创建 stage2 返回。

  • 把 List(stage1,stage2) 作为 stage3 的 parents stages 创建 stage3

stage parents
S_A stage1 List()
S_F stage2 List()
- ResultStage3 List(stage1, stage2)

ios 用户赞赏通道:

对 Spark 中 DAGScheduler 阶段划分的一次探索


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

实用程序育儿法

实用程序育儿法

(美)特蕾西·霍格、(美)梅林达·布劳 / 张雪兰 / 北京联合出版社 / 2009-1 / 39.00元

《实用程序育儿法》作者世界闻名的实战型育儿专家特蕾西·霍格(Tracy Hogg)以“宝宝耳语专家(Baby Whisperer)”享誉全球,她深入到数千名宝宝的家里解决宝宝和妈妈面临的日常难题,通过演讲、电台、电视台、信件、电子邮件以及住她的网站上发帖跟她交流、向她请教的妈妈们更是不计其数。由她亲自实景示范拍摄的“和宝宝说悄悄话(Thc Baby Whisperer)”DVD全球发行上千万张。她......一起来看看 《实用程序育儿法》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

随机密码生成器
随机密码生成器

多种字符组合密码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具