内容简介:在开始写spark代码或者翻阅spark文档的时候,会遇到一些诸如“transformation”,“action”和“RDD”这样的术语。了解这些术语对于编写spark代码是至关重要的。类似的,当写的spark程序执行失败的时候或者尝试通过Spark WebUI来研究提交的应用为什么执行时间过长的时候,可能会遇到另外一些术语,如“job”、“stage”和“task”,了解这些术语对于写出“好”的Spark程序至关重要。对于这里的“好”,我的意思是执行得足够快。要写出高效的Spark程序,了解一些Spa
在开始写spark代码或者翻阅spark文档的时候,会遇到一些诸如“transformation”,“action”和“RDD”这样的术语。了解这些术语对于编写spark代码是至关重要的。
类似的,当写的spark程序执行失败的时候或者尝试通过Spark WebUI来研究提交的应用为什么执行时间过长的时候,可能会遇到另外一些术语,如“job”、“stage”和“task”,了解这些术语对于写出“好”的Spark程序至关重要。对于这里的“好”,我的意思是执行得足够快。要写出高效的Spark程序,了解一些Spark的底层执行模式是非常非常重要的。
在这篇文里,会介绍部分诸如“Spark程序是如何在集群上执行的”这样的基础知识。通过了解Spark的执行模式,读者也会学到一些如何写出高效Spark程序的实用建议。
Spark是如何执行你的程序的
一个Spark应用是由一个Driver进程和一系列分散在集群不同节点上的Executor进程共同组成的。
Driver进程负责所有要执行的工作的上层流程控制。
Executor进程负责执行具体的工作(以 Task 的形式)以及存储用户选择的要缓存的数据。
在应用运行期间,Driver和Executpr通常都是固定不变的,不过 Spark动态资源管理 会改变后者。每一个Executor都有多个slot来运行Task,并在其生命周期内并发执行。
将Driver进程和Executor进程部署到集群是由集群管理器(Yarn、Mesos或者单节点Spark)来完成的,但是Driver和Executor是保存在每个Spark应用中的。
在Spark的执行模式中,位于最上层的概念是 Job 。在一个Spark应用中调用Action算子就会触发开始一个Spark Job来完成这个Action的运算。Spark会检查这个Action算子所依赖的RDD关系图来获取Job的全貌,并形成一个执行计划。该执行计划从最后端的RDD(即不依赖已缓存的数据或其它RDD的RDD)开始,最终生成计算Action结果所需的RDD。
执行计划的一个主要内容是将Job的transformation算子组合为一个或多个stage。一个stage对应着一批执行相同代码的Task集合;每个Task处理不同的数据子集。每个stage中的transformation算子都是不需要shuffle全部数据就可以完成的。
是什么决定了数据是否要被shuffle呢?
回忆一下,每个RDD是由固定数量的partition组成的;每个partition又是由一定数量的record组成的。对于那些由被称为 “窄” transformation(如map或filter)返回的RDD,计算单个partition中的record所需的record都保存在父RDD的单个partition中。每个对象都只会依赖父RDD的单个对象。coalesce之类的操作可以在一个task中处理多个partition中的数据,但是这种transformation依然被视为 “窄” transformation,因为用来计算每个输出record的输入record仍然在有限的partition子集中。
不过,spark也支持诸如groupByKey和reduceByKey这样的宽依赖transformation。在这些宽依赖操作中,用来计算单个partition的record所需的数据可能来自父RDD的多个partiton中。“byKey”操作通过同样的task将拥有相同key的元组最终放到了同一个partition中。为了完成这种操作,spark就需要执行一次shuffle,这样在一个新的stage中,收集分散在集群上的数据,最终创建新的partition集合。
举个例子,可以考虑下如下的代码:
sc.textFile("someFile.txt"). map(mapFunc). flatMap(flatMapFunc). filter(filterFunc). count()
这段代码中执行了一个action算子。这个action算子又依赖于一系列transformation算子。代码中的action算子和transformation算子执行于由一个文本文件生成的RDD上。这段代码会在一个stage中执行完成,因为涉及到的三个transformation操作没有一个需要的数据是来自输入RDD的多个分区中的。
再看一个相反的示例,下面的代码统计了一个文本文件中每个出现次数超过1000次的单词的具体出现次数。
val tokenized = sc.textFile(args(0)).flatMap(_.split(' ')) val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _) val filtered = wordCounts.filter(_._2 >= 1000) val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)). reduceByKey(_ + _) charCounts.collect()
代码中的整个处理过程被分成了三个stage。其中的两个reduceByKey操作可以视为stage的分界点,因为计算reduceByKey的输出需要对数据按key进行重新分区。
下图是一个更复杂的transformation执行图。其中的join操作有着多个依赖。
下图用粉色方框框起来的表示执行过程中的不同stage。
在每个stage的边界,数据会被父stage的task写到硬盘上,而后又会被子stage的task通过网络获取到。因为导致了沉重的硬盘和网络I/O开销,stage边界操作代价是昂贵的,应当尽量避免。父stage的partition数目和子stage的partition数目有可能是不一致的。通常,会触发stage边界的transformation算子都会有一个类似“numPartitions”这样的参数来决定将子stage的数据分为多少个partition。
选择正确的操作
当尝试用spark处理问题时,我们有许多的方式来将action和transformation算子进行组合计算来得到同一个正确的结果。但是并不是所有的这些组合在性能上都是一致的:避免一些常见的陷阱、采用正确的算子组合通常对应用的性能有着显著地影响。接下来介绍的一些规则和经验,可以在必要的时候帮你做出选择。
要从多个可行的算子组合中选出最优方案来,最基本的一个标准是这种算子组合能够减少shuffle的次数和shuffle的数据总量。这是因为shuffle是一个相当耗费资源的操作:所有要shuffle的数据会先写到硬盘上而后再通过网络进行传输。repartition、join、cogroup以及类似“*By”或“*ByKey”这样的transformation算子都会导致shuffle。并非所有的操作都是等价的。Spark开发新手遇到的一些性能陷阱通常都是因为选择了错误的操作导致的。
下面介绍一些应该避免的操作。
在执行关联缩减操作时避免使用groupByKey操作
举个例子,如这段代码:
rdd.groupByKey().mapValues(_.sum)
这段代码执行效果等同于直接使用reduceByKey,如下:
rdd.reduceByKey(_ + _)
两段代码执行效果虽然相同,但是使用groupByKey的代码会在集群内传递所有的数据集。而后者会先在每个partition内按key进行局部计算,而后再执行shuffle将局部结果集合并起来形成最终结果。
当输入输出值类型不同时避免使用reduceByKey操作
举例如:写一个transformation计算出相同key的字符串集合。一个写法就是使用map将原始的key/value对中的value转为单元素的Set集合,然后再使用reduceByKey将Set集合组合起来:
rdd.map(kv => (kv._1, new Set[String]() + kv._2)) .reduceByKey(_ ++ _)
这段代码会创建太多的不必要的对象,因为需要为每个原始key/value对创建一个Set对象。更好地做法是使用aggregateBykey,这个方法可以更高效地执行map-side聚合:
val zero = new collection.mutable.Set[String]() rdd.aggregateByKey(zero)( (set, v) => set += v, (set1, set2) => set1 ++= set2)
避免使用“flatMap-join-groupBy”模式
当两个数据集已经按key做了group,此时想在这两个数据集保持group的状态下进行join操作,可以使用cogroup方法,这样可以避免拆开group又重新执行group造成的开销。
什么时候不会发生shuffle呢
知道在什么样的场景下,上面的transformation操作不会产生shuffle也是很重要的。在之前已经有一个transformation操作使用相同的partitioner完成分区的情况下,spark知道如何避免重复shuffle。看一下下面的代码:
rdd1 = someRdd.reduceByKey(...) rdd2 = someOtherRdd.reduceByKey(...) rdd3 = rdd1.join(rdd2)
因为没有将partitioner参数传递给reduceByKey方法,所以会使用默认的partitioner,也就是说rdd1和rdd2都会使用HashPartitioner按hash进行分区。这两个reduceByKey操作将会产生两次shuffle。如果这两个RDD有相同数目的partition,那么在执行join操作的时候将不会需要额外的shuffle。因为这两个RDD是按相同规则进行分区的,rdd1的任何一个数据集都只会出现在rdd2的单个partiton中。因此,rdd3的任何一个输出partiton的数据都只会依赖rdd1和rdd2的单个partition的数据,不需要再执行第三次shuffle了。
举个例子,如果前面代码中的“someRdd”有四个partition,“someOtherRdd”有两个partition,这两者执行的reduceByKey操作都会用到三个partition,那么这个任务的执行示意图就大致如下:
如果rdd1和rdd2使用不同的partitioner,又或者它们都仍然使用默认的HashPartitioner却设置了不同的分区数,那么又是怎么执行的呢?在这种情况下,只有一个RDD(partition数目较少的那个RDD)会需要在执行join操作的时候重新shuffle。
一样的transformation操作、一样的输入数据,设置了不同的partition数目时的执行示意图:
对两个数据集执行join操作时,要想避免shuffle,可以考虑利用broadcast变量。当其中一个数据集小到可以放进Executor的执行内存中时,可以先在driver中将这个数据集放进一个hash表中然后在将之广播给每个Executor。之后的map操作就可以参考这个hash表来进行查询了。
什么时候shuffle越多越好
通常我们都需要尽量减少shuffle的数量,但是有一些特殊的场景我们需要反其道而行之。增加shuffle可以增加任务的并行度,这有助于提升性能。假设原始数据来自于几个比较大的不可分割的文件中,那么InputFormat使用的分区方式会将大量的数据放到每个partition中,而不是产生足够多的partition来充分利用可用的core。这种场景下,加载完数据后,执行一次repartition来增加partition的数量(此时会产生shuffle)将有助于之后的操作充分利用集群的CPU。
另外一个场景是在使用reduce或aggregate操作将数据聚合到driver上时。当partition太多时,在Driver上使用单线程将所有partition的数据聚合到一起时,很容易遇到瓶颈。要降低Driver的负载可以考虑先使用reduceByKey或aggregateByKey展开一轮分布式地聚集运算,将原始数据集的partition总数缩小。在将最后的结果发送到driver之前,每个partition上的数据都会互相按key进行合并。具体的,可以看一下treeReduce和treeAggregate这两个方法是如何实现的。
在原始数据已经按key做了分组之后,这个技巧将会特别地有效。假设我们要写一个应用统计一个文本文件内每个单词出现的次数,然后将结果汇总到driver上。一个方法是使用action算子aggregate在本地对每个partition进行运算,然后将将结果汇总到Driver。另一种方式使用aggregateByKey算子,以一种完全分布式的形式进行运算,最后再调用collectAsMap将结果汇总到driver上。
二次排序
另一个需要注意的重要功能是 repartitionAndSortWithinPartitions 这个transformation算子。这个算子看起来有点儿神秘,但是好像会在各种有趣的场景下出现。它将 排序 推到了执行shuffle的机器上。在那里,大量的数据被高效地溢写,并且排序可以和其他操作组合在一起使用。
举个例子,Apache Hive在Spark上的join方法实现就使用了这个transformation算子。此外,在 二次排序 模式中它也是一个重要的组成模块——在这种模式下数据按key进行分组,同时在按key遍历值的时候,key对应的值需要保持一定的顺序。这种模式的使用场景通常为将事件日志按用户进行分组,并将每个用户的事件按发生时间排序后再进行分析。直接使用 repartitionAndSortWithinPartitions 执行二次排序对用户有点儿难度,不过 SPARK-3655 大大简化了这个问题。
结论
现在应该能够对涉及到创建一个性能高效的Spark程序的基本因素有足够的了解了。在下一节我们将继续介绍资源请求、并行度和数据结构相关的调优内容。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。