获取Kafka Consumer的offset
从kafka的0.8.11版本开始,它会将consumer的offset提交给ZooKeeper。然而当offset的数量(consumer数量 * partition的数量)的很多的时候,ZooKeeper的适应性就可能会出现不足。幸运的是,Kafka现在提供了一种理想的机制来存储Consumer的offset。Kafka现在是将Consumer的offset写入到一个分布式持久化的、高可用的topic中。开发者可以通过消费这个topic的方式来获取Consumer的offset。为了提升访问速度,kafka还提供了offset的内存缓存。也就是说,现在提交offset是通过普通的生产者请求(代价并不高)来完成的,而获取offset则是通过在内存中的快速查询完成的。
第一步:通过发送consumer元数据请求到任意Broker来发现并连接offset manager:
import kafka.api.*; import kafka.cluster.Broker; import kafka.common.OffsetAndMetadata; import kafka.common.OffsetMetadataAndError; import kafka.common.TopicAndPartition; import kafka.javaapi.ConsumerMetadataResponse; import kafka.javaapi.OffsetCommitRequest; import kafka.javaapi.OffsetCommitResponse; import kafka.javaapi.OffsetFetchRequest; import kafka.javaapi.OffsetFetchResponse; import kafka.network.BlockingChannel; import java.util.*; ... try { BlockingChannel channel = new BlockingChannel("localhost", 9092, BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000 /* read timeout in millis */); channel.connect(); final String MY_GROUP = "demoGroup"; final String MY_CLIENTID = "demoClientId"; int correlationId = 0; final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0); final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1); channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID)); ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer()); if (metadataResponse.errorCode() == ErrorMapping.NoError()) { Broker offsetManager = metadataResponse.coordinator(); // if the coordinator is different, from the above channel's host then reconnect channel.disconnect(); channel = new BlockingChannel(offsetManager.host(), offsetManager.port(), BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000 /* read timeout in millis */); channel.connect(); } else { // retry (after backoff) } } catch (IOException e) { // retry the query (after backoff) } ...
第二步:发送OffsetCommitRequest 或者 OffsetFetchRequest到offset manager:
// How to commit offsets long now = System.currentTimeMillis(); Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>(); offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now)); offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now)); OffsetCommitRequest commitRequest = new OffsetCommitRequest( MY_GROUP, offsets, correlationId++, MY_CLIENTID, (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper try { channel.send(commitRequest.underlying()); OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer()); if (commitResponse.hasError()) { for (partitionErrorCode: commitResponse.errors().values()) { if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) { // You must reduce the size of the metadata if you wish to retry } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) { channel.disconnect(); // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager } else { // log and retry the commit } } } } catch (IOException ioe) { channel.disconnect(); // Go to step 1 and then retry the commit } // How to fetch offsets List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>(); partitions.add(testPartition0); OffsetFetchRequest fetchRequest = new OffsetFetchRequest( MY_GROUP, partitions, (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper correlationId, MY_CLIENTID); try { channel.send(fetchRequest.underlying()); OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer()); OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0); short offsetFetchErrorCode = result.error(); if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) { channel.disconnect(); // Go to step 1 and retry the offset fetch } else if (errorCode == ErrorMapping.OffsetsLoadInProgress()) { // retry the offset fetch (after backoff) } else { long retrievedOffset = result.offset(); String retrievedMetadata = result.metadata(); } } catch (IOException e) { channel.disconnect(); // Go to step 1 and then retry offset fetch after backoff }
