Spark的运行原理

栏目: 编程工具 · 发布时间: 6年前

内容简介:博客地址:spark的运行原理在大数据开发岗面试过程中是经常被问到的一个问题,我第一次被问到这个问题的时候有点摸不着头脑,这么大的一个问题我究竟应该怎样回答呢?是去描述一下spark的架构组成还是说一下底层的调用细节?后来查找了一些资料,看了一些书之后对这个问题有了一些理解,其实提这个问题的人可能最希望我们回答的是Spark运行的过程细节,简单来说就是把某个Spark程序从提交到执行完成中间经历了哪些步骤描述出来。如果在描述的过程中能够加入一些对Spark底层源码细节的解释会给提问者留下比较好的印象,认为

博客地址: joey771.cn/2018/10/25/…

spark的运行原理在大数据开发岗面试过程中是经常被问到的一个问题,我第一次被问到这个问题的时候有点摸不着头脑,这么大的一个问题我究竟应该怎样回答呢?是去描述一下spark的架构组成还是说一下底层的调用细节?后来查找了一些资料,看了一些书之后对这个问题有了一些理解,其实提这个问题的人可能最希望我们回答的是Spark运行的过程细节,简单来说就是把某个Spark程序从提交到执行完成中间经历了哪些步骤描述出来。如果在描述的过程中能够加入一些对Spark底层源码细节的解释会给提问者留下比较好的印象,认为你不仅仅是停留在使用Spark上,还对底层源码的原理有所了解。

简单描述Spark的运行原理

用户使用spark-submit提交一个作业之后,会首先启动一个driver进程,driver进程会向集群管理器(standalone、YARN、Mesos)申请本次运行所需要的资源(这里的资源包括core和memory,可以在spark-submit的参数中进行设置),集群管理器会根据我们需要的参数在各个节点上启动executor。申请到对应资源之后,driver进程就会开始调度和执行我们编写的作业代码。作业会被提交给DAGScheduler,DAGScheduler会根据作业中RDD的依赖关系将作业拆分成多个stage,拆分的原则就是根据是否出现了宽依赖,每个stage当中都会尽可能多的包含连续的窄依赖。每个stage都包含了作业的一部分,会生成一个TaskSet提交给底层调度器TaskScheduler,TaskScheduler会把TaskSet提交到集群当中由executor进行执行。Task的划分是根据数据的partition进行划分,一个partition会划分为一个task。如此循环往复,直至执行完编写的driver程序的所有代码逻辑,并且计算完所有的数据。

简单的运行流程如下图:

Spark的运行原理

图一 spark运行流程

SparkContext

Spark程序的整个运行过程都是围绕spark driver程序展开的,spark driver程序当中最重要的一个部分就是SparkContext,SparkContext的初始化是为了准备Spark应用程序的运行环境,SparkContext主要是负责与集群进行通信、向集群管理器申请资源、任务的分配和监控等。

driver与worker之间的架构如下图,driver负责向worker分发任务,worker将处理好的结果返回给driver。

Spark的运行原理

图二 driver架构

SparkContext的核心作用是初始化Spark应用程序运行所需要的核心组件,包括高层调度器DAGScheduler、底层调度器TaskScheduler和调度器的通信终端SchedulerBackend,同时还会负责Spark程序向Master注册程序等。Spark应用当中的RDD是由SparkContext进行创建的,例如通过SparkContext.textFile()、SparkContext.parallel()等这些API。运行流程当中提及的向集群管理器Cluster Manager申请计算资源也是由SparkContext产生的对象来申请的。接下来我们从源码的角度学习一下SparkContext,关于SparkContext创建的各种组件,在SparkContext类中有这样一段代码来创建这些组件:

Spark的运行原理

DAGScheduler

DAGScheduler是一个高层调度器,能够将DAG的各个RDD划分到不同的Stage,并构建这些Stage之间的父子关系,最后将每个Stage根据partition划分为多个Task,并以TaskSet的形式提交给底层调度器TaskScheduler。Stage的划分按照的是RDD依赖关系中是否出现了宽依赖,宽依赖指的是父RDD中的一个partition被子RDD的多个partition所依赖,简单来说就是父RDD的partition的出度大于1,同理,窄依赖指的就是父RDD的一个partition只被子RDD的一个partition所依赖,也就是父RDD的partition的出度都是1。每个Stage当中都会尽可能包含多的窄依赖,将各个窄依赖的算子形成一整个pipeline进行运行,可以减少各个算子之间RDD的读写,不像MapReduce当中每个job只包含一个Map任务和一个Reduce任务,下一个Map任务都需要等待上一个Reduce任务全部都结束才能执行,pipeline形式的执行过程中没有产生shuffle,放在一起执行明显效率更高。Stage与Stage之间会出现shuffle,这里shuffle也是一个常常考察的点,另外的文章会详细说明。DAGScheduler还需要记录哪些RDD被存入磁盘等物化动作,同时要寻求Task的最优化调度,如在Stage内部数据的本地性等。DAGScheduler还需要监控因为shuffle跨节点输出可能导致的失败,如果发现这个Stage失败,可能就要重新提交该Stage。

DAGScheduler具体调用过程

当一个job被提交时,DAGScheduler便会开始其工作,spark中job的提交是由RDD的action触发的,当发生action时,RDD中的action方法会调用其SparkContext的runJob方法,经过多次重载之后会调用到DAGScheduler的runJob方法。 DAGScheduler类当中runJob是提交job的入口函数,其中会调用submitJob方法返回一个JobWaiter来等待作业调度的结果,之后根据作业的成功或者失败打印相关的结果日志信息。

Spark的运行原理 Spark的运行原理

submitJob方法会获取jobId以及校验partitions是否存在,并向eventProcessLoop发送了一个case class JobSubmitted对象,JobSubmitted对象封装了jobId、最后一个RDD,对RDD操作的函数,哪些partition需要被计算等内容。eventProcessLoop当中有一个eventThread线程,是一个deamon线程,用于接收通过post方法发送到该线程的JobSubmitted对象,放入其中的一个eventQueue阻塞队列进行处理,从eventQueue中take出来的event会调用onReceive方法(该方法由eventProcessLoop实现),onReceive方法中又会调用doOnReceive方法,按照不同的event类型进行不同的处理。 这里读源码的时候可能会有一个疑问,为何不直接在DAGScheduler调用submitJob的时候直接调用doOnReceive来处理job,为何要新启一个线程来进行处理,并且自己给自己发消息进行处理(eventProcessLoop是DAGScheduler内部的一个对象)。这里实际上是一个线程的异步通信方式,只是将消息以线程通信的方式post(这里的线程通信方式实际上是用了一个阻塞队列)给另一个线程,submitJob的方法能够立刻返回,不会阻塞在处理event的过程当中。这里我们不要浅显的认为DAGScheduler当中自己在给自己发消息,实际上还有别的组件会给DAGScheduler发消息,这种采用一个守护线程的方式进行消息处理可以将这两者统一起来,两者处理的逻辑都是一致的,扩展性非常好,使用消息循环器,就能统一处理所有的消息,保证处理的业务逻辑都是一致的。这里的eventProcessLoop实际上能够处理多种消息,不仅仅是JobSubmitted,源码当中能看到有如下多种event的处理:

  1. JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  2. MapStageSubmitted(jobId, dependency, callSite, listener, properties)
  3. StageCancelled(stageId, reason)
  4. JobCancelled(jobId, reason)
  5. JobGroupCancelled(groupId)
  6. AllJobsCancelled
  7. ExecutorAdded(execId, host)
  8. ExecutorLost(execId, reason)
  9. WorkerRemoved(workerId, host, message)
  10. BeginEvent(task, taskInfo)
  11. SpeculativeTaskSubmitted(task)
  12. GettingResultEvent(taskInfo)
  13. completion
  14. TaskSetFailed(taskSet, reason, exception)
  15. ResubmitFailedStages

JobSubmitted会去调用DAGScheduler中的handleJobSubmitted方法,该方法是构建Stage的开始阶段,会创建Stage中的最后一个Stage——ResultStage,而其他Stage为ShuffleMapStage。ResultStage的创建是由createResultStage这个函数完成的,其中的getOrCreateParentStage方法将会获取或创建一个给定RDD的父Stages列表,这个方法就是我们之前所说的具体划分Stage的方法,这个方法的源码很简单,如下:

Spark的运行原理

其中调用了一个函数getShuffleDependencies用来返回给定RDD的父节点中直接的shuffle依赖,其源码如下:

Spark的运行原理

这里有三个主要的数据结构,为两个HashSet——parents和visited,还有一个Stack——waitingForVisit,代码中首先将传入的RDD加入用于进行栈式访问的waitingForVisit中,这里使用栈我们也可以看出这是一个深度优先搜索的策略,visited用于记录访问过的节点保证不会重复访问,接下来对访问的RDD的依赖进行区分,如果是shuffleDep(即宽依赖),就将依赖加入parents,如果是dependency(窄依赖),则将依赖的RDD加入waitingForVisit进行深度优先搜索遍历,这里最终将返回parents,产生的结果就是parents当中记录的都是shuffleDep,即两个Stage之间的依赖。之后根据得到的shuffle dependency来调用getOrCreateShuffleMapStage产生ShuffleMapStage,产生的ShuffleMapStage会存储在shuffleIdToMapStage这个HashMap当中,如果在该数据结构中已经存在创建过的ShuffleMapStage就直接返回,不存在则调用createShuffleMapStage进行创建,创建的时候会调用getMissingAncestorShuffleDependencies去搜索祖先shuffle dependency,先将依赖的Stage进行创建。 Stage创建完毕之后,handleJobSubmitted将会调用submitStage来提交finalStage,submitStage将会递归优先提交父Stage,父Stage是通过getMissingParentStages来获取的,并按照Stage的id进行排序,优先提交id小的Stage。

具体例子说明

如下图所示是5个RDD的转换图,假设RDD E最后出发了一个action(比如collect),接下来按照图中的关系仔细讲解一下DAGScheduler对Stage的生成过程。

Spark的运行原理
  • RDD.collect方法会出发SparkContext.runJob方法,之后调用到DAGScheduler.runJob方法,继而调用submitJob方法将这个事件封装成JobSubmitted事件进行处理,调用到handleJobSubmitted,在这个方法中会调用createResultStage。
  • createResultStage会基于jobId创建ResultStage(ResultStage中的rdd即是出发action的那个RDD,即finalRDD)。调用getOrCreateResultStages创建所有父Stage,返回parents: List[Stage]作为父Stage,将parents传入ResultStage,实例化生成ResultStage。在示意图中即是RDD E调用createResultStage,通过getOrCreateResultStages获取Stage1、Stage2,然后创建自己的Stage3。
  • getOrCreateParentStages方法中的getShuffleDependencies会获取RDD E的所有直接款依赖集合RDD B和RDD D,然后对这两个RDD分别调用getOrCreateShuffleMapStage,由于这两个RDD都没有父Stage,则getMissingAncestorShuffleDependencies会返回为空,会创建这两个ShuffleMapStage,最后再将这两个Stage作为Stage3的父Stage,创建Stage3。
  • 之后会调用handleJobSubmitted中的submitStage来提交Stage,提交的时候采用从后往前回溯的方式,优先提交前面的Stage,并且按照Stage的id优先提交Stage的id小的,后面的Stage依赖于前面的Stage,只有前面的Stage计算完毕才会去计算后面的Stage。

SchedulerBackend和TaskScheduler

之前讲到的TaskScheduler和SchedulerBackend都只是一个trait,TaskScheduler的具体实现类是TaskSchedulerImpl,而SchedulerBackend的子类包括有:

  1. LocalSchedulerBackend
  2. StandaloneSchedulerBackend
  3. CoarseGrainedSchedulerBackend
  4. MesosCoarseGrainedSchedulerBackend
  5. YarnSchedulerBackend

不同的SchedulerBackend对应不同的Spark运行模式。传给createTaskScheduler不同的master参数就会输出不同的SchedulerBackend,在这里spark实际上是根据master传入的字符串进行正则匹配来生成不同的SchedulerBackend。这里采用了 设计模式 当中的策略模式,根据不同的需要来创建不同的SchedulerBackend的子类,如果使用的是本地模式,就会创建LocalSchedulerBackend,而standalone集群模式则会创建StandaloneSchedulerBackend。StandaloneSchedulerBackend中有一个重要的方法start,首先会调用其父类的start方法,之后定义了一个Command对象command,其中有个对象成员mainClass为org.apache.spark.executor.CoarseGrainedExecutorBackend,这个类非常重要,我们在运行spark应用时会在worker节点上看到名称为CoarseGrainedExecutorBackend的JVM进程,这里的进程就可以理解为executor进程,master发指令给worker去启动executor所有的进程时加载的Main方法所在的入口类就是这个CoarseGrainedExecutorBackend,在CoarseGrainedExecutorBackend中启动executor,executor通过构建线程池来并发地执行task,然后再调用它的run方法。在start方法中还会创建一个十分重要的对象StandaloneAppClient,会调用它的start方法,在该方法中会创建一个ClientEndpoint对象,这是一个RpcEndPoint,会向Master注册。

SchedulerBackend实际上是由TaskScheduler来进行管理的,createTaskScheduler方法中都会调用TaskScheduler的initialize方法将SchedulerBackend作为参数输入,绑定两者的关系。

Spark的运行原理

initialize方法当中还会创建一个Pool来初始定义资源的分布模式Scheduling Mode,默认是先进先出(FIFO)模式,还有一种支持的模式是公平(FAIR)模式。FIFO模式指的是任务谁先提交谁就先执行,后面的任务需要等待前面的任务执行,FAIR模式支持在调度池中为任务进行分组,不同的调度池权重不同,任务可以按照权重来决定执行的先后顺序。

TaskScheduler的核心任务是提交TaskSet到集群运算运算并汇报结果。我们知道,之前所讲的DAGScheduler会将任务划分成一系列的Stage,而每个Stage当中会封装一个TaskSet,这些TaskSet会按照先后顺序提交给底层调度器TaskScheduler进行执行。TaskScheduler接收的TaskSet是DAGScheduler中的submitMissingTasks方法传递过来的,具体调用的函数为TaskScheduler.submitTasks,TaskScheduler会为每一个TaskSet初始化一个TaskSetManager对其生命周期进行管理,当TaskScheduler得到Worker节点上的Executor计算资源的时候,TaskSetManager便会发送具体的Task到Executor上进行执行。如果Task执行的过程中出现失败的情况,TaskSetManager也会负责进行处理,会通知DAGScheduler结束当前的Task,并将失败的Task再次添加到待执行队列当中进行后续的再次计算,重试次数默认为4次,处理该逻辑的方法为TaskSetManager.handleFailedTask。Task执行完毕,TaskSetManager会将结果反馈给DAGScheduler进行后续处理。

TaskScheduler的具体实现类为TaskSchedulerImpl,其中有一个方法为submitTasks非常重要,该方法源码如下所示:

Spark的运行原理

该方法中会创建TaskSetManager,并通过一个HashMap将stage的id和TaskSetManager进行对应管理。之后会调用SchedulableBuilder的addTaskSetManager方法将创建的TaskSetManager加入其中,SchedulableBuilder会确定TaskSetManager的调度顺序并确定每个Task具体运行在哪个ExecutorBackend中。submitTasks方法的最后会调用backend.receiveOffer,该backend具体类型一般为CoarseGrainedSchedulerBackend,是SchedulerBackend的一个子类,其reviveOffers方法中调用的是driverEndPoint.send方法,这个方法会给DriverEndPoint发送ReceiveOffers消息,会触发底层资源调度。 driverEndPoint的receive方法匹配到ReceiveOffers消息,就调用makeOffers方法,该方法如下所示:

Spark的运行原理

该方法会获取活动的Executor,根据activeExecutor生成所有可用于计算的workOffers,workOffers在创建时会传入Executor的id,host和可用的core等信息,可用的内存信息在其他地方已经获取。makeOffers方法中还调用了scheduler的resourceOffers方法,这个方法便是给用于计算的workOffers提供资源,均匀的将任务分发给每个workOffer(Executor)进行计算。在这里我曾经有一个疑问,就是任务是不是按顺序发给每个Executor进行计算的,即假设有100个Task,5个Executor,分发任务的时候是不是总是按照0号Executor、1号Executor、2号Executor……这样的顺序进行分发的,也就是0号Executor总是拿到id为TaskId % 5的任务,1号Executor总是拿到id为TaskId % 5 + 1的任务。但阅读源码发现,其中有一个环节是进行shuffle操作,调用的是Random.shuffle(offers),即把workOffers(Executors)在Seq中的顺序进行洗牌,避免总是把任务放在同一组worker节点,这一点我们在后续的resourceOfferSingleTaskSet方法中可以很清楚的看到任务具体分发的过程其实就是按照workerOffers在Seq中的顺序进行的,在源码中就是对workerOffers的一个简单的for遍历进行读取可用的core资源并将可用的资源分发给TaskSetManager用于对应的TaskSet的计算:

Spark的运行原理

源码中按照shuffledOffers的索引进行顺序遍历,因为之前已经进行过shuffle操作,workerOffer的顺序每次都是打乱的,所以在这里分配任务时不会总是按照一定的顺序给workerOffer分配对应id号的Task,而是会以随机乱序的方式给workerOffer分配Task,但是任务的分配还会考虑任务的本地性,在分配任务时会将对应的Executor资源输入给TaskSetManager的resourceOffer方法,该方法会返回需要计算的Task的TaskDescription,这里很重要的一个依据就是尽量给Executor分配计算本地性高的任务。数据的本地优先级从高到低依次为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY,因此对于一些任务,其数据一直处于某个节点上,因而该任务也会一直分配给该节点上的Executor进行计算,之前对workerOffer进行打乱分配的效果可能看起来就不是特别明显了,会发现一些任务一直处于某些节点上进行计算。DAGScheduler也会有本地性的考虑,但是DAGScheduler是从数据的层面进行考虑的,从RDD的层面确定就可以,而TaskScheduler是从具体计算的角度考虑计算的本地性,是更具体的底层调度,满足数据本地性和计算本地性。

在resourceOfferSingleTaskSet方法中我们看到有一个变量CPUS_PER_TASK,之前我一直理解的是一个Task是由一个cpu core进行执行的,但是这个变量实际上来源于配置参数spark.task.cpus,当我们将这个参数设置为2时一个Task会被分配到2个core,从Stack Overflow当中了解到这个参数的设置实际上是为了满足一些特殊的Task的需求,有些Task内部可能会出现多线程的情况,或者启动额外的线程进行其他交互操作,这样设置能够确保对core资源的总需求在按照设置的情况运行时不会超过一定的设置值(但这里并没有强制要求,如果Task启动的线程数大于设置的spark.task.cpus也不会有问题,但可能会因为超过一定的值造成资源抢占,影响效率)。

进行资源分配的taskSet实际上是有一定的顺序的,在TaskSchedulerImpl.resourceOffers方法中调用了rootPool.getSortedTaskSetQueue获取按照一定规则 排序 后的taskSet进行遍历处理,这里的规则就是之前所说的FIFO或FAIR,指的是属于一个Stage的TaskSet的计算的优先级。resourceOffers函数一开始也会对每一个活着的slave进行标记,记录其主机名并跟踪是否增加了新的Executor,这里可能的情况是有一些Executor挂掉了重新启动了新的,需要在有新的TaskSet计算请求时加入到计算资源信息记录当中。

任务的资源分配好之后,会获得Task的TaskDescription,接下来CoarseGrainedSchedulerBackend调用launchTasks方法把任务发送给对应的ExecutorBackend进行执行。

Spark的运行原理

如果任务序列化之后大小超过了maxRpcMessageSize(默认128M)会丢弃,否则根据TaskDescription中记录的执行该Task的executorId获取executorData,将其中的freeCores减去运行任务需要的core数,并使用executorEndPoint的send方法发送LaunchTask给指定的Executor的ExecutorBackend进行执行,LaunchTask是一个case class,其中存储的内容就是序列化的Task。

参考文献:


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Distributed Systems

Distributed Systems

Sukumar Ghosh / Chapman and Hall/CRC / 2014-7-14 / USD 119.95

Distributed Systems: An Algorithmic Approach, Second Edition provides a balanced and straightforward treatment of the underlying theory and practical applications of distributed computing. As in the p......一起来看看 《Distributed Systems》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换