在Kafka 0.8.1.1及之前的版本中,consumer会将offset提交给ZooKeeper。然而当offset的数量(consumer数量 * partition数量)很多的时候,ZooKeeper的扩展性就可能会出现不足。幸运的是,Kafka现在提供了一种理想的机制来存储consumer的offset。Kafka现在是将consumer的offset写入到一个分布式持久化的、高可用的topic中。开发者可以通过消费这个topic的方式来获取consumer的offset。为了提升访问速度,Kafka还提供了offset的内存缓存。也就是说,现在提交offset是通过普通的生产者请求(代价并不高)来完成的,而获取offset则是通过在内存中的快速查询完成的。


第一步:通过发送consumer元数据请求到任意Broker来发现并连接offset manager:

import java.io.IOException;
import java.util.*;
import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
    try {
        BlockingChannel channel = new BlockingChannel("localhost", 9092,
                5000 /* read timeout in millis */);
        final String MY_GROUP = "demoGroup";
        final String MY_CLIENTID = "demoClientId";
        int correlationId = 0;
        final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
        final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
        channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
        ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
        if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
            Broker offsetManager = metadataResponse.coordinator();
            // if the coordinator is different, from the above channel's host then reconnect
            channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                                          5000 /* read timeout in millis */);
        } else {
            // retry (after backoff)
    catch (IOException e) {
        // retry the query (after backoff)

第二步:发送OffsetCommitRequest 或者 OffsetFetchRequest到offset manager:

 // How to commit offsets
        long now = System.currentTimeMillis();
        Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
        offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
        offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
        OffsetCommitRequest commitRequest = new OffsetCommitRequest(
                (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
        try {
            OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
            if (commitResponse.hasError()) {
                for (partitionErrorCode: commitResponse.errors().values()) {
                    if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                        // You must reduce the size of the metadata if you wish to retry
                    } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                      // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
                    } else {
                        // log and retry the commit
        catch (IOException ioe) {
            // Go to step 1 and then retry the commit
 // How to fetch offsets
        List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
                (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
        try {
            OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
            OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
            short offsetFetchErrorCode = result.error();
            if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
                // Go to step 1 and retry the offset fetch
            } else if (errorCode == ErrorMapping.OffsetsLoadInProgress()) {
                // retry the offset fetch (after backoff)
            } else {
                long retrievedOffset = result.offset();
                String retrievedMetadata = result.metadata();
        catch (IOException e) {
            // Go to step 1 and then retry offset fetch after backoff


