Kafka从上手到实践-实践真知:Kafka Java Consumer

栏目: 后端 · 发布时间: 5年前

内容简介:首先创建Consumer需要的配置信息,最基本的有五个信息:然后传入上面实例化好的配置信息,实例化Consumer:然后通过Consumer的

首先创建Consumer需要的配置信息,最基本的有五个信息:

  • Kafka集群的地址。
  • 发送的Message中Key的序列化方式。
  • 发送的Message中Value的序列化方式。
  • 指定Consumer Group。
  • 指定拉取Message范围的策略。
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_1");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // earliest, none
    

然后传入上面实例化好的配置信息,实例化Consumer:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

然后通过Consumer的 subscribe(Collection<String> topics) 方法订阅Topic:

consumer.subscribe(Arrays.asList("first_topic"));

最后获取Topic里的Message,将Message信息输出到日志中:

while(true) {
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
	for(ConsumerRecord<String, String> record : records) {
		logger.info("Key: " + record.key() + ", Value: " + record.value());
		logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
	}
}

Consumer的 poll(Duration timeout) 方法可以设置获取数据的时间间隔,同时回忆一下在之前Consumer章节的 Consumer Poll Options 小节中,说过关于Consumer获取Message的四个配置项,都可以在Properties里进行设置。

启动Java Consumer后,在控制台可以看到如下信息:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Discovered group coordinator IP:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2]
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-0 to offset 23.
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-1 to offset 24.
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-2 to offset 21.

在上面的信息中,可以看到 Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2] 这句话,说明当前这个Consumer会获取 first_topic 这个Topic中全部Partition中的Message。

如果我们再启动一个Consumer,这个Consumer和第一个在同一个组里,看看会有什么输出信息:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Discovered group coordinator IP:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-2]

可以看到新启动的Consumer会输出 Setting newly assigned partitions [first_topic-2] 这句话,说明新的这个Consumer只会获取 first_topic 这个Topic的一个Partition中的Message。

再回去看看第一个Consumer的控制台:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions [first_topic-0, first_topic-1, first_topic-2]
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-0, first_topic-1]

第一个Consumer新输出在控制台中的信息很关键,首先看到 Attempt to heartbeat failed since group is rebalancing 这句话,说明Kafka会自动重新给Consumer Group里的Consumer分配Topic的Partition。

再看 Setting newly assigned partitions [first_topic-0, first_topic-1] 这句,说明第一个Consumer不会再获取 first_topic-2 这个Partition里的Message了。这也印证了在Consumer章节的 Consumer Group 小节里讲过的概念。

Java Consumer with Assign and Seek

如果我们有一个临时的Consumer,不想加入任何一个Consumer Group,而且需要指定Topic的Partition,以及指定从哪个Message Offset开始获取数据,怎么办?所幸,Kafka提供了这样的API。

首先我们在实例化配置信息时,就不需要指定Consumer Group了:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstant.BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest, none

然后实例化 TopicPartition ,指定Topic和Partition序号。使用Consumer的 assign(Collection<TopicPartition> partitions) 方法,分配给该Consumer:

TopicPartition topicPartition = new TopicPartition("first_topic", 0);
consumer.assign(Arrays.asList(topicPartition));

再然后指定Message Offset:

long offset = 21L;
consumer.seek(topicPartition, offset);

运行该Consumer,可以看到如下输出信息:

[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=] Fetch offset 21 is out of range for partition first_topic-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=] Resetting offset for partition first_topic-0 to offset 22.
[main] INFO com.devtalking.jacefu.kafka.tutorial.ConsumerDemoAssignSeek - Key: null, Value: hello world!
[main] INFO com.devtalking.jacefu.kafka.tutorial.ConsumerDemoAssignSeek - Partition: 0, Offset: 22

如果我们使用Consumer Group CLI查看,会发现这种操作其实也是临时创建了一个Consumer Group:

root@iZ2ze2booskait1cxxyrljZ:~# kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

consumer_group_1
KMOffsetCache-iZ2ze2booskait1cxxyrljZ

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

复盘+:把经验转化为能力(第2版)

复盘+:把经验转化为能力(第2版)

邱昭良 / 机械工业出版社 / 39.00

随着环境日趋多变、不确定、复杂、模糊,无论是个人还是组织,都需要更快更有效地进行创新应变、提升能力。复盘作为一种从经验中学习的结构化方法,满足了快速学习的需求,也是有效进行知识萃取与共享的机制。在第1版基础上,《复盘+:把经验转化为能力》(第2版)做了六方面修订: ·提炼复盘的关键词,让大家更精准地理解复盘的精髓; ·基于实际操作经验,梳理、明确了复盘的"底层逻辑"; ·明确了复......一起来看看 《复盘+:把经验转化为能力(第2版)》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

URL 编码/解码
URL 编码/解码

URL 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试