Kafka从上手到实践-Kafka CLI:Consumer CLI

栏目: CSS · CSS3 · 发布时间: 5年前

内容简介:这一节来看看使用命令行启动Consumer接收消息,通过如下的命令启动Consumer:如上图所示,左边启动的是Consumer,右边启动的是Producer。Producer发送的消息可以实时的被Consumer接收到。但是有一个问题,那就是在上一节中,我们已经给

这一节来看看使用命令行启动Consumer接收消息,通过如下的命令启动Consumer:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
kafka-console-consumer.sh
--bootstrap-server
--topic

Kafka从上手到实践-Kafka CLI:Consumer CLI

如上图所示,左边启动的是Consumer,右边启动的是Producer。Producer发送的消息可以实时的被Consumer接收到。但是有一个问题,那就是在上一节中,我们已经给 first_topic 这个Topic发送了一些数据。但是现在Consumer启动后并没有收到。这是因为通过上面的命令启动的Consumer接收的是最新的消息,如果想接收所有的消息,还需要带一个参数:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

--from-beginning 表示启动的Consumer要接收所有的消息。

前文中说过,Consumer一般都是以组的形式存在,所以可以再加一个参数来创建一个Consumer Group:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group consumer_group_1

--group 可以指定Consumer Group的名称。

Kafka从上手到实践-Kafka CLI:Consumer CLI

如上图所示,左边启动了三个Consumer,这三个Consumer都在同一个名为 consumer_group_1 的组里。因为 first_topic 这个Topic有三个Partitions,所以当一个Consumer Group中有三个Consumer时,他们的收到的信息不会重复。

Kafka从上手到实践-Kafka CLI:Consumer CLI

又如上图所示,左边启动了三个Consumer,但是前两个在 consumer_group_1 的组里,最后一个在 consumer_group_2 的组里,所以前两个Consumer是以轮询的方式收到消息的,而最后一个Consumer可以收到全部的消息。

上面两个示例也充分证明了前文中所说的, 不同的Consumer Group可以消费同一个Topic中相同的Partition的消息,但是Consumer Group内的Consumer不能消费同一个Topic中相同的Partition的消息

上面的命令是显示的创建Consumer Group。上文中说到过,Kafka中,Consumer都是以组的形式连接Broker消费数据的。那么如果只有一个Consumer的情况下,是否有Consumer Group呢?其实,当只有一个Consumer时,也会自动创建一个Consumer Group。我们可以通过另外一组Consumer Group CLI来看一下:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

console-consumer-40439
console-consumer-81216
consumer_group_1c
console-consumer-14387
consumer_group_2
consumer_group_1
console-consumer-40563

可以看到,已经存在的Consumer Group中,除了我们之前创建的,还有以 console-consumer-xxxxx 这种命名格式存在的Consumer Group。这就是当我们只启动一个Consumer时Kafka自动为这个Consumer创建的Consumer Group。这里可以做个实验,先启动一个Consumer:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic

然后再来看看Consumer Group是否有增加:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

console-consumer-96752
console-consumer-40439
console-consumer-81216
consumer_group_1c
console-consumer-14387
consumer_group_2
consumer_group_1
console-consumer-40563

我们看到增加了一个Consumer Group console-consumer-96752

Consumer Group列表看完了,再来看看某一个Consumer Group的详细信息,比如查看 consumer_group_1 的详细信息。可以使用如下命令:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1

Consumer group 'consumer_group_1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
first_topic     0          17              17              0               -               -               -
first_topic     2          17              17              0               -               -               -
first_topic     1          18              18              0               -               -

首先会告诉我们该Consumer Group中是否有正在活跃的Consumer,目前没有启动任何Consumer,所以提示我们 Consumer group 'consumer_group_1' has no active members.

然后会列出该Consumer Group消费的Topic、Partition情况、Offset情况、延迟(LAG)情况、处于活跃状态的Consumer信息。

可以看到 consumer_group_1 这个Consumer Group正在消费 first_topic 这个Topic中的Message,一共从三个Partition中消费了52条Messages,并且目前已经消费了全部的数据,因为每个Partition的延迟都是0,说明没有还未接收的Message。

现在我们再往 first_topic 中发送一条Message,再来看看情况如何:

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=1
>this is another message.

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1

Consumer group 'consumer_group_1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
first_topic     0          17              17              0               -               -               -
first_topic     2          17              17              0               -               -               -
first_topic     1          18              19              1               -               -               -

可以看到Partition 1 的 LOG-END-OFFSET 是19,而 CURRENT-OFFSET 是18,并且Partition 1 的 LAG 是1,说明现在 first-topic 一共接收到了19条Message,而 consumer-group-1 只消费了18条,有1条延迟。

我们再启动 consumer_group_1 中的Consumer,然后再看看数据:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --group consumer_group_1

this is another message.

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
first_topic     0          17              17              0               consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1
first_topic     1          19              19              0               consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1
first_topic     2          17              17              0               consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1

可以看到,目前有一个处于活跃的Consumer,并且Messages全部被消费。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Release It!

Release It!

Michael T. Nygard / Pragmatic Bookshelf / 2007-03-30 / USD 34.95

“Feature complete” is not the same as “production ready.” Whether it’s in Java, .NET, or Ruby on Rails, getting your application ready to ship is only half the battle. Did you design your system to......一起来看看 《Release It!》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试