内容简介:auto.offset.reset关乎kafka数据的读取,是一个非常重要的设置。常用的二个值是latest和earliest,默认是latest。1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
auto.offset.reset关乎kafka数据的读取,是一个非常重要的设置。常用的二个值是latest和earliest,默认是latest。
一,latest和earliest区别
1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。
二,创建topic
# bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 2 --partitions 3 --topic tank Created topic "tank". # bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic tank Topic:tank PartitionCount:3 ReplicationFactor:2 Configs: Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
三,生产数据和接收生产数据
[root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic tank >1 >2 >3 >4 >5 >6 。。。。。。。。。省略。。。。。。。。。 [root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic tank --from-beginning 1 2 3 4 5 6 。。。。。。。。省略。。。。。。。。
四,测试代码
object tank { def main(args: Array[String]): Unit = { val pros: Properties = new Properties pros.put("bootstrap.servers", "bigserver1:9092,bigserver2:9092,testing:9092") /*分组由消费者决定,完全自定义,没有要求*/ pros.put("group.id", "tank") //设置为true 表示offset自动托管到kafka内部的一个特定名称为__consumer_offsets的topic pros.put("enable.auto.commit", "false") pros.put("auto.commit.interval.ms", "1000") pros.put("max.poll.records", "5") pros.put("session.timeout.ms", "30000") //只有当offset不存在的时候,才用latest或者earliest pros.put("auto.offset.reset", "latest") pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros) /*这里填写主题名称*/ consumer.subscribe(util.Arrays.asList("tank")) val system = akka.actor.ActorSystem("system") system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer)) } object tankTest { def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = { val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3)) if (!records.isEmpty) { for (record <- records) { if (record.value != null && !record.value.equals("")) { myLog.syncLog(record.value + "\t准备开启消费者出列数据", "kafka", "get") } } consumer.commitSync() } } } }
五,测试1,过程如下
1,查看offset
# bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --group tank --describe Error: Consumer group 'tank' does not exist.
在没有提交offset的情况,会报这个错误
2,latest模式运行,拉取不到数据
2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-1 to offset 11.
2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-0 to offset 11.
2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-2 to offset 11.
3,再用kafka-console-producer.sh生产数据,latest是可以拉到的,并且是拉取最新的数据(程序运行以后的数据),以前提交的数据是拉取不到的。
4,查看offset不报错了
# bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --group tank --describe Consumer group 'tank' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID tank 1 12 14 2 - - - tank 0 12 14 2 - - - tank 2 13 15 2 - - -
5,将auto.offset.reset设置成earliest,第一次生产的数据也取不到
在这里要注意: 如果kafka只接收数据,从来没来消费过,程序一开始不要用latest,不然以前的数据就接收不到了。应当先earliest,然后二都都可以 。
六,测试2
1,重新创建topic,重复上面的第二,第三步
2,代码端先earliest,最早提交的数据是可以获取到的,再生产数据也是可以获取到的。
3,将auto.offset.reset设置成latest,再生产数据也是可以获取到的。
七,结论
虽然auto.offset.reset默认是latest,但是建议使用earliest。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Flutter 完整开发实战详解(十六、详解自定义布局实战)
- 数据结构 1 线性表详解 链表、 栈 、 队列 结合JAVA 详解
- 详解Openstack环境准备
- Java泛型详解
- iOS RunLoop 详解
- Raft协议详解
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Haskell函数式编程基础
Simon Thompson / 科学出版社 / 2013-7-1 / 129.00
《Haskell函数式编程基础(第3版)》是一本非常优秀的Haskell函数式程序设计的入门书,各章依次介绍函数式程序设计的基本概念、编译器和解释器、函数的各种定义方式、简单程序的构造、多态和高阶函数、诸如数组和列表的结构化数据、列表上的原始递归和推理、输入输出的控制处理、类型分类与检测方法、代数数据类型、抽象数据类型、惰性计算等内容。书中包含大量的实例和习题,注重程序测试、程序证明和问题求解,易......一起来看看 《Haskell函数式编程基础》 这本书的介绍吧!