内容简介:Spark Streaming--应用与实战(二)
- 总体思路就是:
- put数据构造json数据,写入kafka;
- spark streaming任务启动后首先去zookeeper中去读取offset,组装成fromOffsets;
- spark streaming 获取到fromOffsets后通过KafkaUtils.createDirectStream去消费Kafka的数据;
- 读取kafka数据返回一个InputDStream的信息,foreachRDD遍历,同时记录读取到的offset到zk中;
- 写入数据到HBase
初始化与配置加载
- 下面是一些接收参数,加载配置,获取配置中的topic,还有初始化配置,代码如下:
//接收参数 val Array(kafka_topic, timeWindow, maxRatePerPartition) = args //加载配置 val prop: Properties = new Properties() prop.load(this.getClass().getResourceAsStream("/kafka.properties")) val groupName = prop.getProperty("group.id") //获取配置文件中的topic val kafkaTopics: String = prop.getProperty("kafka.topic." + kafka_topic) if (kafkaTopics == null || kafkaTopics.length <= 0) { System.err.println("Usage: KafkaDataStream <kafka_topic> is number from kafka.properties") System.exit(1) } val topics: Set[String] = kafkaTopics.split(",").toSet val kafkaParams = scala.collection.immutable.Map[String, String]( "metadata.broker.list" -> prop.getProperty("bootstrap.servers"), "group.id" -> groupName, "auto.offset.reset" -> "largest") val kc = new KafkaCluster(kafkaParams) //初始化配置 val sparkConf = new SparkConf() .setAppName(KafkaDataStream.getClass.getSimpleName + topics.toString()) .set("spark.yarn.am.memory", prop.getProperty("am.memory")) .set("spark.yarn.am.memoryOverhead", prop.getProperty("am.memoryOverhead")) .set("spark.yarn.executor.memoryOverhead", prop.getProperty("executor.memoryOverhead")) .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition) //此处为每秒每个partition的条数 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.reducer.maxSizeInFlight", "1m") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt)) //多少秒处理一次请求
只是需要注意一下,这里的KafkaCluster,需要把源码拷贝过来,修改一下,因为里面有些方法是私有的。copy过来后改为public 即可。
链接ZK
- 注意:这里的ZKStringSerializer,需要把源码拷贝过来,修改一下
//zk val zkClient = new ZkClient(prop.getProperty("zk.connect"), Integer.MAX_VALUE, 100000, ZKStringSerializer) val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
组装fromOffsets
- 组装fromOffsets,createDirectStream接收的是一个map的结构,所以可以支持多个topic的消费
var fromOffsets: Map[TopicAndPartition, Long] = Map() //多个partition的offset //支持多个topic : Set[String] topics.foreach(topicName => { //去brokers中获取partition数量,注意:新增partition后需要重启 val children = zkClient.countChildren(ZkUtils.getTopicPartitionsPath(topicName)) for (i <- 0 until children) { //kafka consumer 中是否有该partition的消费记录,如果没有设置为0 val tp = TopicAndPartition(topicName, i) val path: String = s"${new ZKGroupTopicDirs(groupName, topicName).consumerOffsetDir}/$i" if (zkClient.exists(path)) { fromOffsets += (tp -> zkClient.readData[String](path).toLong) } else { fromOffsets += (tp -> 0) } } })
通过createDirectStream接受数据
- 使用KafkaUtils里面的createDirectStream方法去消费kafka数据,createDirectStream使用的是kafka简单的Consumer API,所以需要自己去管理offset,我们把offset写入到zk中,这样也方便了一些监控软件读取记录
//创建Kafka持续读取流,通过zk中记录的offset val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
入库
-
入库HBase
//数据操作 messages.foreachRDD(rdd => { val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //data 处理 rdd.foreachPartition(partitionRecords => { //TaskContext 上下文 val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId) logger.info(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}") //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition) val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //是 if (either.isLeft) { logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}") } partitionRecords.foreach(data => { HBaseDao.insert(data) }) }) })
-
插入数据到具体HBase数据库
/** * * 插入数据到 HBase * * 参数( tableName , json ) ): * * Json格式: * { * "rowKey": "00000-0", * "family:qualifier": "value", * "family:qualifier": "value", * ...... * } * * @param data * @return */ def insert(data: (String, String)): Boolean = { val t: HTable = getTable(data._1) //HTable try { val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2) val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey val put = new Put(rowKey) for ((k, v) <- map) { val keys: Array[String] = k.split(":") if (keys.length == 2){ put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes) } } Try(t.put(put)).getOrElse(t.close()) true } catch { case e: Exception => e.printStackTrace() false } }
运行并查看结果
-
运行命令:
/opt/cloudera/parcels/CDH/bin/spark-submit --master yarn-client --class com.xiaoxiaomo.streaming.KafkaDataStream hspark-1.0.jar 1 3 1000
运行后可以去spark UI中去查看相关运行情况,UI中具体细节见下篇博客
1.除非注明,博文均为原创,转载请标明地址: http://blog.xiaoxiaomo.com/2017/06/10/SparkStreaming-应用与实战-二/
2.文章作者:小小默
3.发布时间:2017年06月10日 - 16时56分
4.如果本文帮到了您,不妨点一下右下角的 分享到 按钮,您的鼓励是博主写作最大的动力。
以上所述就是小编给大家介绍的《Spark Streaming--应用与实战(二)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- go语言实战教程:Redis实战项目应用
- 单页应用的HATEOAS实战
- 单页应用的HATEOAS实战
- Vue应用框架整合与实战
- Spark综合使用及电商案例实战精析-Spark商业应用实战
- Spark Streaming--应用与实战(一)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Data Structures and Algorithms in Java
Michael T. Goodrich、Roberto Tamassia / Wiley / 2010-01-26 / USD 177.41
* This newest edition examines fundamental data structures by following a consistent object-oriented framework that builds intuition and analysis skills of data structures and algorithms * Presents ne......一起来看看 《Data Structures and Algorithms in Java》 这本书的介绍吧!