Kafka从上手到实践-Kafka CLI:Reseting Offset & Config CLI

栏目: 后端 · 发布时间: 5年前

内容简介:在实际的业务场景中,经常需要重复消费Topic中的Message,所以来看看如何重置Offset。首先重置Offset可以通过如下的命令:Kafka为我们提供了6种重置Offset的方式,也就是命令中的

在实际的业务场景中,经常需要重复消费Topic中的Message,所以来看看如何重置Offset。

首先重置Offset可以通过如下的命令:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group consumer_group_1 --reset-offsets [options] --execute --topic xxxx

Kafka为我们提供了6种重置Offset的方式,也就是命令中的 options

  • --to-earliest :重置到最早的Offset。
  • --to-latest :重置到最后的Offset。
  • --to-offset <Long: offset> :重置到指定的Offset。
  • --to-current :重置到当前的Offset。
  • --to-datetime <String: datetime> :重置到指定时间的Offset,时间格式为 YYYY-MM-DDTHH:mm:SS.sss
  • --shift-by <Long: number-of-offsets> :左移或右移Offset。

举个例子来看看:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group consumer_group_1 --topic first_topic --reset-offsets --shift-by -2 --execute

TOPIC                          PARTITION  NEW-OFFSET
first_topic                    2          15
first_topic                    1          17
first_topic                    0          15

上面的命令将 consumer_group_1 消费 first_topic 的三个Partitions的Offset向左移了2位。如此之后,相当于 consumer_group_1 还有6条Message没有消费。我们启动Consumer看一下:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --group consumer_group_1 --topic first_topic

C
F
E
this is another message.
A
D

可以看到启动Consumer后,消费了6条Message。其他的Reset Options用法是一样的。这使得我们可以非常灵活的控制Consumer消费Message。

Config CLI

我们再来看看如何通过命令进行Kafka的配置。用到的命令是 kafka-config.sh ,该命令可以对Topic、Broker、Client进行配置。关键的属性有以下三个:

  • --entity-type :这个属性设置要对什么进行配置,可选值为 topicsbrokersclientsusers
  • --entity-name :这个属性设置对应Type的名称,比如Topic名称、Broker Id、Client Id、User name。
  • --alter :确认修改。

首先我们创建一个Topic:

kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic configured-topic --partitions 3 --replication-factor 1

看看新创建的Topic的信息:

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic configured-topic --describe

Topic:configured-topic	PartitionCount:3	ReplicationFactor:1	Configs:
Topic: configured-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic: configured-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
Topic: configured-topic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0

可以看到打印信息中的Configs是空的,说明这个Topic没有做额外的配置。或者也可以使用如下命令查看Topic的配置:

kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --describe

Configs for topic 'configured-topic' are

看到打印信息只有 Configs for topic 'configured-topic' are ,同样说明该Topic还没有额外配置信息。

接下来该这个Topic设置 min.insync.replicas 属性:

kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --add-config min.insync.replicas=2 --alter

Completed Updating config for entity: topic 'configured-topic'.

再来查看一下:

kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --describe

Configs for topic 'configured-topic' are min.insync.replicas=2

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic configured-topic --describe

Topic:configured-topic	PartitionCount:3	ReplicationFactor:1	Configs:min.insync.replicas=2
Topic: configured-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic: configured-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
Topic: configured-topic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0

两种方式都可以看到刚才更新的配置信息。

--add-config 换成 --delete-config 就可以删除配置项:

kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --add-config min.insync.replicas=2 --alter

Completed Updating config for entity: topic 'configured-topic'.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

用UML构建Web应用

用UML构建Web应用

科纳尔伦 (Conallen Jim) / 陈起 / 中国电力出版社 / 2003-11 / 39.0

用UML构建Web应用(第2版),ISBN:9787508315577,作者:(美)Jim Conallen著;陈起,英宇译;陈起译一起来看看 《用UML构建Web应用》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

html转js在线工具
html转js在线工具

html转js在线工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换