内容简介:这一章我们探索了 Spark 作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know!我们先回顾一下这个图,Driver Program 是我们写的那个程序,它的核心是 SparkContext,回想一下,从 api 的使用角度,RDD 都必须通过它来获得。下面讲一讲它所不为认知的一面,它和其它组件是如何交互的。
这一章我们探索了 Spark 作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know!
我们先回顾一下这个图,Driver Program 是我们写的那个程序,它的核心是 SparkContext,回想一下,从 api 的使用角度,RDD 都必须通过它来获得。
下面讲一讲它所不为认知的一面,它和其它组件是如何交互的。
Driver 向 Master 注册 Application 过程
SparkContext 实例化之后,在内部实例化两个很重要的类,DAGScheduler 和 TaskScheduler。
在 standalone 的模式下,TaskScheduler 的实现类是 TaskSchedulerImpl,在初始化它的时候 SparkContext 会传入一个 SparkDeploySchedulerBackend。
在 SparkDeploySchedulerBackend 的 start 方法里面启动了一个 AppClient。
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts) val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() 复制代码
maxCores 是由参数 spark.cores.max 来指定的,executorMemoy 是由 spark.executor.memory 指定的。
AppClient 启动之后就会去向 Master 注册 Applicatoin 了,后面的过程我用图来表达。
上面的图中涉及到了三方通信,具体的过程如下:
1、Driver 通过 AppClient 向 Master 发送了 RegisterApplication 消息来注册 Application,Master 收到消息之后会发送 RegisteredApplication 通知 Driver 注册成功,Driver 的接收类还是 AppClient。
2、Master 接受到 RegisterApplication 之后会触发调度过程,在资源足够的情况下会向 Woker 和 Driver 分别发送 LaunchExecutor、ExecutorAdded 消息。
3、Worker 接收到 LaunchExecutor 消息之后,会执行消息中携带的命令,执行 CoarseGrainedExecutorBackend 类 (图中仅以它继承的接口 ExecutorBackend 代替),执行完毕之后会发送 ExecutorStateChanged 消息给 Master。
4、Master 接收 ExecutorStateChanged 之后,立即发送 ExecutorUpdated 消息通知 Driver。
5、Driver 中的 AppClient 接收到 Master 发过来的 ExecutorAdded 和 ExecutorUpdated 后进行相应的处理。
6、启动之后的 CoarseGrainedExecutorBackend 会向 Driver 发送 RegisterExecutor 消息。
7、Driver 中的 SparkDeploySchedulerBackend(具体代码在 CoarseGrainedSchedulerBackend 里面)接收到 RegisterExecutor 消息,回复注册成功的消息 RegisteredExecutor 给 ExecutorBackend,并且立马准备给它发送任务。
8、CoarseGrainedExecutorBackend 接收到 RegisteredExecutor 消息之后,实例化一个 Executor 等待任务的到来。
资源的调度
好,在我们讲完了整个注册 Application 的通信过程之后,其中一个比较重要的地方是它的调度这块,它是怎么调度的?这也是我在前面为什么那么强调 maxCores 和 executorMemoy 的原因。
细心的读者如果看了第一章 《spark-submit 提交作业过程》 的就知道,其实我已经讲过调度了,因为当时不知道这个 app 是啥。但是现在我们知道 app 是啥了。代码我不就贴了,总结一下吧。
1、先调度 Driver,再调度 Application。
2、它的调度 Application 的方式是先进先出,所以就不要奇怪为什么你的 App 总得不到调度了,就像去北京的医院看病,去晚了号就没了,是一个道理。
3、Executor 的分配方式有两种,一种是倾向于把任务分散在多个节点上,一种是在尽量少的节点上运行,由参数spark.deploy.spreadOut 参数来决定的,默认是 true,把任务分散到多个节点上。
遍历所有等待的 Application,给它分配 Executor 运行的 Worker,默认分配方式如下:
1、先从 workers 当中选出内存大于 executorMemoy 的 worker,并且按照空闲 cpu 数从大到小的顺序来排序。
2、遍历 worker,从可用的 worker 分配需要的 cpu 数,每个 worker 提供一个 cpu 核心,直到 cpu 数不足或者 maxCores 分配完毕。
3、给选出来的 worker 发送任务,让它们启动 Executor,每个 Executor 占用的内存是我们设定的 executorMemoy。
资源调度的过程大体是这样的,说到这里有些童鞋在有点儿疑惑了,那我们任务调度里面 FIFO/FAIR 调度是在哪里用的?任务调度器调度的不是 Application,而是你的代码里面被解析出来的所有 Task,这在上一章当中有提到。
基于这个原因,在共用 SparkContext 的情况下,比如 Shark、JobServer 什么的,任务调度器的作用才会明显。
Driver 向 Executor 发布 Task 过程
下面我们讲一讲 Driver 向 Executor 发布 Task 过程,这在上一章是讲过的,现在把图给大家放出来了。
1、Driver 程序的代码运行到 action 操作,触发了 SparkContext 的 runJob 方法。
2、SparkContext 比较懒,转手就交给 DAGScheduler。
3、DAGScheduler 把 Job 划分 stage,然后把 stage 转化为相应的 Tasks,把 Tasks 交给 TaskScheduler。
4、通过 TaskScheduler 把 Tasks 添加到任务队列当中,转手就交给 SchedulerBackend 了。
5、调度器给 Task 分配执行 Executor,ExecutorBackend 负责执行 Task 了。
补充:ExecutorBackend 执行 Task,是通过它内部的 Executor 来执行的,Executor 内部有个线程池,new 了一个 TaskRunner 交给线程池了。
Task 状态更新
Task 执行是通过 TaskRunner 来运行,它需要通过 ExecutorBackend 和 Driver 通信,通信消息是 StatusUpdate:
1、Task 运行之前,告诉 Driver 当前 Task 的状态为 TaskState.RUNNING。
2、Task 运行之后,告诉 Driver 当前 Task 的状态为 TaskState.FINISHED,并返回计算结果。
3、如果 Task 运行过程中发生错误,告诉 Driver 当前 Task 的状态为 TaskState.FAILED,并返回错误原因。
4、如果 Task 在中途被 Kill 掉了,告诉 Driver 当前 Task 的状态为 TaskState.FAILED。
下面讲的是运行成功的状态,具体过程以文字为主。
1、Task 运行结束之后,调用 ExecutorBackend 的 statusUpdate 方法,把结果返回。结果超过 10M,就把结果保存在 blockManager 处,返回 blockId,需要的时候通过 blockId 到 blockManager 认领。
2、ExecutorBackend 直接向 Driver 发送 StatusUpdate 返回 Task 的信息。
3、Driver(这里具体指的是 SchedulerBackend)接收到 StatusUpdate 消息之后,调用 TaskScheduler 的 statusUpdate 方法,然后准备给 ExecutorBackend 发送下一批 Task。
4、TaskScheduler 通过 TaskId 找到管理这个 Task 的 TaskSetManager(负责管理一批 Task 的类),从 TaskSetManager 里面删掉这个 Task,并把 Task 插入到 TaskResultGetter(负责获取 Task 结果的类)的成功队列里。
5、TaskResultGetter 获取到结果之后,调用 TaskScheduler 的 handleSuccessfulTask 方法把结果返回。
6、TaskScheduler 调用 TaskSetManager 的 handleSuccessfulTask 方法,处理成功的 Task。
7、TaskSetManager 调用 DAGScheduler 的 taskEnded 方法,告诉 DAGScheduler 这个 Task 运行结束了,如果这个时候 Task 全部成功了,就会结束 TaskSetManager。
8、DAGScheduler 在 taskEnded 方法里触发 CompletionEvent 事件,CompletionEvent 分 ResultTask 和 ShuffleMapTask 来处理。
1)ResultTask:job 的 numFinished 加 1,如果 numFinished 等于它的分片数,则表示任务该 Stage 结束,标记这个 Stage 为结束,最后调用 JobListener(具体实现在 JobWaiter)的 taskSucceeded 方法,把结果交给 resultHandler(经过包装的自己写的那个匿名函数)处理,如果完成的 Task 数量等于总任务数,任务退出。
2)ShuffleMapTask:
(1)调用 Stage 的 addOutputLoc 方法,把结果添加到 Stage 的 outputLocs 列表里。
(2)如果该 Stage 没有等待的 Task 了,就标记该 Stage 为结束。
(3)把 Stage 的 outputLocs 注册到 MapOutputTracker 里面,留个下一个 Stage 用。
(4)如果 Stage 的 outputLocs 为空,表示它的计算失败,重新提交 Stage。
(5)找出下一个在等待并且没有父亲的 Stage 提交。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 图解源码:MyBatis 的 Mapper 原理
- 图解源码 | MyBatis的Mapper原理
- 图解源码 | 接管SpringMVC的自动配置
- 图解源码 | SpringBoot中拓展SpringMVC原理
- 图解kubernetes服务打散算法的实现源码
- 图解源码:Spring Boot 中自动配置原理
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。