技术分享 PHP 与 Kafka 连接与搭建

adam · 2019-04-10 19:50:33 · 热度: 338

背景

由于相关项目有 Kafka 的操作需求,因此,需要当前平台连接 Kafka 获取相关数据。

安装与配置

系统环境

  • CentOS 6.7
  • Kafka 0.10
  • PHP 5.6.21

安装 php Kafka 扩展

根据官网指引地址 我们选择对应的 PHP 客户端

Dev 环境准备

$ sudo yum install zlib zlib-devel openssl openssl-devel cyrus-sasl2 cyrus-sasl-devel

安装 php-rdkafka 的依赖 C 扩展库

$ git clone https://github.com/edenhill/librdkafka/
$ cd librdkafka
$ ./configure
$ make 
$ sudo make install

安装 php-rdkafka的 PHP 扩展

参考官网安装说明

$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd php-rdkafka
$ # For PHP 7, checkout the php7 branch:
$ # git checkout php7
$ phpize
$ ./configure
$ make all -j 5
$ sudo make install
$ sh -c 'echo "extension=kafka.so" >> /usr/local/php/etc/php.ini'

查看 php-rdkafka 配置

$ php -m | grep kafka
Warning: PHP Startup: Unable to load dynamic library '/usr/local/php/lib/php/extensions/no-debug-non-zts-20131226/kafka.so' - librdkafka.so.1: cannot open shared object file: No such file or directory in Unknown on line 0

修复加载.so 问题

$ sudo touch /etc/ld.so.conf.d/librd.conf
$ sudo sh -c 'echo "/usr/local/lib" >> /etc/ld.so.conf.d/librd.conf'
$ ldconfig
$ ldconfig -p

查看 php 安装好的 kafka 配置

$ php -m | grep kafka
rdkafka

初步使用 Demo

目前使用高级消费者模式,参考官网的 demo 如下

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignemts (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
             echo "Revoke: ";
             var_dump($partitions);
             $kafka->assign(NULL);
             break;

         default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup_1');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '10.110.92.95,10.110.92.101');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test'
$consumer->subscribe(['NetBaseInfo']);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joinging the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

补充

连接线上 kafka 业务测试

测试 kafka 地址

# server
10.110.92.101  # kafka zookeeper JDK
10.110.92.95   # kafka zookeeper JDK
10.110.95.23   # kafka zookeeper JDK -- 暂时有问题

# productor port
9092

# consume port
2181

# topic
NetBaseInfo

猜你喜欢:
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册