librdkafka: How to Set the Starting Offset for a Kafka Consumer's Subscription

Category: IT Technology · Published: 5 years ago


This article explains how to set the starting offset for a consumer's subscription when using librdkafka, the C client library for Kafka.

01

Default configuration

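By default, a Kafka consumer starts consuming from the last committed offset. If no offset has been committed for the Topic+Partition and Group combination, the starting position is determined by the topic configuration property auto.offset.reset (librdkafka classifies it as a topic-level property). It defaults to latest, which means consumption starts at the end of the partition and only new messages are consumed. For the related settings, see the official documentation: https://kafka.apache.org/documentation/#topicconfigs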
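The decision described above can be sketched as a small standalone C function. This is only an illustrative model, not librdkafka code: the names NO_COMMITTED_OFFSET, reset_policy_t and resolve_start_offset are hypothetical, and low and high stand in for the oldest available offset and the end offset of the partition. Committed offsets that fall out of range are not modelled here.

```c
#include <stdint.h>

/* Hypothetical sentinel meaning "this group has no committed offset". */
#define NO_COMMITTED_OFFSET ((int64_t)-1)

/* Hypothetical model of two auto.offset.reset values. */
typedef enum {
    RESET_EARLIEST, /* start at the oldest available message */
    RESET_LATEST    /* start at the partition end: new messages only */
} reset_policy_t;

/*
 * Return the offset at which consumption would start.
 *   committed: last committed offset, or NO_COMMITTED_OFFSET
 *   policy:    the auto.offset.reset setting (assumed model)
 *   low:       oldest offset still available in the partition
 *   high:      end offset (offset of the next message to be produced)
 */
int64_t resolve_start_offset(int64_t committed, reset_policy_t policy,
                             int64_t low, int64_t high) {
    if (committed != NO_COMMITTED_OFFSET)
        return committed; /* resume where the group left off */
    return policy == RESET_EARLIEST ? low : high;
}
```

For example, with no committed offset, an oldest available offset of 5 and an end offset of 100, the latest policy yields 100 and the earliest policy yields 5. A committed offset of 42 is returned unchanged under either policy.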


02


Related API information

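librdkafka provides the assign() API. By setting the .offset field of rd_kafka_topic_partition_t, you can specify the starting offset for each partition. The offset can be an absolute offset (>= 0) or a logical offset (BEGINNING, END, STORED, TAIL(…)).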

The rdkafka.h header defines rd_kafka_topic_partition_t, the structure used to manage a partition, which includes the offset field. It also defines the logical offsets as RD_KAFKA_OFFSET_XXX.

/**
 * @brief Generic place holder for a specific Topic+Partition.
 *
 * @sa rd_kafka_topic_partition_list_new()
 */
typedef struct rd_kafka_topic_partition_s {
        char        *topic;             /**< Topic name */
        int32_t      partition;         /**< Partition */
        int64_t      offset;            /**< Offset */
        void        *metadata;          /**< Metadata */
        size_t       metadata_size;     /**< Metadata size */
        void        *opaque;            /**< Application opaque */
        rd_kafka_resp_err_t err;        /**< Error code, depending on use. */
        void       *_private;           /**< INTERNAL USE ONLY,
                                         *   INITIALIZE TO ZERO, DO NOT TOUCH */
} rd_kafka_topic_partition_t;

////////////////////////////////////////////////////////////

#define RD_KAFKA_OFFSET_BEGINNING -2  /**< Start consuming from beginning of
                                       *   kafka partition queue: oldest msg */
#define RD_KAFKA_OFFSET_END     -1    /**< Start consuming from end of kafka
                                       *   partition queue: next msg */
#define RD_KAFKA_OFFSET_STORED -1000  /**< Start consuming from offset retrieved
                                       *   from offset store */
#define RD_KAFKA_OFFSET_INVALID -1001 /**< Invalid offset */

/** @cond NO_DOC */
#define RD_KAFKA_OFFSET_TAIL_BASE -2000 /* internal: do not use */
/** @endcond */

/**
 * @brief Start consuming \p CNT messages from topic's current end offset.
 *
 * That is, if current end offset is 12345 and \p CNT is 200, it will start
 * consuming from offset \c 12345-200 = \c 12145.
 */
#define RD_KAFKA_OFFSET_TAIL(CNT)  (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))
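To make the encoding of the logical offsets concrete, here is a small standalone sketch. The macro values are copied from the rdkafka.h excerpt so that it builds without librdkafka. The helpers is_logical_offset, tail_count and resolve_tail are hypothetical names introduced for illustration, and clamping the result at 0 is an assumption of this sketch rather than a statement about what the library does.

```c
#include <stdint.h>

/* Values copied from rdkafka.h so this sketch builds without librdkafka. */
#define RD_KAFKA_OFFSET_BEGINNING -2
#define RD_KAFKA_OFFSET_END       -1
#define RD_KAFKA_OFFSET_STORED    -1000
#define RD_KAFKA_OFFSET_TAIL_BASE -2000
#define RD_KAFKA_OFFSET_TAIL(CNT) (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))

/* Hypothetical helper: non-negative values are absolute offsets,
 * negative values are logical ones. */
int is_logical_offset(int64_t offset) {
    return offset < 0;
}

/* Hypothetical helper: the CNT encoded in a TAIL(CNT) value,
 * or -1 if the value is not a TAIL offset. */
int64_t tail_count(int64_t offset) {
    if (offset > RD_KAFKA_OFFSET_TAIL_BASE)
        return -1;
    return RD_KAFKA_OFFSET_TAIL_BASE - offset;
}

/* Hypothetical helper: absolute start offset for a TAIL(CNT) value, given
 * the partition's current end offset. Returns -1 if tail_offset is not a
 * TAIL value. Clamping at 0 is an assumption of this sketch. */
int64_t resolve_tail(int64_t end_offset, int64_t tail_offset) {
    int64_t cnt = tail_count(tail_offset);
    if (cnt < 0)
        return -1;
    return cnt > end_offset ? 0 : end_offset - cnt;
}
```

With the numbers from the header comment, RD_KAFKA_OFFSET_TAIL(200) is encoded as -2200, and resolve_tail(12345, RD_KAFKA_OFFSET_TAIL(200)) gives 12145.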

Use the rd_kafka_assign() function to configure the partitions to consume.

/**
 * @brief Atomic assignment of partitions to consume.
 *
 * The new \p partitions will replace the existing assignment.
 *
 * When used from a rebalance callback the application shall pass the
 * partition list passed to the callback (or a copy of it) (even if the list
 * is empty) rather than NULL to maintain internal join state.
 *
 * A zero-length \p partitions will treat the partitions as a valid,
 * albeit empty, assignment, and maintain internal state, while a \c NULL
 * value for \p partitions will reset and clear the internal state.
 */
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_assign (rd_kafka_t *rk,
                 const rd_kafka_topic_partition_list_t *partitions);

03


How to configure the offset

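A consumer can change the offset of its subscribed partitions in two situations: by specifying the offset directly at initialization, or in the rebalance callback of the consumer group. Each is described below.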

Example of specifying the offset at initialization:

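rd_kafka_topic_partition_list_t *partitions;

/* The argument is only an initial size; the list grows as entries are added. */
partitions = rd_kafka_topic_partition_list_new(0);
/* Start consuming partition 3 of "mytopic" at absolute offset 1234. */
rd_kafka_topic_partition_list_add(partitions, "mytopic", 3)->offset = 1234;
rd_kafka_assign(rk, partitions);
rd_kafka_topic_partition_list_destroy(partitions);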

Example of specifying the offset in the rebalance_cb() callback:

void my_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                      rd_kafka_topic_partition_list_t *partitions, void *opaque) {
   if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
       rd_kafka_topic_partition_t *part;
       /* Override the start offset only if partition 3 of "mytopic" was assigned to us. */
       if ((part = rd_kafka_topic_partition_list_find(partitions, "mytopic", 3)))
           part->offset = 1234;
       rd_kafka_assign(rk, partitions);
   } else {
       /* Revocation or error: clear the current assignment. */
       rd_kafka_assign(rk, NULL);
   }
}
