Kafka从上手到实践-实践真知:Kafka Java Consumer

栏目: 后端 · 发布时间: 5年前

内容简介:首先创建Consumer需要的配置信息,最基本的有五个信息:然后传入上面实例化好的配置信息,实例化Consumer:然后通过Consumer的

首先创建Consumer需要的配置信息,最基本的有五个信息:

  • Kafka集群的地址。
  • 发送的Message中Key的序列化方式。
  • 发送的Message中Value的序列化方式。
  • 指定Consumer Group。
  • 指定拉取Message范围的策略。
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_1");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // earliest, none
    

然后传入上面实例化好的配置信息,实例化Consumer:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

然后通过Consumer的 subscribe(Collection<String> topics) 方法订阅Topic:

consumer.subscribe(Arrays.asList("first_topic"));

最后获取Topic里的Message,将Message信息输出到日志中:

while(true) {
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
	for(ConsumerRecord<String, String> record : records) {
		logger.info("Key: " + record.key() + ", Value: " + record.value());
		logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
	}
}

Consumer的 poll(Duration timeout) 方法可以设置获取数据的时间间隔,同时回忆一下在之前Consumer章节的 Consumer Poll Options 小节中,说过关于Consumer获取Message的四个配置项,都可以在Properties里进行设置。

启动Java Consumer后,在控制台可以看到如下信息:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Discovered group coordinator IP:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2]
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-0 to offset 23.
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-1 to offset 24.
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-2 to offset 21.

在上面的信息中,可以看到 Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2] 这句话,说明当前这个Consumer会获取 first_topic 这个Topic中全部Partition中的Message。

如果我们再启动一个Consumer,这个Consumer和第一个在同一个组里,看看会有什么输出信息:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Discovered group coordinator IP:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-2]

可以看到新启动的Consumer会输出 Setting newly assigned partitions [first_topic-2] 这句话,说明新的这个Consumer只会获取 first_topic 这个Topic的一个Partition中的Message。

再回去看看第一个Consumer的控制台:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions [first_topic-0, first_topic-1, first_topic-2]
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-0, first_topic-1]

第一个Consumer新输出在控制台中的信息很关键,首先看到 Attempt to heartbeat failed since group is rebalancing 这句话,说明Kafka会自动重新给Consumer Group里的Consumer分配Topic的Partition。

再看 Setting newly assigned partitions [first_topic-0, first_topic-1] 这句,说明第一个Consumer不会再获取 first_topic-2 这个Partition里的Message了。这也印证了在Consumer章节的 Consumer Group 小节里讲过的概念。

Java Consumer with Assign and Seek

如果我们有一个临时的Consumer,不想加入任何一个Consumer Group,而且需要指定Topic的Partition,以及指定从哪个Message Offset开始获取数据,怎么办?所幸,Kafka提供了这样的API。

首先我们在实例化配置信息时,就不需要指定Consumer Group了:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstant.BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest, none

然后实例化 TopicPartition ,指定Topic和Partition序号。使用Consumer的 assign(Collection<TopicPartition> partitions) 方法,分配给该Consumer:

TopicPartition topicPartition = new TopicPartition("first_topic", 0);
consumer.assign(Arrays.asList(topicPartition));

再然后指定Message Offset:

long offset = 21L;
consumer.seek(topicPartition, offset);

运行该Consumer,可以看到如下输出信息:

[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=] Fetch offset 21 is out of range for partition first_topic-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=] Resetting offset for partition first_topic-0 to offset 22.
[main] INFO com.devtalking.jacefu.kafka.tutorial.ConsumerDemoAssignSeek - Key: null, Value: hello world!
[main] INFO com.devtalking.jacefu.kafka.tutorial.ConsumerDemoAssignSeek - Partition: 0, Offset: 22

如果我们使用Consumer Group CLI查看,会发现这种操作其实也是临时创建了一个Consumer Group:

root@iZ2ze2booskait1cxxyrljZ:~# kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

consumer_group_1
KMOffsetCache-iZ2ze2booskait1cxxyrljZ

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

商业的常识

商业的常识

申音 / 山西经济出版社 / 2011-7-1 / 35.00元

★为什么美国没有史玉柱,中国没有乔布斯? ★什么是“对的行业”、“错的行业”? ★我们需要什么样的营销? ★老板为什么要读商学院? ★山寨公司还需要管理吗? ★资源问题是个“伪问题”? ★别把商业模式当成葵花宝典 ★给海归技术创业兄弟的九个忠告 ★在一个不伟大的行业里,做一个伟大的公司 ★是什么让互联网遭遇了有史以来最鸡犬不宁的一战?一起来看看 《商业的常识》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换