背景
由于相关项目有 Kafka 的操作需求,因此,需要当前平台连接 Kafka 获取相关数据。
安装与配置
系统环境
- CentOS 6.7
- Kafka 0.10
- PHP 5.6.21
安装 php Kafka 扩展
Dev 环境准备
$ sudo yum install zlib zlib-devel openssl openssl-devel cyrus-sasl2 cyrus-sasl-devel
安装 php-rdkafka 的依赖 C 扩展库
$ git clone https://github.com/edenhill/librdkafka/
$ cd librdkafka
$ ./configure
$ make
$ sudo make install
安装 php-rdkafka的 PHP 扩展
参考官网安装说明
$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd php-rdkafka
$ # For PHP 7, checkout the php7 branch:
$ # git checkout php7
$ phpize
$ ./configure
$ make all -j 5
$ sudo make install
$ sh -c 'echo "extension=kafka.so" >> /usr/local/php/etc/php.ini'
查看 php-rdkafka 配置
$ php -m | grep kafka
Warning: PHP Startup: Unable to load dynamic library '/usr/local/php/lib/php/extensions/no-debug-non-zts-20131226/kafka.so' - librdkafka.so.1: cannot open shared object file: No such file or directory in Unknown on line 0
修复加载.so 问题
$ sudo touch /etc/ld.so.conf.d/librd.conf
$ sudo sh -c 'echo "/usr/local/lib" >> /etc/ld.so.conf.d/librd.conf'
$ ldconfig
$ ldconfig -p
查看 php 安装好的 kafka 配置
$ php -m | grep kafka
rdkafka
初步使用 Demo
目前使用高级消费者模式,参考官网的 demo 如下
$conf = new RdKafka\Conf();
// Set a rebalance callback to log partition assignemts (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup_1');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '10.110.92.95,10.110.92.101');
$topicConf = new RdKafka\TopicConf();
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'test'
$consumer->subscribe(['NetBaseInfo']);
echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joinging the group after leaving it.)\n";
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
补充
连接线上 kafka 业务测试
测试 kafka 地址
# server
10.110.92.101 # kafka zookeeper JDK
10.110.92.95 # kafka zookeeper JDK
10.110.95.23 # kafka zookeeper JDK -- 暂时有问题
# productor port
9092
# consume port
2181
# topic
NetBaseInfo
猜你喜欢:
暂无回复。