内容简介:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何商业交流,可随时联系。调用rdd.cache()来缓存数据,这样也可以加快数据的处理,但是我们需要更多的内存资源建议用并行Mark-Sweep垃圾回收机制,虽然它消耗更多的资源,但是我们还是建议开启。 在spark-submit中使用
本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何商业交流,可随时联系。
1 开门见山
1.1 优化1:进行HA机制处理-针对updateStateByKey与window等有状态的操作
-
HA高可用性:High Availability,如果有些数据丢失,或者节点挂掉;如果不想让你的实时计算程序挂了,就必须做一些数据上的冗余副本,保证你的实时计算程序可以7 * 24小时的运转。
-
针对updateStateByKey与window等有状态的操作,自动进行checkpoint,必须设置checkpoint目录。
HDFS:SparkStreaming.checkpoint("hdfs://192.168.1.105:9090/checkpoint"), 复制代码
-
checkpoint 会把数据保留一份在容错的文件系统中,一旦内存中的数据丢失掉;就可以直接从文 件系统中读取数据,不需要重新进行计算。
JavaStreamingContext jssc = new JavaStreamingContext( conf, Durations.seconds(5)); jssc.checkpoint("hdfs://192.168.1.105:9000/streaming_checkpoint"); 复制代码
1.2 优化2:进行HA机制处理-针对Driver高可用性
-
在创建和启动StreamingContext的时候,将元数据写入容错的文件系统(比如hdfs)。保证在driver挂掉之后,spark集群可以自己将driver重新启动起来;而且driver在启动的时候,不会重新创建一个streaming context,而是从容错文件系统(比如hdfs)中读取之前的元数据信息,包括job的执行进度,继续接着之前的进度,继续执行。
-
使用这种机制,就必须使用cluster模式提交,确保driver运行在某个worker上面;
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { JavaStreamingContext jssc = new JavaStreamingContext(...); JavaDStream<String> lines = jssc.socketTextStream(...); jssc.checkpoint(checkpointDirectory); return jssc; } }; JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory); context.start(); context.awaitTermination(); 复制代码
-
提交方式
spark-submit --deploy-mode cluster --supervise 复制代码
1.3 优化3:实现RDD高可用性:启动WAL预写日志机制
-
spark streaming,从原理上来说,是通过receiver来进行数据接收的;接收到的数据,会被划分成一个一个的block;block会被组合成一个batch;针对一个batch,会创建一个rdd;
-
receiver接收到数据后,就会立即将数据写入一份到容错文件系统(比如hdfs)上的checkpoint目录中的,另一份写入到磁盘文件中去;作为数据的冗余副本。无论你的程序怎么挂掉,或者是数据丢失,那么数据都不肯能会永久性的丢失;因为肯定有副本。
SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("StreamingSpark"); .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); .set("spark.default.parallelism", "1000"); .set("spark.streaming.blockInterval", "50"); .set("spark.streaming.receiver.writeAheadLog.enable", "true"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); jssc.checkpoint("hdfs://192.168.1.164:9000/checkpoint"); 复制代码
1.4 优化4:InputDStream并行化数据接收
-
创建多个InputDStream来接收同一数据源
-
把多个topic数据细化为单一的kafkaStream来接收
1:创建kafkaStream Map<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", "192.168.1.164:9092,192.168.1.165:9092,192.168.1.166:9092"); kafkaParams.put("zookeeper.connect","master:2181,data1:2181,data2:2181"); 构建topic set String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS); String[] kafkaTopicsSplited = kafkaTopics.split(","); Set<String> topics = new HashSet<String>(); for(String kafkaTopic : kafkaTopicsSplited) { topics.add(kafkaTopic); JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 2:InputDStream并行化数据接收 int numStreams = 5; List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String,String>>(numStreams); for (int i = 0; i < numStreams; i++) { kafkaStreams.add(KafkaUtils.createStream(...)); } JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); unifiedStream.print(); 复制代码
1.5 优化5:增加block数量,增加每个batch rdd的partition数量,增加处理并行度
- 第一步:receiver从数据源源源不断地获取到数据,首先是会按照block interval,将指定时间间隔的数据,收集为一个block;默认时间是200ms,官方推荐不要小于50ms;
- 第二步:根据指定batch interval时间间隔合并为一个batch,创建为一个rdd,
- 第三步:启动一个job,去处理这个batch rdd中的数据。
- 第四步:batch rdd 的partition数量是多少呢?一个batch有多少个block,就有多少个partition;就意味着并行度是多少;就意味着每个batch rdd有多少个task会并行计算和处理。
- 调优:如果希望可以比默认的task数量和并行度再多一些,可以手动调节blockinterval,减少block interval。每个batch可以包含更多的block。因此也就有更多的partition,因此就会有更多的task并行处理每个batch rdd。
1.6 优化6:重分区,增加每个batch rdd的partition数量
- inputStream.repartition():重分区,增加每个batch rdd的partition数量 对dstream中的rdd进行重分区为指定数量的分区,就可以提高指定dstream的rdd的计算并行度
- 调节并行度具体细节:
1.7 优化7:提升并行度
方法1: spark.default.parallelism 方法2: reduceByKey(numPartitions) 复制代码
JavaPairDStream<String, Long> dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey( new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } }); }, 1000); 复制代码
1.8 优化8:使用Kryo序列化机制
-
spark streaming:提高序列化task发送到executor上执行的性能,比如:task很多的时候,task序列化和反序列化的性能开销也比较可观
-
默认输入数据的存储级别是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到数据,默认就会进行持久化操作;首先序列化数据,存储到内存中;如果内存资源不够大,那么就写入磁盘;而且,还会写一份冗余副本到其他executor的block manager中,进行数据冗余。
SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("StreamingSpark"); .set("spark.serializer", "org.apache.spar.serializer.KryoSerializer"); <= 优化点 .set("spark.default.parallelism", "1000"); .set("spark.streaming.blockInterval", "50"); .set("spark.streaming.receiver.writeAheadLog.enable", "true"); 复制代码
1.9 优化9:batch interval:流式处理时间必须小于batch interval
-
batch interval,就是指每隔多少时间收集一次数据源中的数据,然后进行处理,因此切忌处理时间过长导致的batch interval。
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); 复制代码
1.10 优化10:缓存需要经常使用的数据
调用rdd.cache()来缓存数据,这样也可以加快数据的处理,但是我们需要更多的内存资源
1.11 优化11:定时清除不需要的数据
- 通过配置spark.cleaner.ttl为一个合理的值,但是这个值不能过小,因为如果后面计算需要用的数据被清除会带来不必要的麻烦。
- 另外通过配置spark.streaming.unpersist为true(默认就是true)来更智能地去持久化(unpersist)RDD。这个配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。
1.12 优化12:GC优化策略(暂时不确定)
建议用并行Mark-Sweep垃圾回收机制,虽然它消耗更多的资源,但是我们还是建议开启。 在spark-submit中使用
--driver-java-options "-XX:+UseConcMarkSweepGC" --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" 复制代码
1.13 优化13:去除压缩
在内存充足的情况下,可以设置spark.rdd.compress 设置为false.
1.14 优化14:Executors和cpu核心数设置和Spark On Yarn 动态资源分配
-
首先需要对YARN的NodeManager进行配置,使其支持Spark的Shuffle Service。
##修改 <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle,spark_shuffle</value> </property> ##增加 <property> <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> <value>org.apache.spark.network.yarn.YarnShuffleService</value> </property> <property> <name>spark.shuffle.service.port</name> <value>7337</value> </property> 复制代码
-
将spark中对应jar包拷贝到hadoop的目录下:
首先找到spark版本的spark-<version>-yarn-shuffle.jar shuffle包,并将该包放到集群所有NodeManager的classpath下, 比如放到HADOOP_HOME/share/hadoop/yarn/lib 复制代码
-
重启所有NodeManager。
-
配置示例如下:
spark.shuffle.service.enabled=true spark.dynamicAllocation.enabled=true spark.dynamicAllocation.executorIdleTimeout=60s spark.dynamicAllocation.initialExecutors=1 spark.dynamicAllocation.maxExecutors=5 spark.dynamicAllocation.minExecutors=0 spark-submit \ --master yarn \ --deploy-mode cluster \ --executor-cores 3 \ --executor-memory 10G \ --driver-memory 4G \ --conf spark.dynamicAllocation.enabled=true \ --conf spark.shuffle.service.enabled=true \ --conf spark.dynamicAllocation.initialExecutors=5 \ --conf spark.dynamicAllocation.maxExecutors=40 \ --conf spark.dynamicAllocation.minExecutors=0 \ --conf spark.dynamicAllocation.executorIdleTimeout=30s \ --conf spark.dynamicAllocation.schedulerBacklogTimeout=10s \ 复制代码
1.15 优化15:使用高性能算子
使用reduceByKey/aggregateByKey替代groupByKey 使用mapPartitions替代普通map 使用foreachPartitions替代foreach 使用filter之后进行coalesce操作 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- CoreOS 实战:剖析 etcd
- CoreOS 实战:剖析 etcd
- 别人家的 InfluxDB 实战 + 源码剖析
- OLAP技术之Kylin官方案例详细剖析-商业环境实战
- Kylin官方案例详细剖析及剪枝优化-OLAP商业环境实战
- Elasticsearch生产环境索引管理深入剖析-搜索系统线上实战
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Java核心技术·卷1:基础知识(原书第9版)
(美)Cay S. Horstmann、(美)Gary Cornell / 周立新、陈波、叶乃文、邝劲筠、杜永萍 / 机械工业出版社 / 2013-11-1 / 119.00
Java领域最有影响力和价值的著作之一,拥有20多年教学与研究经验的资深Java技术专家撰写(获Jolt大奖),与《Java编程思想》齐名,10余年全球畅销不衰,广受好评。第9版根据JavaSE7全面更新,同时修正了第8版中的不足,系统全面讲解Java语言的核心概念、语法、重要特性和开发方法,包含大量案例,实践性强。 《Java核心技术·卷1:基础知识》共14章。第1章概述了Java语言与其......一起来看看 《Java核心技术·卷1:基础知识(原书第9版)》 这本书的介绍吧!