内容简介:获取Kafka Consumer的offset
从kafka的0.8.11版本开始,它会将consumer的offset提交给ZooKeeper。然而当offset的数量(consumer数量 * partition的数量)的很多的时候,ZooKeeper的适应性就可能会出现不足。幸运的是,Kafka现在提供了一种理想的机制来存储Consumer的offset。Kafka现在是将Consumer的offset写入到一个分布式持久化的、高可用的topic中。开发者可以通过消费这个topic的方式来获取Consumer的offset。为了提升访问速度,kafka还提供了offset的内存缓存。也就是说,现在提交offset是通过普通的生产者请求(代价并不高)来完成的,而获取offset则是通过在内存中的快速查询完成的。
Kafka的官方文档描述了这个特性是如何工作的,以及如何将offset从zookeeper迁移到kafka。下面的代码演示了如何利用基于kafka存储offset的特性。
第一步:通过发送consumer元数据请求到任意Broker来发现并连接offset manager:
import kafka.api.*; import kafka.cluster.Broker; import kafka.common.OffsetAndMetadata; import kafka.common.OffsetMetadataAndError; import kafka.common.TopicAndPartition; import kafka.javaapi.ConsumerMetadataResponse; import kafka.javaapi.OffsetCommitRequest; import kafka.javaapi.OffsetCommitResponse; import kafka.javaapi.OffsetFetchRequest; import kafka.javaapi.OffsetFetchResponse; import kafka.network.BlockingChannel; import java.util.*; ... try { BlockingChannel channel = new BlockingChannel("localhost", 9092, BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000 /* read timeout in millis */); channel.connect(); final String MY_GROUP = "demoGroup"; final String MY_CLIENTID = "demoClientId"; int correlationId = 0; final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0); final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1); channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID)); ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer()); if (metadataResponse.errorCode() == ErrorMapping.NoError()) { Broker offsetManager = metadataResponse.coordinator(); // if the coordinator is different, from the above channel's host then reconnect channel.disconnect(); channel = new BlockingChannel(offsetManager.host(), offsetManager.port(), BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000 /* read timeout in millis */); channel.connect(); } else { // retry (after backoff) } } catch (IOException e) { // retry the query (after backoff) } ...
第二步:发送OffsetCommitRequest 或者 OffsetFetchRequest到offset manager:
// How to commit offsets long now = System.currentTimeMillis(); Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>(); offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now)); offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now)); OffsetCommitRequest commitRequest = new OffsetCommitRequest( MY_GROUP, offsets, correlationId++, MY_CLIENTID, (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper try { channel.send(commitRequest.underlying()); OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer()); if (commitResponse.hasError()) { for (partitionErrorCode: commitResponse.errors().values()) { if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) { // You must reduce the size of the metadata if you wish to retry } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) { channel.disconnect(); // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager } else { // log and retry the commit } } } } catch (IOException ioe) { channel.disconnect(); // Go to step 1 and then retry the commit } // How to fetch offsets List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>(); partitions.add(testPartition0); OffsetFetchRequest fetchRequest = new OffsetFetchRequest( MY_GROUP, partitions, (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper correlationId, MY_CLIENTID); try { channel.send(fetchRequest.underlying()); OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer()); OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0); short offsetFetchErrorCode = result.error(); if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) { channel.disconnect(); // Go to step 1 and retry the offset fetch } else if (errorCode == ErrorMapping.OffsetsLoadInProgress()) { // retry the offset fetch (after backoff) } else { long retrievedOffset = result.offset(); String retrievedMetadata = result.metadata(); } } catch (IOException e) { channel.disconnect(); // Go to step 1 and then retry offset fetch after backoff }
####
以上所述就是小编给大家介绍的《获取Kafka Consumer的offset》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- ADO.NET获取数据(DataSet)同时获取表的架构实例
- 根据 PID 获取 K8S Pod名称 - 反之 POD名称 获取 PID
- .NET/C# 如何获取当前进程的 CPU 和内存占用?如何获取全局 CPU 和内存占用?
- phpinfo获取敏感内容
- 低开销获取时间戳
- python 利率获取
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
数据库系统概念
Abraham Silberschatz、Henry F. Korth、S. Sudarshan / 杨冬青、马秀莉、唐世渭 / 机械工业 / 2006-10-01 / 69.50元
本书是数据库系统方面的经典教材之一。国际上许多著名大学包括斯坦福大学、耶鲁大学、得克萨斯大学、康奈尔大学、伊利诺伊大学、印度理工学院等都采用本书作为教科书。我国也有许多所大学采用本书以前版本的中文版作为本科生和研究生的数据库课程的教材和主要教学参考书,收到了良好的效果。 本书调整和新增内容:调整了第4版的讲授顺序。首先介绍SQL及其高级特性,使学生容易接受数据库设计的概念。新增数据库设计的专......一起来看看 《数据库系统概念》 这本书的介绍吧!