kafka auto.offset.reset latest earliest 详解

栏目: 后端 · 发布时间: 6年前

内容简介:auto.offset.reset关乎kafka数据的读取,是一个非常重要的设置。常用的二个值是latest和earliest,默认是latest。1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

auto.offset.reset关乎kafka数据的读取,是一个非常重要的设置。常用的二个值是latest和earliest,默认是latest。

一,latest和earliest区别

1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。

二,创建topic

# bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 2 --partitions 3 --topic tank
Created topic "tank".

# bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic tank
Topic:tank PartitionCount:3 ReplicationFactor:2 Configs:
 Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2
 Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
 Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1

三,生产数据和接收生产数据

[root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic tank
>1
>2
>3
>4
>5
>6
。。。。。。。。。省略。。。。。。。。。
[root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic tank --from-beginning
1
2
3
4
5
6
。。。。。。。。省略。。。。。。。。

四,测试代码

object tank {
    def main(args: Array[String]): Unit = {
        val pros: Properties = new Properties
        pros.put("bootstrap.servers", "bigserver1:9092,bigserver2:9092,testing:9092")
        /*分组由消费者决定,完全自定义,没有要求*/
        pros.put("group.id", "tank")
        //设置为true 表示offset自动托管到kafka内部的一个特定名称为__consumer_offsets的topic
        pros.put("enable.auto.commit", "false")
        pros.put("auto.commit.interval.ms", "1000")
        pros.put("max.poll.records", "5")
        pros.put("session.timeout.ms", "30000")
        //只有当offset不存在的时候,才用latest或者earliest
        pros.put("auto.offset.reset", "latest")

        pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros)

        /*这里填写主题名称*/
        consumer.subscribe(util.Arrays.asList("tank"))

        val system = akka.actor.ActorSystem("system")
        system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer))

    }

    object tankTest {
        def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = {
            val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))
            if (!records.isEmpty) {
                for (record <- records) {
                    if (record.value != null && !record.value.equals("")) {
                        myLog.syncLog(record.value + "\t准备开启消费者出列数据", "kafka", "get")
                    }
                }
                consumer.commitSync()

            }

        }
    }
}

五,测试1,过程如下

1,查看offset

# bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --group tank --describe
Error: Consumer group 'tank' does not exist.

在没有提交offset的情况,会报这个错误

2,latest模式运行,拉取不到数据

2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-1 to offset 11.

2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-0 to offset 11.

2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-2 to offset 11.

3,再用kafka-console-producer.sh生产数据,latest是可以拉到的,并且是拉取最新的数据(程序运行以后的数据),以前提交的数据是拉取不到的。

4,查看offset不报错了

# bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --group tank --describe
Consumer group 'tank' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
tank            1          12              14              2               -               -               -
tank            0          12              14              2               -               -               -
tank            2          13              15              2               -               -               -

5,将auto.offset.reset设置成earliest,第一次生产的数据也取不到

在这里要注意: 如果kafka只接收数据,从来没来消费过,程序一开始不要用latest,不然以前的数据就接收不到了。应当先earliest,然后二都都可以

六,测试2

1,重新创建topic,重复上面的第二,第三步

2,代码端先earliest,最早提交的数据是可以获取到的,再生产数据也是可以获取到的。

3,将auto.offset.reset设置成latest,再生产数据也是可以获取到的。

七,结论

虽然auto.offset.reset默认是latest,但是建议使用earliest。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

More Eric Meyer on CSS (Voices That Matter)

More Eric Meyer on CSS (Voices That Matter)

Eric A. Meyer / New Riders Press / 2004-04-08 / USD 45.00

Ready to commit to using more CSS on your sites? If you are a hands-on learner who has been toying with CSS and want to experiment with real-world projects that will enable you to see how CSS......一起来看看 《More Eric Meyer on CSS (Voices That Matter)》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具