php-rdkafka手动提交偏移量

栏目: IT技术 · 发布时间: 5年前

内容简介:php-rdkafka手动提交偏移量

在项目中使用php-rdkafka的高级消费者时,发现设置了:

$topicConf->set('enable.auto.commit', 'false');

没有效果,还是会自动提交offset,查了各种资料,正确的应该是这样设置:

$conf->set('enable.auto.commit', 'false');

相关说明见文档:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

附上示例:

setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
             echo "Revoke: ";
             var_dump($partitions);
             $kafka->assign(NULL);
             break;

         default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.1:9092');

//设置使用手动提交offset
$conf->set('enable.auto.commit', 'false');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test'
$consumer->subscribe(['test']);

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            //消费完成后手动提交offset
            //$consumer->commit($message);     
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

 


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

标签: php-rdkafka手动提交偏移量

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

最优化导论

最优化导论

Edwin K. P. Chong、Stanislaw H. Zak / 孙志强、白圣建、郑永斌、刘伟 / 电子工业出版社 / 2015-10 / 89.00

本书是一本关于最优化技术的入门教材,全书共分为四部分。第一部分是预备知识。第二部分主要介绍无约束的优化问题,并介绍线性方程的求解方法、神经网络方法和全局搜索方法。第三部分介绍线性优化问题,包括线性优化问题的模型、单纯形法、对偶理论以及一些非单纯形法,简单介绍了整数线性优化问题。第四部分介绍有约束非线性优化问题,包括纯等式约束下和不等式约束下的优化问题的最优性条件、凸优化问题、有约束非线性优化问题的......一起来看看 《最优化导论》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

SHA 加密
SHA 加密

SHA 加密工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具