Spark Streaming--应用与实战(四)

栏目: 编程工具 · 发布时间: 7年前

内容简介:Spark Streaming--应用与实战(四)

对项目做压测与相关的优化,主要从内存(executor-memory和driver-memory)、num-executors、executor-cores,以及代码层面做一些测试和改造。

压测

  1. spark-submit –master yarn-client –conf spark.driver.memory=256m –class com.xiaoxiaomo.KafkaDataStream –num-executors 1 –executor-memory 256m –executor-cores 2 –conf spark.locality.wait=100ms hspark.jar 3 1000

    Spark streaming 处理速度为3s一次,每次1000条

    Kafka product 每秒1000条数据, 与上面spark consumer消费者恰好相等。结果:数据量大导致积压,这个过程中active Batches会越变越大.

  • 调整Kafka product 每秒600条数据,存在积压,但已经不严重
    Spark Streaming--应用与实战(四)
  • 调整Kafka product 每秒500条数据,为消费者50%,测试结果显示正常,等待时间很稳定
    Spark Streaming--应用与实战(四)

但是。此时每秒吞吐量为500 显然不够

  1. 通过调整间歇实际等,发现并没有变化

    spark-submit –master yarn-client –conf spark.driver.memory=256m –class com.xiaoxiaomo.KafkaDataStream –num-executors 1 –executor-memory 256m –executor-cores 2 –conf spark.locality.wait=100ms hspark.jar 2 2000 Spark streaming 处理速度为2s一次,每次2000条

    Kafka product 每秒500条数据,可以看见没有在指定时间内消费完数据,照成数据积压,并发下降了

    Spark Streaming--应用与实战(四)

分析原因

  • 分析原因,发现大部分耗时都在处理数据这样一阶段,如下图所示

    Spark Streaming--应用与实战(四)

调整参数

  • 调整 executor-cores

    –executor-cores 2 并发上升至700/s

    –executor-cores 3 并发上升至750/s

    Spark Streaming--应用与实战(四)
  • 调整executor内存,并发没有增长,无效

    –executor-memory 512m

    –conf spark.yarn.executor.memoryOverhead=512

  • 调整am内存,并发没有增长,无效

    –am-memory 512m

    –conf spark.yarn.am.memoryOverhead=512

代码调整

  • 发现现在主要还是在处理数据的时候消耗时间一直没有减少,而处理数据查看后发现是一条一条的往hbase里面插入的,修改为批量插入,重新构建了json.性能猛增!! 修改前的代码:

    /**
      * 
      * 插入数据到 HBase
      *
      * 参数( tableName ,  json ) ):
      * 
      * Json格式:
      *     {
      *         "rowKey": "00000-0",
      *         "family:qualifier": "value",
      *         "family:qualifier": "value",
      *         ......
      *     }
      *
      * @param data
      * @return
      */
    def insert(data: (String, String)): Boolean = {
        val t: HTable = getTable(data._1) //HTable
        try {
            val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2)
            val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey
            val put = new Put(rowKey)
            for ((k, v) <- map) {
                val keys: Array[String] = k.split(":")
                if (keys.length == 2){
                    put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes)
                }
            }
            Try(t.put(put)).getOrElse(t.close())
            true
        } catch {
            case e: Exception =>
                e.printStackTrace()
                false
        }
    }
    
  • 修改后的代码

    //数据操作
    messages.foreachRDD(rdd => {
        val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        //data 处理
        rdd.foreachPartition(partitionRecords => {
            //TaskContext 上下文
            val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
            logger.debug(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}")
            //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id
            val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
            val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //是
            if (either.isLeft) {
                logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}")
            }
            /** 解析PartitionRecords数据 */
            if (offsetRange.topic != null) {
                HBaseDao.insert(offsetRange.topic, partitionRecords)
            }
        })
    })
    
  • 插入数据到 HBase

    /**
      *
      * 插入数据到 HBase
      *
      * 参数( tableName , [( tableName , json )] ):
      * 
      * Json格式:
      *     {
      *         "r": "00000-0",
      *         "f": "family",
      *         "q": [
      *             "qualifier",
      *             "qualifier"
      *             ...
      *          ],
      *         "v": [
      *             "value",
      *             "value"
      *             ...
      *         ],
      *     }
      *
      * @return
      */
    def insert(tableName: String, array: Iterator[(String, String)]): Boolean = {
        try {
            /** 操作数据表 && 操作索引表 */
            val t: HTable = getTable(tableName) //HTable
            val puts: util.ArrayList[Put] = new util.ArrayList[Put]()
            /** 遍历Json数组 */
            array.foreach(json => {
                val jsonObj: JSONObject = JSON.parseObject(json._2)
                val rowKey: Array[Byte] = jsonObj.getString("r").getBytes
                val family: Array[Byte] = jsonObj.getString("f").getBytes
                val qualifiers: JSONArray = jsonObj.getJSONArray("q")
                val values: JSONArray = jsonObj.getJSONArray("v")
                val put = new Put(rowKey)
                for (i <- 0 until qualifiers.size()) {
                    put.addColumn(family, qualifiers.getString(i).getBytes, values.getString(i).getBytes)
                }
                puts.add(put)
            })
            Try(t.put(puts)).getOrElse(t.close())
            true
        } catch {
            case e: Exception =>
                e.printStackTrace()
                logger.error(s"insert ${tableName} error ", e)
                false
        }
    }
    

运行

  1. 刚测试时给它相对很小的内存跑一跑
     [root@xiaoxiaomo.com ~]# /opt/cloudera/parcels/CDH/bin/spark-submit \
    --master yarn-client --num-executors 1 \
    --driver-memory 256m --conf spark.yarn.driver.memoryOverhead=256 \
    --conf spark.yarn.am.memory=256m --conf spark.yarn.am.memoryOverhead=256  \
    --executor-memory 256m --conf spark.yarn.executor.memoryOverhead=256  \
    --executor-cores 1  \
    --class com.creditease.streaming.KafkaDataStream hspark-1.0.jar 1 3 30000
    
  • 五六万的插入没什么压力,但是到10万的时候,就有些卡顿了!!

    Spark Streaming--应用与实战(四) Spark Streaming--应用与实战(四)
  1. 当然是需要增大内存的,修改配置,都增加一倍
     [root@xiaoxiaomo.com ~]# /opt/cloudera/parcels/CDH/bin/spark-submit \
    --master yarn-client --num-executors 2 \
    --driver-memory 512m --conf spark.yarn.driver.memoryOverhead=512 \
    --conf spark.yarn.am.memory=512m --conf spark.yarn.am.memoryOverhead=512 \
    --executor-memory 512m --conf spark.yarn.executor.memoryOverhead=512 \
    --executor-cores 1  \
    --class com.creditease.streaming.KafkaDataStream hspark-1.0.jar 1 3 30000
    
  • Spark Streaming--应用与实战(四)
  • Spark Streaming--应用与实战(四)
  • 查看插入数据量,能看到修改后插入数据10万是没有什么压力的

    Spark Streaming--应用与实战(四)
  • 当我们再继续加大压力测试的时候,性能下降

    Spark Streaming--应用与实战(四)
  • 查看统计信息

    Spark Streaming--应用与实战(四)

1.除非注明,博文均为原创,转载请标明地址: http://blog.xiaoxiaomo.com/2017/06/10/SparkStreaming-应用与实战-四/

2.文章作者:小小默

3.发布时间:2017年06月10日 - 17时21分

4.如果本文帮到了您,不妨点一下右下角的 分享到 按钮,您的鼓励是博主写作最大的动力。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

大数据系统构建

大数据系统构建

Nathan Marz、James Warren / 马延辉、向磊、魏东琦 / 机械工业出版社 / 2017-1 / 79.00

随着社交网络、网络分析和智能型电子商务的兴起,传统的数据库系统显然已无法满足海量数据的管理需求。 作为一种新的处理模式,大数据系统应运而生,它使用多台机器并行工作,能够对海量数据进行存储、处理、分析,进而帮助用户从中提取对优化流程、实现高增长率的有用信息,做更为精准有效的决策。 但不可忽略的是,它也引入了大多数开发者并不熟悉的、困扰传统架构的复杂性问题。 本书将教你充分利用集群硬件优势的La......一起来看看 《大数据系统构建》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具