Fetching Kafka Consumer Offsets

Up to and including version 0.8.1.1, Kafka consumers commit their offsets to ZooKeeper. However, when the number of offsets (consumer count * partition count) becomes large, ZooKeeper may not scale well enough. Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets: it writes them to a distributed, durable, highly available topic, and developers can fetch a consumer's offsets by reading from that topic. To speed up access, Kafka also keeps an in-memory cache of the offsets. In other words, committing an offset is now an ordinary producer request (which is inexpensive), while fetching an offset is a fast in-memory lookup.

The official Kafka documentation describes how this feature works and how to migrate offsets from ZooKeeper to Kafka. The code below demonstrates how to take advantage of Kafka-based offset storage.
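
For the old high-level consumer, the migration described in the documentation comes down to two configuration properties, offsets.storage and dual.commit.enabled. A minimal sketch (the ZooKeeper address and group name here are placeholders) might look like this:

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

...

    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");  // placeholder ZooKeeper address
    props.put("group.id", "demoGroup");                // placeholder consumer group
    // Phase 1: store offsets in Kafka while still dual-committing to ZooKeeper,
    // then do a rolling bounce of every consumer in the group.
    props.put("offsets.storage", "kafka");
    props.put("dual.commit.enabled", "true");
    // Phase 2: once the whole group has been bounced, set dual.commit.enabled=false
    // and do a second rolling bounce so that commits go to Kafka only.
    ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));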

Step 1: Discover and connect to the offset manager for the consumer group by sending a consumer metadata request to any broker:

import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import java.io.IOException;
import java.util.*;
 
...
 
    try {
        BlockingChannel channel = new BlockingChannel("localhost", 9092,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
        final String MY_GROUP = "demoGroup";
        final String MY_CLIENTID = "demoClientId";
        int correlationId = 0;
        final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
        final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
        channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
        ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
 
        if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
            Broker offsetManager = metadataResponse.coordinator();
            // if the coordinator is different from the above channel's host, then reconnect
            channel.disconnect();
            channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          5000 /* read timeout in millis */);
            channel.connect();
        } else {
            // retry (after backoff)
        }
    }
    catch (IOException e) {
        // retry the query (after backoff)
    }
    
...

Step 2: Send an OffsetCommitRequest or OffsetFetchRequest to the offset manager:

 // How to commit offsets
 
        long now = System.currentTimeMillis();
        Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
        offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
        offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
        OffsetCommitRequest commitRequest = new OffsetCommitRequest(
                MY_GROUP,
                offsets,
                correlationId++,
                MY_CLIENTID,
                (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
        try {
            channel.send(commitRequest.underlying());
            OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
            if (commitResponse.hasError()) {
                for (Object partitionErrorCodeObj : commitResponse.errors().values()) {
                    // error codes come back boxed; convert to a primitive short for the comparisons below
                    short partitionErrorCode = ((Number) partitionErrorCodeObj).shortValue();
                    if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                        // You must reduce the size of the metadata if you wish to retry
                    } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode() || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                      channel.disconnect();
                      // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
                    } else {
                        // log and retry the commit
                    }
                }
            }
        }
        catch (IOException ioe) {
            channel.disconnect();
            // Go to step 1 and then retry the commit
        }
 
 // How to fetch offsets
 
        List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
        partitions.add(testPartition0);
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
                MY_GROUP,
                partitions,
                (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
                correlationId,
                MY_CLIENTID);
        try {
            channel.send(fetchRequest.underlying());
            OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
            OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
            short offsetFetchErrorCode = result.error();
            if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
                channel.disconnect();
                // Go to step 1 and retry the offset fetch
            } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
                // retry the offset fetch (after backoff)
            } else {
                long retrievedOffset = result.offset();
                String retrievedMetadata = result.metadata();
            }
        }
        catch (IOException e) {
            channel.disconnect();
            // Go to step 1 and then retry offset fetch after backoff
        }
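
Note that, per the Kafka protocol guide, a partition for which the group has never committed an offset does not come back as an error; the fetch simply returns offset -1 with empty metadata, so callers may want to treat -1 as "no committed offset yet". Finally, once all commits and fetches are done, release the channel opened in step 1. A minimal sketch, reusing the channel variable from above:

 // How to clean up

        // Release the connection to the offset manager once all commits and fetches are done.
        channel.disconnect();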
