内容简介:librdkafka: 如何设置Kafka消费者订阅消息的起始偏移位置
“ 本文主要介绍使用kafka C语言接口库,如何设置消费者订阅消息的起始偏移。”
01
—
缺省配置
默认情况下,Kafka消费者从最后一次提交的偏移量位置(offset)开始消费消息,如果Topic+Partition和Group之前没有提交过偏移量,它订阅消息开始位置取决于Topic的配置属性auto.offset.reset的设置。默认为最新(latest),也就是在分区末尾开始消耗(仅消费新消息)。相关配置可以参考官方文档:https://kafka.apache.org/documentation/#topicconfigs
方便查阅,截个图:
02
—
相关接口信息
librdkafka提供了assign() API,通过设置rd_kafka_topic_partition_t的.offset属性,你可以指定每一个Partition的起始偏移。偏移量可以是一个绝对的偏移(>0),或逻辑偏移 (BEGINNING, END, STORED, TAIL(…))。
rdkafka.h头文件中定义了Partition的管理结构rd_kafka_topic_partition_t,包含offset信息;同时提供了逻辑偏移的定义RD_KAFKA_OFFSET_XXX。
/*** @brief Generic place holder for a specific Topic+Partition.** @sa rd_kafka_topic_partition_list_new()*/typedef struct rd_kafka_topic_partition_s {char *topic; /**< Topic name */int32_t partition; /**< Partition */int64_t offset; /**< Offset */void *metadata; /**< Metadata */size_t metadata_size; /**< Metadata size */void *opaque; /**< Application opaque */rd_kafka_resp_err_t err; /**< Error code, depending on use. */void *_private; /**< INTERNAL USE ONLY,* INITIALIZE TO ZERO, DO NOT TOUCH */} rd_kafka_topic_partition_t;////////////////////////////////////////////////////////////* kafka partition queue: oldest msg */* partition queue: next msg */* from offset store *//** @cond NO_DOC *//** @endcond *//*** @brief Start consuming \p CNT messages from topic's current end offset.** That is, if current end offset is 12345 and \p CNT is 200, it will start* consuming from offset \c 12345-200 = \c 12145. */
通过rd_kafka_assign()函数接口可以配置需要消费的Partition信息。
/*** @brief Atomic assignment of partitions to consume.** The new \p partitions will replace the existing assignment.** When used from a rebalance callback the application shall pass the* partition list passed to the callback (or a copy of it) (even if the list* is empty) rather than NULL to maintain internal join state.* A zero-length \p partitions will treat the partitions as a valid,* albeit empty, assignment, and maintain internal state, while a \c NULL* value for \p partitions will reset and clear the internal state.*/RD_EXPORT rd_kafka_resp_err_trd_kafka_assign (rd_kafka_t *rk,const rd_kafka_topic_partition_list_t *partitions);
03
—
如何配置offset
对于消费者来说,有两个场景来修改订阅的Parttion offset信息:一是系统初始化时直接指定offset信息,二是消费者群组重平衡(rebalance)的回调函数。接下来分别介绍一下。
系统初始化时,指定offset示例:
rd_kafka_topic_partition_list_t *partitions;partitions = rd_kafka_topic_partition_list_new(0);rd_kafka_topic_partition_list_add(partitions, "mytopic", 3)->offset = 1234;rd_kafka_assign(rk, partitions);rd_kafka_topic_partition_list_destroy(partitions);
rebalance_cb()回调函数中,指定offset示例。
void my_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t *partitions, void *opaque) {if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {rd_kafka_topic_partition_t *part;if ((part = rd_kafka_topic_partition_list_find(partitions, "mytopic", 3)))part->offset = 1234;rd_kafka_assign(rk, partitions);} else {rd_kafka_assign(rk, NULL);}}
04
—
更多精彩文章请订阅公众号
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 漫话:为什么计算机起始时间是1970年1月1日?
- 记一次Access偏移注入
- ios – reloadRowsAtIndexPaths时保持偏移量
- Kafka 消息偏移量的维护
- Spark Streaming 之 Kafka 偏移量管理
- php-rdkafka手动提交偏移量
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
深入分析Java Web技术内幕(修订版)
许令波 / 电子工业出版社 / 2014-8-1 / CNY 79.00
《深入分析Java Web技术内幕(修订版)》新增了淘宝在无线端的应用实践,包括:CDN 动态加速、多终端化改造、 多终端Session 统一 ,以及在大流量的情况下,如何跨越性能、网络和一个地区的电力瓶颈等内容,并提供了比较完整的解决方案。 《深入分析Java Web技术内幕(修订版)》主要围绕Java Web 相关技术从三方面全面、深入地进行了阐述。首先介绍前端知识,即在JavaWeb ......一起来看看 《深入分析Java Web技术内幕(修订版)》 这本书的介绍吧!