Overview: Fetching Kafka Consumer Offsets
Up through version 0.8.1.1, Kafka consumers commit their offsets to ZooKeeper. When the number of offsets (number of consumers × number of partitions) grows large, however, ZooKeeper does not scale well. Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets: it writes them to a distributed, persistent, highly available topic. Developers can retrieve offsets by consuming this topic. For fast access, Kafka also keeps an in-memory cache of the offsets. In other words, committing an offset is now an ordinary (and therefore cheap) producer request, while fetching an offset is a fast in-memory lookup.
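As a rough illustration of the mechanism just described (this is a toy model, not actual broker code; every name in it is invented for the sketch), a commit appends to a durable log and updates a cache, while a fetch is just a map lookup:

```java
import java.util.*;

public class OffsetStoreSketch {
    // stands in for the durable offsets topic
    private final List<String> log = new ArrayList<String>();
    // "group/topic/partition" -> committed offset: the in-memory materialized view
    private final Map<String, Long> cache = new HashMap<String, Long>();

    public void commit(String group, String topic, int partition, long offset) {
        String key = group + "/" + topic + "/" + partition;
        log.add(key + "=" + offset); // durable append (an ordinary produce request)
        cache.put(key, offset);      // keep the cache current
    }

    public Long fetch(String group, String topic, int partition) {
        // fast O(1) lookup; no log scan needed
        return cache.get(group + "/" + topic + "/" + partition);
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.commit("demoGroup", "demoTopic", 0, 100L);
        store.commit("demoGroup", "demoTopic", 0, 150L);
        System.out.println(store.fetch("demoGroup", "demoTopic", 0)); // latest commit wins
    }
}
```

The real broker rebuilds its cache by replaying the offsets topic on startup, which is why commits must go through the log rather than only the cache.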
Kafka's official documentation describes how this feature works and how to migrate offsets from ZooKeeper to Kafka. The code below demonstrates how to use Kafka-based offset storage.
Step 1: Discover and connect to the offset manager by issuing a consumer metadata request to any broker:
import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import java.io.IOException;
import java.util.*;
...
try {
    BlockingChannel channel = new BlockingChannel("localhost", 9092,
            BlockingChannel.UseDefaultBufferSize(),
            BlockingChannel.UseDefaultBufferSize(),
            5000 /* read timeout in millis */);
    channel.connect();
    final String MY_GROUP = "demoGroup";
    final String MY_CLIENTID = "demoClientId";
    int correlationId = 0;
    final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
    final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());

    if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
        Broker offsetManager = metadataResponse.coordinator();
        // if the coordinator is different from the above channel's host, then reconnect
        channel.disconnect();
        channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
    } else {
        // retry (after backoff)
    }
}
catch (IOException e) {
    // retry the query (after backoff)
}
...
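The error-handling branches above (and in the snippets that follow) say "retry (after backoff)" without showing one. A minimal exponential-backoff helper might look like the sketch below; this class and its names are my own invention, not part of the Kafka API:

```java
public class BackoffSketch {
    private final long baseMillis;
    private final long maxMillis;
    private int attempt = 0;

    public BackoffSketch(long baseMillis, long maxMillis) {
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    // Delay before the next retry: base * 2^attempt, capped at maxMillis.
    public long nextDelayMillis() {
        long delay = Math.min(maxMillis, baseMillis << Math.min(attempt, 20));
        attempt++;
        return delay;
    }

    // Call after a successful request so later failures start from the base delay.
    public void reset() {
        attempt = 0;
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(100, 2000);
        for (int i = 0; i < 6; i++) {
            System.out.println(backoff.nextDelayMillis()); // 100, 200, 400, 800, 1600, 2000
        }
    }
}
```

In the retry branches, you would sleep for nextDelayMillis() before re-sending the request, so repeated coordinator moves or load-in-progress errors do not hammer the broker.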
Step 2: Issue an OffsetCommitRequest or OffsetFetchRequest to the offset manager:
// How to commit offsets
long now = System.currentTimeMillis();
Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
OffsetCommitRequest commitRequest = new OffsetCommitRequest(
        MY_GROUP,
        offsets,
        correlationId++,
        MY_CLIENTID,
        (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
try {
    channel.send(commitRequest.underlying());
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
    if (commitResponse.hasError()) {
        for (short partitionErrorCode : commitResponse.errors().values()) {
            if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                // You must reduce the size of the metadata if you wish to retry
            } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                channel.disconnect();
                // Go to step 1 (the offset manager has moved) and then retry the commit to the new offset manager
            } else {
                // log and retry the commit
            }
        }
    }
}
catch (IOException ioe) {
    channel.disconnect();
    // Go to step 1 and then retry the commit
}
// How to fetch offsets
List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
partitions.add(testPartition0);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
        MY_GROUP,
        partitions,
        (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
        correlationId,
        MY_CLIENTID);
try {
    channel.send(fetchRequest.underlying());
    OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
    OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
    short offsetFetchErrorCode = result.error();
    if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
        channel.disconnect();
        // Go to step 1 and retry the offset fetch
    } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
        // retry the offset fetch (after backoff)
    } else {
        long retrievedOffset = result.offset();
        String retrievedMetadata = result.metadata();
    }
}
catch (IOException e) {
    channel.disconnect();
    // Go to step 1 and then retry the offset fetch after backoff
}