kafka0.9 Consumer poll()方法阻塞

栏目: 后端 · 发布时间: 8年前

内容简介:kafka0.9 Consumer poll()方法阻塞

最近项目中用到了Kafka0.9,在使用0.9的Consumer API的时候遇到了poll()方法阻塞的问题。程序没有报任何错误,只是持续在poll()方法处阻塞。深入poll()方法可以看到是在AbstractCoordinator.ensureCoordinatorKnown()方法中出现了死循环。在循环中不停地输出如下DEBUG日志:

DEBUG main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Issuing group metadata request to broker 2

DEBUG main org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Group metadata response ClientResponse(receivedTimeMs=1495335769027, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@5c3bd550, request=RequestSend(header={api_key=10,api_version=0,correlation_id=90,client_id=consumer-1}, body={group_id=stats_consume_group}), createdTimeMs=1495335768924, sendTimeMs=1495335768924), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})

DEBUG main org.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=91,client_id=consumer-1}, body={topics=[statistic_test]}), isInitiatedByNetworkClient, createdTimeMs=1495335769027, sendTimeMs=0) to node 2

DEBUG main org.apache.kafka.clients.Metadata - Updated cluster metadata version 47 to Cluster(nodes = [Node(0, kafka-node, 9092), Node(2, sz-pg-adtracking-bigdisk-003, 9092), Node(1, sz-pg-adtracking-bigdisk-002, 9092)], partitions = [Partition(topic = statistic_test, partition = 1, leader = 2, replicas = [2,1,], isr = [1,2,], Partition(topic = statistic_test, partition = 2, leader = 0, replicas = [0,2,], isr = [0,2,], Partition(topic = statistic_test, partition = 0, leader = 1, replicas = [1,0,], isr = [0,1,], Partition(topic = statistic_test, partition = 5, leader = 0, replicas = [0,1,], isr = [0,1,], Partition(topic = statistic_test, partition = 6, leader = 1, replicas = [1,0,], isr = [0,1,], Partition(topic = statistic_test, partition = 3, leader = 1, replicas = [1,2,], isr = [1,2,], Partition(topic = statistic_test, partition = 4, leader = 2, replicas = [2,0,], isr = [0,2,], Partition(topic = statistic_test, partition = 7, leader = 2, replicas = [2,1,], isr = [1,2,], Partition(topic = statistic_test, partition = 8, leader = 0, replicas = [0,2,], isr = [0,2,]])

需要关注的是这处信息:

error_code=15,coordinator={node_id=-1,host=,port=-1}

看样子是kafka的连接出了问题。不过我的Producer向kafka写数据是没问题的,使用kafka提供的消费工具kafka-console-consumer.sh执行消费也是没问题的。

在网上找到了一些关于这个现象的解释:在客户端进行消费之前会为ConsumerGroup向Kafka集群申请coordinater节点。kafka集群在配置或分配coordinater节点的时候可能会短暂的报这个错误。

我这里不是短暂的报错,而是陷入了死循环。目前可以想到的就是我的kafka集群配置出现问题了。在简单粗暴地将zookeeper上kafka的配置完全删掉再重启Kafka后,消费可以正常执行了。至于问题具体出在哪儿还没有找到。目前只能是持续关注,等问题再次出现了。

就这样。

######


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Java数据结构和算法

Java数据结构和算法

拉佛 / 计晓云 / 中国电力出版社 / 2004-02-01 / 55.00元

《Java数据结构和算法》(第2版)以一种易懂的方式教授如何安排和操纵数据的问题,其中不乏一些难题:了解这些知识以期使计算机的应用获得最好的表现。不管使用何种语言或平台,掌握了数据结构和算法将改进程序的质量和性能。 《Java数据结构和算法》(第2版)提供了一套独创的可视讨论专题用以阐明主要的论题:它使用Java语言说明重要的概念,而避免了C/C++语言的复杂性,以便集中精力论述数据结构和算法。经......一起来看看 《Java数据结构和算法》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具