内容简介:数据量不是特别大的时候,都可以用这种MapPartitions系列操作,性能还是非常不错的,是有提升的。在项目中,自己先去估算一下RDD的数据量,以及每个partition的量,还有自己分配给每个executor的内存资源。看看一下子内存容纳所有的partition数据,行不行。如果行,可以试一下,能跑通就好。性能肯定是有提升的。但是试了一下以后,发现,不行,OOM了,那就放弃吧。
数据量不是特别大的时候,都可以用这种MapPartitions系列操作,性能还是非常不错的,是有提升的。
在项目中,自己先去估算一下RDD的数据量,以及每个partition的量,还有自己分配给每个executor的内存资源。看看一下子内存容纳所有的partition数据,行不行。如果行,可以试一下,能跑通就好。性能肯定是有提升的。
但是试了一下以后,发现,不行,OOM了,那就放弃吧。
filter过后使用coalesce减少分区数量
默认情况下,经过了这种filter之后,RDD中的每个partition的数据量,可能都不太一样了。(原本每个partition的数据量可能是差不多的) 这可能会导致的问题:
- 每个partition数据量变少了,但是在后面进行处理的时候,还是要跟partition数量一样数量的task,来进行处理;有点浪费task计算资源。
- 每个partition的数据量不一样,会导致后面的每个task处理每个partition的时候,每个task要处理的数据量就不同,这个时候很容易发生数据倾斜。
针对上述的两个问题,我们希望应该能够怎么样?
-
针对第一个问题,我们希望可以进行partition的压缩吧,因为数据量变少了,那么partition其实也完全可以对应的变少。比如原来是4个partition,现在完全可以变成2个partition。那么就只要用后面的2个task来处理即可。就不会造成task计算资源的浪费。(不必要,针对只有一点点数据的partition,还去启动一个task来计算)
-
针对第二个问题,其实解决方案跟第一个问题是一样的;也是去压缩partition,尽量让每个partition的数据量差不多。那么这样的话,后面的task分配到的partition的数据量也就差不多。不会造成有的task运行速度特别慢,有的task运行速度特别快。避免了数据倾斜的问题。
coalesce算子
主要就是用于在filter操作之后,针对每个partition的数据量各不相同的情况,来压缩partition的数量。减少partition的数量,而且让每个partition的数据量都尽量均匀紧凑。
从而便于后面的task进行计算操作,在某种程度上,能够一定程度的提升性能。
RDD.filter(XXX).coalesce(100); 复制代码
使用foreachPartition优化
使用repatition解决Spark SQL低并行度
前说过,并行度是自己可以调节,或者说是设置的。
1、spark.default.parallelism 2、textFile(),传入第二个参数,指定partition数量(比较少用) 复制代码
官方推荐,根据你的application的总cpu core数量(在spark-submit中可以指定,比如 200个),自己手动设置spark.default.parallelism参数,指定为cpu core总数的2~3倍。400~600个并行度。
你设置的这个并行度,在哪些情况下会生效?哪些情况下,不会生效?
如果你压根儿没有使用Spark SQL(DataFrame),那么你整个spark application默认所有stage的并行度都是你设置的那个参数。(除非你使用coalesce算子缩减过partition数量)
问题来了,如果用Spark SQL,那含有Spark SQL的那个stage的并行度,你没法自己指定。Spark SQL自己会默认根据hive表对应的hdfs文件的block,自动设置Spark SQL查询所在的那个stage的并行度。你自己通过spark.default.parallelism参数指定的并行度,只会在没有Spark SQL的stage中生效。
比如你第一个stage,用了Spark SQL从hive表中查询出了一些数据,然后做了一些transformation操作,接着做了一个shuffle操作(groupByKey);下一个stage,在shuffle操作之后,做了一些transformation操作。hive表,对应了一个hdfs文件,有20个block;你自己设置了spark.default.parallelism参数为100。
你的第一个stage的并行度,是不受你的控制的,就只有20个task;第二个stage,才会变成你自己设置的那个并行度,100。
问题在哪里?
Spark SQL默认情况下,它的那个并行度,咱们没法设置。可能导致的问题,也可能没什么问题。Spark SQL所在的那个stage中,后面的那些transformation操作,可能会有非常复杂的业务逻辑,甚至说复杂的算法。如果你的Spark SQL默认把task数量设置的很少,20个,然后每个task要处理为数不少的数据量,然后还要执行特别复杂的算法。
这个时候,就会导致第一个stage的速度,特别慢。第二个stage,刷刷刷,非常快。
如何解决
repartition算子,你用Spark SQL这一步的并行度和task数量,肯定是没有办法去改变了。但是呢,可以将你用Spark SQL查询出来的RDD,使用repartition算子,去重新进行分区,此时可以分区成多个partition,比如从20个partition,分区成100个。
然后呢,从repartition以后的RDD,再往后,并行度和task数量,就会按照你预期的来了。就可以避免跟Spark SQL绑定在一个stage中的算子,只能使用少量的task去处理大量数据以及复杂的算法逻辑。
return dataDF.javaRDD().repartition(1000); 复制代码
以上所述就是小编给大家介绍的《Spark学习——性能调优(三)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- HTTPS 性能优化学习笔记
- Spark学习——性能调优
- 浅谈机器学习模型推理性能优化
- 【火炉炼AI】机器学习024-无监督学习模型的性能评估--轮廓系数
- 【火炉炼AI】深度学习009-用Keras迁移学习提升性能(多分类问题)
- 通过vmstat学习CPU和进程性能监控
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。