内容简介:Spark Streaming--应用与实战(二)
- 总体思路就是:
- put数据构造json数据,写入kafka;
- spark streaming任务启动后首先去zookeeper中去读取offset,组装成fromOffsets;
- spark streaming 获取到fromOffsets后通过KafkaUtils.createDirectStream去消费Kafka的数据;
- 读取kafka数据返回一个InputDStream的信息,foreachRDD遍历,同时记录读取到的offset到zk中;
- 写入数据到HBase
初始化与配置加载
- 下面是一些接收参数,加载配置,获取配置中的topic,还有初始化配置,代码如下:
//接收参数 val Array(kafka_topic, timeWindow, maxRatePerPartition) = args //加载配置 val prop: Properties = new Properties() prop.load(this.getClass().getResourceAsStream("/kafka.properties")) val groupName = prop.getProperty("group.id") //获取配置文件中的topic val kafkaTopics: String = prop.getProperty("kafka.topic." + kafka_topic) if (kafkaTopics == null || kafkaTopics.length <= 0) { System.err.println("Usage: KafkaDataStream <kafka_topic> is number from kafka.properties") System.exit(1) } val topics: Set[String] = kafkaTopics.split(",").toSet val kafkaParams = scala.collection.immutable.Map[String, String]( "metadata.broker.list" -> prop.getProperty("bootstrap.servers"), "group.id" -> groupName, "auto.offset.reset" -> "largest") val kc = new KafkaCluster(kafkaParams) //初始化配置 val sparkConf = new SparkConf() .setAppName(KafkaDataStream.getClass.getSimpleName + topics.toString()) .set("spark.yarn.am.memory", prop.getProperty("am.memory")) .set("spark.yarn.am.memoryOverhead", prop.getProperty("am.memoryOverhead")) .set("spark.yarn.executor.memoryOverhead", prop.getProperty("executor.memoryOverhead")) .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition) //此处为每秒每个partition的条数 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.reducer.maxSizeInFlight", "1m") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt)) //多少秒处理一次请求
只是需要注意一下,这里的KafkaCluster,需要把源码拷贝过来,修改一下,因为里面有些方法是私有的。copy过来后改为public 即可。
链接ZK
- 注意:这里的ZKStringSerializer,需要把源码拷贝过来,修改一下
//zk val zkClient = new ZkClient(prop.getProperty("zk.connect"), Integer.MAX_VALUE, 100000, ZKStringSerializer) val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
组装fromOffsets
- 组装fromOffsets,createDirectStream接收的是一个map的结构,所以可以支持多个topic的消费
var fromOffsets: Map[TopicAndPartition, Long] = Map() //多个partition的offset //支持多个topic : Set[String] topics.foreach(topicName => { //去brokers中获取partition数量,注意:新增partition后需要重启 val children = zkClient.countChildren(ZkUtils.getTopicPartitionsPath(topicName)) for (i <- 0 until children) { //kafka consumer 中是否有该partition的消费记录,如果没有设置为0 val tp = TopicAndPartition(topicName, i) val path: String = s"${new ZKGroupTopicDirs(groupName, topicName).consumerOffsetDir}/$i" if (zkClient.exists(path)) { fromOffsets += (tp -> zkClient.readData[String](path).toLong) } else { fromOffsets += (tp -> 0) } } })
通过createDirectStream接受数据
- 使用KafkaUtils里面的createDirectStream方法去消费kafka数据,createDirectStream使用的是kafka简单的Consumer API,所以需要自己去管理offset,我们把offset写入到zk中,这样也方便了一些监控软件读取记录
//创建Kafka持续读取流,通过zk中记录的offset val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
入库
-
入库HBase
//数据操作 messages.foreachRDD(rdd => { val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //data 处理 rdd.foreachPartition(partitionRecords => { //TaskContext 上下文 val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId) logger.info(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}") //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition) val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //是 if (either.isLeft) { logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}") } partitionRecords.foreach(data => { HBaseDao.insert(data) }) }) })
-
插入数据到具体HBase数据库
/** * * 插入数据到 HBase * * 参数( tableName , json ) ): * * Json格式: * { * "rowKey": "00000-0", * "family:qualifier": "value", * "family:qualifier": "value", * ...... * } * * @param data * @return */ def insert(data: (String, String)): Boolean = { val t: HTable = getTable(data._1) //HTable try { val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2) val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey val put = new Put(rowKey) for ((k, v) <- map) { val keys: Array[String] = k.split(":") if (keys.length == 2){ put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes) } } Try(t.put(put)).getOrElse(t.close()) true } catch { case e: Exception => e.printStackTrace() false } }
运行并查看结果
-
运行命令:
/opt/cloudera/parcels/CDH/bin/spark-submit --master yarn-client --class com.xiaoxiaomo.streaming.KafkaDataStream hspark-1.0.jar 1 3 1000
运行后可以去spark UI中去查看相关运行情况,UI中具体细节见下篇博客
1.除非注明,博文均为原创,转载请标明地址: http://blog.xiaoxiaomo.com/2017/06/10/SparkStreaming-应用与实战-二/
2.文章作者:小小默
3.发布时间:2017年06月10日 - 16时56分
4.如果本文帮到了您,不妨点一下右下角的 分享到 按钮,您的鼓励是博主写作最大的动力。
以上所述就是小编给大家介绍的《Spark Streaming--应用与实战(二)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- go语言实战教程:Redis实战项目应用
- 单页应用的HATEOAS实战
- 单页应用的HATEOAS实战
- Vue应用框架整合与实战
- Spark综合使用及电商案例实战精析-Spark商业应用实战
- Spark Streaming--应用与实战(一)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Black Box Society
Frank Pasquale / Harvard University Press / 2015-1-5 / USD 35.00
Every day, corporations are connecting the dots about our personal behavior—silently scrutinizing clues left behind by our work habits and Internet use. The data compiled and portraits created are inc......一起来看看 《The Black Box Society》 这本书的介绍吧!