Summary: Retrieving Kafka Consumer Offsets
As of version 0.8.1.1, Kafka commits consumer offsets to ZooKeeper. However, when the number of offsets (number of consumers × number of partitions) grows large, ZooKeeper does not scale well. Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets: it writes them to a replicated, persistent, highly available internal topic. Consumers can recover their offsets by reading from this topic. To speed up access, Kafka also caches the offsets in memory. In other words, committing an offset is now an ordinary (and therefore cheap) producer request, while fetching an offset is a fast in-memory lookup.
Kafka's official documentation describes how this feature works and how to migrate offsets from ZooKeeper to Kafka. The code below demonstrates how to use Kafka-based offset storage.
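For reference, the migration the documentation describes is driven by two high-level-consumer configuration properties. A sketch of the rolling-migration settings (as documented for the 0.8.2 consumer; adjust to your deployment):

```properties
# Phase 1: switch offset storage to Kafka, but keep committing to
# ZooKeeper as well while consumer instances are rolled one by one
offsets.storage=kafka
dual.commit.enabled=true

# Phase 2: once every consumer in the group is on Kafka-based storage,
# do a second rolling restart with dual commits turned off
dual.commit.enabled=false
```

Setting `dual.commit.enabled=true` during the transition ensures that consumers still reading offsets from ZooKeeper see up-to-date values.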
Step 1: Discover and connect to the offset manager for a consumer group by sending a consumer metadata request to any broker:
import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import java.io.IOException;
import java.util.*;
...
try {
    BlockingChannel channel = new BlockingChannel("localhost", 9092,
            BlockingChannel.UseDefaultBufferSize(),
            BlockingChannel.UseDefaultBufferSize(),
            5000 /* read timeout in millis */);
    channel.connect();
    final String MY_GROUP = "demoGroup";
    final String MY_CLIENTID = "demoClientId";
    int correlationId = 0;
    final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
    final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());

    if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
        Broker offsetManager = metadataResponse.coordinator();
        // if the coordinator is different from the above channel's host, then reconnect
        channel.disconnect();
        channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
    } else {
        // retry (after backoff)
    }
}
catch (IOException e) {
    // retry the query (after backoff)
}
...
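Under the hood, the broker answers the metadata request by mapping the group to one partition of the internal __consumer_offsets topic; the leader of that partition is the group's offset manager. A minimal sketch of that mapping (the class name here is illustrative, not Kafka's actual code; 50 is the default of the broker setting offsets.topic.num.partitions):

```java
public class OffsetsPartitionSketch {
    // Maps a consumer group to a partition of __consumer_offsets.
    // The sign bit is masked off rather than calling Math.abs(), so a
    // hash code of Integer.MIN_VALUE cannot yield a negative partition.
    static int offsetsPartitionFor(String groupId, int offsetsTopicPartitionCount) {
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        int partition = offsetsPartitionFor("demoGroup", 50);
        System.out.println("demoGroup -> __consumer_offsets partition " + partition);
    }
}
```

Because the mapping is deterministic, every broker gives the same answer for a given group, which is why the metadata request can be sent to any broker.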
Step 2: Send an OffsetCommitRequest or OffsetFetchRequest to the offset manager:
// How to commit offsets
long now = System.currentTimeMillis();
Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
OffsetCommitRequest commitRequest = new OffsetCommitRequest(
        MY_GROUP,
        offsets,
        correlationId++,
        MY_CLIENTID,
        (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper

try {
    channel.send(commitRequest.underlying());
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
    if (commitResponse.hasError()) {
        for (short partitionErrorCode : commitResponse.errors().values()) {
            if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                // You must reduce the size of the metadata if you wish to retry
            } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                channel.disconnect();
                // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
            } else {
                // log and retry the commit
            }
        }
    }
}
catch (IOException ioe) {
    channel.disconnect();
    // Go to step 1 and then retry the commit
}
// How to fetch offsets
List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
partitions.add(testPartition0);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
        MY_GROUP,
        partitions,
        (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
        correlationId,
        MY_CLIENTID);

try {
    channel.send(fetchRequest.underlying());
    OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
    OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
    short offsetFetchErrorCode = result.error();
    if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
        channel.disconnect();
        // Go to step 1 and retry the offset fetch
    } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
        // retry the offset fetch (after backoff)
    } else {
        long retrievedOffset = result.offset();
        String retrievedMetadata = result.metadata();
    }
}
catch (IOException e) {
    channel.disconnect();
    // Go to step 1 and then retry offset fetch after backoff
}