内容简介:在Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作。更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案。
在Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。
在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作。
更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案。
Topic的作用域
--all-topics --topic t1 --topic t2 --topic t1:0,1,2
重置策略
--to-earliest --to-latest --to-current --to-offset <offset> --shift-by N --to-datetime <datetime> --by-duration <duration> --from-file <file>
确定执行方案
--execute --export
注意事项
- consumer group状态必须是inactive的,即不能是处于正在工作中的状态
- 不加执行方案,默认是只做打印操作
常用示例
更新到当前group最初的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
更新到指定的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
更新到当前offset位置(解决offset的异常)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
offset位置按设置的值进行位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
offset设置到指定时刻开始
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
以上所述就是小编给大家介绍的《Kafka Consumer重置Offset》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。