内容简介:Spark Streaming--应用与实战(四)
对项目做压测与相关的优化,主要从内存(executor-memory和driver-memory)、num-executors、executor-cores,以及代码层面做一些测试和改造。
压测
-
spark-submit –master yarn-client –conf spark.driver.memory=256m –class com.xiaoxiaomo.KafkaDataStream –num-executors 1 –executor-memory 256m –executor-cores 2 –conf spark.locality.wait=100ms hspark.jar 3 1000
Spark streaming 处理速度为3s一次,每次1000条
Kafka product 每秒1000条数据, 与上面spark consumer消费者恰好相等。结果:数据量大导致积压,这个过程中active Batches会越变越大.
- 调整Kafka product 每秒600条数据,存在积压,但已经不严重
- 调整Kafka product 每秒500条数据,为消费者50%,测试结果显示正常,等待时间很稳定
但是。此时每秒吞吐量为500 显然不够
-
通过调整间歇实际等,发现并没有变化
spark-submit –master yarn-client –conf spark.driver.memory=256m –class com.xiaoxiaomo.KafkaDataStream –num-executors 1 –executor-memory 256m –executor-cores 2 –conf spark.locality.wait=100ms hspark.jar 2 2000 Spark streaming 处理速度为2s一次,每次2000条
Kafka product 每秒500条数据,可以看见没有在指定时间内消费完数据,照成数据积压,并发下降了
分析原因
-
分析原因,发现大部分耗时都在处理数据这样一阶段,如下图所示
调整参数
-
调整 executor-cores
–executor-cores 2 并发上升至700/s
–executor-cores 3 并发上升至750/s
-
调整executor内存,并发没有增长,无效
–executor-memory 512m
–conf spark.yarn.executor.memoryOverhead=512
-
调整am内存,并发没有增长,无效
–am-memory 512m
–conf spark.yarn.am.memoryOverhead=512
代码调整
-
发现现在主要还是在处理数据的时候消耗时间一直没有减少,而处理数据查看后发现是一条一条的往hbase里面插入的,修改为批量插入,重新构建了json.性能猛增!! 修改前的代码:
/** * * 插入数据到 HBase * * 参数( tableName , json ) ): * * Json格式: * { * "rowKey": "00000-0", * "family:qualifier": "value", * "family:qualifier": "value", * ...... * } * * @param data * @return */ def insert(data: (String, String)): Boolean = { val t: HTable = getTable(data._1) //HTable try { val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2) val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey val put = new Put(rowKey) for ((k, v) <- map) { val keys: Array[String] = k.split(":") if (keys.length == 2){ put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes) } } Try(t.put(put)).getOrElse(t.close()) true } catch { case e: Exception => e.printStackTrace() false } }
-
修改后的代码
//数据操作 messages.foreachRDD(rdd => { val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //data 处理 rdd.foreachPartition(partitionRecords => { //TaskContext 上下文 val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId) logger.debug(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}") //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition) val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //是 if (either.isLeft) { logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}") } /** 解析PartitionRecords数据 */ if (offsetRange.topic != null) { HBaseDao.insert(offsetRange.topic, partitionRecords) } }) })
-
插入数据到 HBase
/** * * 插入数据到 HBase * * 参数( tableName , [( tableName , json )] ): * * Json格式: * { * "r": "00000-0", * "f": "family", * "q": [ * "qualifier", * "qualifier" * ... * ], * "v": [ * "value", * "value" * ... * ], * } * * @return */ def insert(tableName: String, array: Iterator[(String, String)]): Boolean = { try { /** 操作数据表 && 操作索引表 */ val t: HTable = getTable(tableName) //HTable val puts: util.ArrayList[Put] = new util.ArrayList[Put]() /** 遍历Json数组 */ array.foreach(json => { val jsonObj: JSONObject = JSON.parseObject(json._2) val rowKey: Array[Byte] = jsonObj.getString("r").getBytes val family: Array[Byte] = jsonObj.getString("f").getBytes val qualifiers: JSONArray = jsonObj.getJSONArray("q") val values: JSONArray = jsonObj.getJSONArray("v") val put = new Put(rowKey) for (i <- 0 until qualifiers.size()) { put.addColumn(family, qualifiers.getString(i).getBytes, values.getString(i).getBytes) } puts.add(put) }) Try(t.put(puts)).getOrElse(t.close()) true } catch { case e: Exception => e.printStackTrace() logger.error(s"insert ${tableName} error ", e) false } }
运行
- 刚测试时给它相对很小的内存跑一跑
[root@xiaoxiaomo.com ~]# /opt/cloudera/parcels/CDH/bin/spark-submit \ --master yarn-client --num-executors 1 \ --driver-memory 256m --conf spark.yarn.driver.memoryOverhead=256 \ --conf spark.yarn.am.memory=256m --conf spark.yarn.am.memoryOverhead=256 \ --executor-memory 256m --conf spark.yarn.executor.memoryOverhead=256 \ --executor-cores 1 \ --class com.creditease.streaming.KafkaDataStream hspark-1.0.jar 1 3 30000
-
五六万的插入没什么压力,但是到10万的时候,就有些卡顿了!!
- 当然是需要增大内存的,修改配置,都增加一倍
[root@xiaoxiaomo.com ~]# /opt/cloudera/parcels/CDH/bin/spark-submit \ --master yarn-client --num-executors 2 \ --driver-memory 512m --conf spark.yarn.driver.memoryOverhead=512 \ --conf spark.yarn.am.memory=512m --conf spark.yarn.am.memoryOverhead=512 \ --executor-memory 512m --conf spark.yarn.executor.memoryOverhead=512 \ --executor-cores 1 \ --class com.creditease.streaming.KafkaDataStream hspark-1.0.jar 1 3 30000
-
查看插入数据量,能看到修改后插入数据10万是没有什么压力的
-
当我们再继续加大压力测试的时候,性能下降
-
查看统计信息
1.除非注明,博文均为原创,转载请标明地址: http://blog.xiaoxiaomo.com/2017/06/10/SparkStreaming-应用与实战-四/
2.文章作者:小小默
3.发布时间:2017年06月10日 - 17时21分
4.如果本文帮到了您,不妨点一下右下角的 分享到 按钮,您的鼓励是博主写作最大的动力。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- go语言实战教程:Redis实战项目应用
- 单页应用的HATEOAS实战
- 单页应用的HATEOAS实战
- Vue应用框架整合与实战
- Spark综合使用及电商案例实战精析-Spark商业应用实战
- Spark Streaming--应用与实战(一)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。