内容简介:本文主要介绍 Spark Streaming 应用开发中消费 Kafka 消息的相关内容,文章着重突出了开发环境的配置以及手动管理 Kafka 偏移量的实现。在 pom.xml 的 build 节点下的 plugins 中添加 scala 编译插件Maven 打包语句:
本文主要介绍 Spark Streaming 应用开发中消费 Kafka 消息的相关内容,文章着重突出了开发环境的配置以及手动管理 Kafka 偏移量的实现。
一、开发环境
1、组件版本
- CDH 集群版本:6.0.1
- Spark 版本:2.2.0
- Kafka 版本:1.0.1
2、Maven 依赖
<!-- scala --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> <!-- spark 基础依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0</version> </dependency> <!-- spark-streaming 相关依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0</version> </dependency> <!-- spark-streaming-kafka 相关依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <!-- zookeeper 相关依赖 --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.5-cdh6.0.1</version> </dependency> 复制代码
3、scala 编译
在 pom.xml 的 build 节点下的 plugins 中添加 scala 编译插件
<plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.5</arg> </args> </configuration> </plugin> 复制代码
Maven 打包语句: mvn clean scala:compile compile package
4、打包注意事项
由于 spark、spark-streaming、zookeeper 等均为大数据集群中必备的组件,因此与之相关的依赖无需打包到最终的 jar 包中,可以将其 scope 设置为 provided 即可;否则最终的 jar 包会相当庞大。
二、Kafka 偏移量
1、偏移量(offset)
这里的偏移量是指 kafka consumer offset,在 Kafka 0.9 版本之前消费者偏移量默认被保存在 zookeeper 中( /consumers/<group.id>/offsets/<topic>/<partitionId>
),因此在初始化消费者的时候需要指定 zookeeper.hosts
。
随着 Kafka consumer 在实际场景的不断应用,社区发现旧版本 consumer 把位移提交到 ZooKeeper 的做法并不合适。ZooKeeper 本质上只是一个协调服务组件,它并不适合作为位移信息的存储组件,毕竟频繁高并发的读/写操作并不是 ZooKeeper 擅长的事情。因此在 0.9 版本开始 consumer 将位移提交到 Kafka 的一个内部 topic( __consumer_offsets
)中,该主题默认有 50 个分区,每个分区 3 个副本。
2、消息交付语义
- at-most-once :最多一次,消息可能丢失,但不会被重复处理;
- at-least-once :至少一次,消息不会丢失,但可能被处理多次;
- exactly-once :精确一次,消息一定会被处理且只会被处理一次。
若 consumer 在消息消费之前就提交位移,那么便可以实现 at-most-once,因为若 consumer 在提交位移与消息消费之间崩溃,则 consumer 重启后会从新的 offset 位置开始消费,前面的那条消息就丢失了;相反地,若提交位移在消息消费之后,则可实现 at-least-once 语义。由于 Kafka 没有办法保证这两步操作可以在同一个事务中完成,因此 Kafka 默认提供的就是 at-least-once 的处理语义。
3、offset 提交方式
默认情况下,consumer 是自动提交位移的,自动提交间隔是 5 秒,可以通过设置 auto.commit.interval.ms
参数可以控制自动提交的间隔。自动位移提交的优势是降低了用户的开发成本使得用户不必亲自处理位移提交;劣势是用户不能细粒度地处理位移的提交,特别是在有较强的精确一次处理语义时(在这种情况下,用户可以使用手动位移提交)。
所谓的手动位移提交就是用户自行确定消息何时被真正处理完并可以提交位移,用户可以确保只有消息被真正处理完成后再提交位移。如果使用自动位移提交则无法保证这种时序性,因此在这种情况下必须使用手动提交位移。设置使用手动提交位移非常简单,仅仅需要在构建 KafkaConsumer 时设置 enable.auto.commit=false
,然后调用 commitSync 或 commitAsync 方法即可。
三、使用 Zookeeper 管理 Kafka 偏移量
1、Zookeeper 管理偏移量的优势
虽然说新版 kafka 中已经无需使用 zookeeper 管理偏移量了,但是使用 zookeeper 管理偏移量相比 kafka 自行管理偏移量有如下几点好处:
- 可以使用 zookeeper 管理 工具 轻松查看 offset 信息;
- 无需修改 groupId 即可从头读取消息;
- 特别情况下可以人为修改 offset 信息。
借助 zookeeper 管理工具可以对任何一个节点的信息进行修改、删除,如果希望从最开始读取消息,则只需要删除 zk 某个节点的数据即可。
2、Zookeeper 偏移量管理实现
import org.I0Itec.zkclient.ZkClient import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.kafka010.OffsetRange import scala.collection.JavaConverters._ class ZkKafkaOffset(getClient: () => ZkClient, getZkRoot : () => String) { // 定义为 lazy 实现了懒汉式的单例模式,解决了序列化问题,方便使用 broadcast lazy val zkClient: ZkClient = getClient() lazy val zkRoot: String = getZkRoot() // offsetId = md5(groupId+join(topics)) // 初始化偏移量的 zk 存储路径 zkRoot def initOffset(offsetId: String) : Unit = { if(!zkClient.exists(zkRoot)){ zkClient.createPersistent(zkRoot, true) } } // 从 zkRoot 读取偏移量信息 def getOffset(): Map[TopicPartition, Long] = { val keys = zkClient.getChildren(zkRoot) var initOffsetMap: Map[TopicPartition, Long] = Map() if(!keys.isEmpty){ for (k:String <- keys.asScala) { val ks = k.split("!") val value:Long = zkClient.readData(zkRoot + "/" + k) initOffsetMap += (new TopicPartition(ks(0), Integer.parseInt(ks(1))) -> value) } } initOffsetMap } // 根据单条消息,更新偏移量信息 def updateOffset(consumeRecord: ConsumerRecord[String, String]): Boolean = { val path = zkRoot + "/" + consumeRecord.topic + "!" + consumeRecord.partition zkClient.writeData(path, consumeRecord.offset()) true } // 消费消息前,批量更新偏移量信息 def updateOffset(offsetRanges: Array[OffsetRange]): Boolean = { for (offset: OffsetRange <- offsetRanges) { val path = zkRoot + "/" + offset.topic + "!" + offset.partition if(!zkClient.exists(path)){ zkClient.createPersistent(path, offset.fromOffset) } else{ zkClient.writeData(path, offset.fromOffset) } } true } // 消费消息后,批量提交偏移量信息 def commitOffset(offsetRanges: Array[OffsetRange]): Boolean = { for (offset: OffsetRange <- offsetRanges) { val path = zkRoot + "/" + offset.topic + "!" + offset.partition if(!zkClient.exists(path)){ zkClient.createPersistent(path, offset.untilOffset) } else{ zkClient.writeData(path, offset.untilOffset) } } true } def finalize(): Unit = { zkClient.close() } } object ZkKafkaOffset{ def apply(cong: SparkConf, offsetId: String): ZkKafkaOffset = { val getClient = () =>{ val zkHost = cong.get("kafka.zk.hosts", "127.0.0.1:2181") new ZkClient(zkHost, 30000) } val getZkRoot = () =>{ val zkRoot = "/kafka/ss/offset/" + offsetId zkRoot } new ZkKafkaOffset(getClient, getZkRoot) } } 复制代码
3、Spark Streaming 消费 Kafka 消息
import scala.collection.JavaConverters._ object RtDataLoader { def main(args: Array[String]): Unit = { // 从配置文件读取 kafka 配置信息 val props = new Props("xxx.properties") val groupId = props.getStr("groupId", "") if(StrUtil.isBlank(groupId)){ StaticLog.error("groupId is empty") return } val kfkServers = props.getStr("kfk_servers") if(StrUtil.isBlank(kfkServers)){ StaticLog.error("bootstrap.servers is empty") return } val topicStr = props.getStr("topics") if(StrUtil.isBlank(kfkServers)){ StaticLog.error("topics is empty") return } // KAFKA 配置设定 val topics = topicStr.split(",") val kafkaConf = Map[String, Object]( "bootstrap.servers" -> kfkServers, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> groupId, "receive.buffer.bytes" -> (102400: java.lang.Integer), "max.partition.fetch.bytes" -> (5252880: java.lang.Integer), "auto.offset.reset" -> "earliest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val conf = new SparkConf().setAppName("ss-kafka").setIfMissing("spark.master", "local[2]") // streaming 相关配置 conf.set("spark.streaming.stopGracefullyOnShutdown","true") conf.set("spark.streaming.backpressure.enabled","true") conf.set("spark.streaming.backpressure.initialRate","1000") // 设置 zookeeper 连接信息 conf.set("kafka.zk.hosts", props.getStr("zk_hosts", "sky-01:2181")) // 创建 StreamingContext val sc = new SparkContext(conf) sc.setLogLevel("WARN") val ssc = new StreamingContext(sc, Seconds(5)) // 根据 groupId 和 topics 获取 offset val offsetId = SecureUtil.md5(groupId + topics.mkString(",")) val kafkaOffset = ZkKafkaOffset(ssc.sparkContext.getConf, offsetId) kafkaOffset.initOffset(ssc, offsetId) val customOffset: Map[TopicPartition, Long] = kafkaOffset.getOffset(ssc) // 创建数据流 var stream:InputDStream[ConsumerRecord[String, String]] = null if(topicStr.contains("*")) { StaticLog.warn("使用正则匹配读取 kafka 主题:" + topicStr) stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset)) } else { StaticLog.warn("待读取的 kafka 主题:" + topicStr) stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaConf, customOffset)) } // 消费数据 stream.foreachRDD(rdd => { // 消息消费前,更新 offset 信息 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaOffset.updateOffset(offsetRanges) //region 处理详情数据 StaticLog.info("开始处理 RDD 数据!") //endregion // 消息消费结束,提交 offset 信息 kafkaOffset.commitOffset(offsetRanges) }) ssc.start() ssc.awaitTermination() } } 复制代码
4、注意事项
auto.offset.reset
对于 auto.offset.reset
个人推荐设置为 earliest,初次运行的时候,由于 __consumer_offsets
没有相关偏移量信息,因此消息会从最开始的地方读取;当第二次运行时,由于 __consumer_offsets
已经存在消费的 offset 信息,因此会根据 __consumer_offsets
中记录的偏移信息继续读取数据。
此外,对于使用 zookeeper 管理偏移量而言,只需要删除对应的节点,数据即可从头读取,也是非常方便。不过如果你希望从最新的地方读取数据,不需要读取旧消息,则可以设置为 latest。
基于正则订阅 Kafka 主题
基于正则订阅主题,有以下好处:
- 无需罗列主题名,一两个主题还好,如果有几十个,罗列过于麻烦了;
- 可实现动态订阅的效果(新增的符合正则的主题也会被读取)。
stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset)) 复制代码
SparkStreaming 序列化问题
开发 SparkStreaming 程序的每个人都会遇到各种各样的序列化问题,简单来说:在 driver 中使用到的变量或者对象无需序列化,传递到 exector 中的变量或者对象需要序列化。因此推荐的做法是,在 exector 中最好只处理数据的转换,在 driver 中对处理的结果进行存储等操作。
stream.foreachRDD(rdd => { // driver 代码运行区域 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaOffset.updateOffset(offsetRanges) // exector 代码运行区域 val resultRDD = rdd.map(xxxxxxxx) //endregion //对结果进行存储 resultRDD.saveToES(xxxxxx) kafkaOffset.commitOffset(offsetRanges) }) 复制代码
文中部分概念摘自《Kafka 实战》,一本非常棒的书籍,推荐一下。
Any Code,Code Any!
扫码关注『AnyCode』,编程路上,一起前行。
以上所述就是小编给大家介绍的《Spark Streaming 之 Kafka 偏移量管理》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Kafka 偏移量管理实现精确一次语义在Spark&Flink中的技术实践-kafka商业应用实战
- 记一次Access偏移注入
- ios – reloadRowsAtIndexPaths时保持偏移量
- Kafka 消息偏移量的维护
- php-rdkafka手动提交偏移量
- 人品爆发:偏移注入与移位溢注的联合使用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
程序员代码面试指南:IT名企算法与数据结构题目最优解(第2版)
左程云 / 电子工业出版社 / 109.00元
《程序员代码面试指南:IT名企算法与数据结构题目最优解(第2版)》是一本程序员代码面试"神书”!书中对IT名企代码面试各类题目的最优解进行了总结,并提供了相关代码实现。针对当前程序员面试缺乏权威题目汇总这一痛点,本书选取将近300道真实出现过的经典代码面试题,帮助广大程序员的面试准备做到接近万无一失。"刷”完本书后,你就是"题王”!《程序员代码面试指南:IT名企算法与数据结构题目最优解(第2版)》......一起来看看 《程序员代码面试指南:IT名企算法与数据结构题目最优解(第2版)》 这本书的介绍吧!