内容简介:php-rdkafka手动提交偏移量
在项目中使用php-rdkafka的高级消费者时,发现设置了:
$topicConf->set('enable.auto.commit', 'false');
没有效果,还是会自动提交offset,查了各种资料,正确的应该是这样设置:
$conf->set('enable.auto.commit', 'false');
相关说明见文档:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
附上示例:
setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.1:9092');
//设置使用手动提交offset
$conf->set('enable.auto.commit', 'false');
$topicConf = new RdKafka\TopicConf();
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'test'
$consumer->subscribe(['test']);
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
//消费完成后手动提交offset
//$consumer->commit($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 记一次Access偏移注入
- ios – reloadRowsAtIndexPaths时保持偏移量
- Kafka 消息偏移量的维护
- Spark Streaming 之 Kafka 偏移量管理
- 人品爆发:偏移注入与移位溢注的联合使用
- bug诞生记——隐蔽的指针偏移计算导致的数据错乱
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
ARM嵌入式系统开发
斯洛斯 / 北京航大 / 2005-5 / 75.00元
《ARM嵌入式系统开发:软件设计与优化》从软件设计的角度,全面、系统地介绍了ARM处理器的基本体系结构和软件设计与优化方法。内容包括:ARM处理器基础;ARM/Thumb指令集;C语言与汇编语言程序的设计与优化;基本运算、操作的优化;基于ARM的DSP;异常与中断处理;固件与嵌入式OS;cache与存储器管理;ARMv6体系结构的特点等。全书内容完整,针对各种不同的ARM内核系统结构都有详尽论述,......一起来看看 《ARM嵌入式系统开发》 这本书的介绍吧!