Spark Streaming--应用与实战(二)

栏目: 编程工具 · 发布时间: 7年前

内容简介:Spark Streaming--应用与实战(二)
  • 总体思路就是:
  1. put数据构造json数据,写入kafka;
  2. spark streaming任务启动后首先去zookeeper中去读取offset,组装成fromOffsets;
  3. spark streaming 获取到fromOffsets后通过KafkaUtils.createDirectStream去消费Kafka的数据;
  4. 读取kafka数据返回一个InputDStream的信息,foreachRDD遍历,同时记录读取到的offset到zk中;
  5. 写入数据到HBase
    Spark Streaming--应用与实战(二)

初始化与配置加载

  • 下面是一些接收参数,加载配置,获取配置中的topic,还有初始化配置,代码如下:
    //接收参数
    val Array(kafka_topic, timeWindow, maxRatePerPartition) = args
    
    //加载配置
    val prop: Properties = new Properties()
    prop.load(this.getClass().getResourceAsStream("/kafka.properties"))
    
    val groupName = prop.getProperty("group.id")
    
    //获取配置文件中的topic
    val kafkaTopics: String = prop.getProperty("kafka.topic." + kafka_topic)
    if (kafkaTopics == null || kafkaTopics.length <= 0) {
        System.err.println("Usage: KafkaDataStream <kafka_topic> is number from kafka.properties")
        System.exit(1)
    }
    
    val topics: Set[String] = kafkaTopics.split(",").toSet
    
    val kafkaParams = scala.collection.immutable.Map[String, String](
        "metadata.broker.list" -> prop.getProperty("bootstrap.servers"),
        "group.id" -> groupName,
        "auto.offset.reset" -> "largest")
    
    val kc = new KafkaCluster(kafkaParams)
    
    //初始化配置
    val sparkConf = new SparkConf()
            .setAppName(KafkaDataStream.getClass.getSimpleName + topics.toString())
            .set("spark.yarn.am.memory", prop.getProperty("am.memory"))
            .set("spark.yarn.am.memoryOverhead", prop.getProperty("am.memoryOverhead"))
            .set("spark.yarn.executor.memoryOverhead", prop.getProperty("executor.memoryOverhead"))
            .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition) //此处为每秒每个partition的条数
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.reducer.maxSizeInFlight", "1m")
    
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt)) //多少秒处理一次请求
    

只是需要注意一下,这里的KafkaCluster,需要把源码拷贝过来,修改一下,因为里面有些方法是私有的。copy过来后改为public 即可。

链接ZK

  • 注意:这里的ZKStringSerializer,需要把源码拷贝过来,修改一下
    //zk
    val zkClient = new ZkClient(prop.getProperty("zk.connect"), Integer.MAX_VALUE, 100000, ZKStringSerializer)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
    

组装fromOffsets

  • 组装fromOffsets,createDirectStream接收的是一个map的结构,所以可以支持多个topic的消费
    var fromOffsets: Map[TopicAndPartition, Long] = Map() //多个partition的offset
    
    
    //支持多个topic : Set[String]
    topics.foreach(topicName => {
    
        //去brokers中获取partition数量,注意:新增partition后需要重启
        val children = zkClient.countChildren(ZkUtils.getTopicPartitionsPath(topicName))
        for (i <- 0 until children) {
    
            //kafka consumer 中是否有该partition的消费记录,如果没有设置为0
            val tp = TopicAndPartition(topicName, i)
            val path: String = s"${new ZKGroupTopicDirs(groupName, topicName).consumerOffsetDir}/$i"
            if (zkClient.exists(path)) {
                fromOffsets += (tp -> zkClient.readData[String](path).toLong)
            } else {
                fromOffsets += (tp -> 0)
            }
        }
    })
    

通过createDirectStream接受数据

  • 使用KafkaUtils里面的createDirectStream方法去消费kafka数据,createDirectStream使用的是kafka简单的Consumer API,所以需要自己去管理offset,我们把offset写入到zk中,这样也方便了一些监控软件读取记录
    //创建Kafka持续读取流,通过zk中记录的offset
    val messages: InputDStream[(String, String)] =
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    

入库

  • 入库HBase

    //数据操作
            messages.foreachRDD(rdd => {
                val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
                //data 处理
                rdd.foreachPartition(partitionRecords => {
                    //TaskContext 上下文
                    val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
                    logger.info(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}")
    
                    //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id
                    val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
                    val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //是
                    if (either.isLeft) {
                        logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}")
                    }
    
                    partitionRecords.foreach(data => {
                        HBaseDao.insert(data)
                    })
                })
    
            })
    
  • 插入数据到具体HBase数据库

    /**
      * 
      * 插入数据到 HBase
      *
      * 参数( tableName ,  json ) ):
      * 
      * Json格式:
      *     {
      *         "rowKey": "00000-0",
      *         "family:qualifier": "value",
      *         "family:qualifier": "value",
      *         ......
      *     }
      *
      * @param data
      * @return
      */
    def insert(data: (String, String)): Boolean = {
    
        val t: HTable = getTable(data._1) //HTable
        try {
            val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2)
            val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey
            val put = new Put(rowKey)
    
            for ((k, v) <- map) {
                val keys: Array[String] = k.split(":")
                if (keys.length == 2){
                    put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes)
                }
            }
    
            Try(t.put(put)).getOrElse(t.close())
            true
        } catch {
            case e: Exception =>
                e.printStackTrace()
                false
        }
    }
    

运行并查看结果

  • 运行命令:

    /opt/cloudera/parcels/CDH/bin/spark-submit --master yarn-client --class com.xiaoxiaomo.streaming.KafkaDataStream hspark-1.0.jar 1 3 1000

    运行后可以去spark UI中去查看相关运行情况,UI中具体细节见下篇博客

    Spark Streaming--应用与实战(二) Spark Streaming--应用与实战(二)

1.除非注明,博文均为原创,转载请标明地址: http://blog.xiaoxiaomo.com/2017/06/10/SparkStreaming-应用与实战-二/

2.文章作者:小小默

3.发布时间:2017年06月10日 - 16时56分

4.如果本文帮到了您,不妨点一下右下角的 分享到 按钮,您的鼓励是博主写作最大的动力。


以上所述就是小编给大家介绍的《Spark Streaming--应用与实战(二)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

逆向工程权威指南

逆向工程权威指南

Dennis Yurichev(丹尼斯) / 安天安全研究与应急处理中心 / 人民邮电出版社 / 2017-3-1 / 168

逆向工程是一种分析目标系统的过程,旨在于识别系统的各组件以及组件间关系,以便于通过其它形式、或在较高的抽象层次上,重建系统的表征。 本书专注于软件的逆向工程,是写给初学者的一本经典指南。全书共分为12个部分,共102章,涉及X86/X64、ARM/ARM-64、MIPS、Java/JVM等重要话题,详细解析了Oracle RDBMS、Itanium、软件狗、LD_PRELOAD、栈溢出、EL......一起来看看 《逆向工程权威指南》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具