内容简介:flinksql从kafka读取数据,异步函数加载Mysql数据进行维表关联,最后将数据写入到mysql中。任务在启动时会消费kafka数据,一段时间后不读kafka或者能够持续读kafka数据但是异步函数不发送给下游算子。打印执行线程堆栈信息,虽然BLOCKED状态的线程很多,但大部分是第三方类的执行线程,都比较正常。突然发现和我们程序有关的代码阻塞线程。
问题描述
flinksql从kafka读取数据,异步函数加载 Mysql 数据进行维表关联,最后将数据写入到mysql中。任务在启动时会消费kafka数据,一段时间后不读kafka或者能够持续读kafka数据但是异步函数不发送给下游算子。
-
不读kafka数据:kafka读取线程像卡住一样,从kafka中读取不到数据,以为是网络原因,但是计算节点和工作节点在同一台机器中,于是排除网络原因。
-
持续读kafka数据,但是异步函数不下发数据:以为是设置的异步超时间超时,默认是10s,增大超时时间后依然不下发。
Jstack 排查
打印执行线程堆栈信息,虽然BLOCKED状态的线程很多,但大部分是第三方类的执行线程,都比较正常。突然发现和我们程序有关的代码阻塞线程。
原来是调用我们的timeout函数出现了阻塞。
public void timeout(Row input, ResultFuture<Row> resultFuture) { StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture; try { // 阻塞等待 if (null == future.get()) { resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out.")); } } catch (Exception e) { resultFuture.completeExceptionally(new Exception(e)); } }
阻塞原因
在flink异步函数asyncInvoke中,只处理了正常逻辑。也就是匹配上调用 resultFuture.complete(rowList);
但是fillData里面进行数据类型转换时很容易发生异常,当发生异常时,resultFuture并没有结果输出,从而导致整个链路阻塞。
List<Row> rowList = Lists.newArrayList(); for (Object jsonArray : (List) val.getContent()) { Row row = fillData(input, jsonArray); rowList.add(row); } resultFuture.complete(rowList);
解决以及注意事项
fillData进行try-catch捕获发生异常时调用 resultFuture.completeExceptionally(exception);
在flink异步函数中, resultFuture.complete()
只会被调用一次,complete一个集合需要先在填充然后一次性发送,而不是通过遍历调用多次 resultFuture.complete()
使用异步Future一定要记得有输出值。
堆栈信息重点关注有没有我们自己的逻辑 。
欢迎点赞+收藏+转发朋友圈素质三连
文章不错? 点个【 在看 】吧! :point_down:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Kafka消费者自动提交配置会导致潜在的重复或数据丢失!
- 消费端如何保证消息队列MQ的有序消费
- 《吊打面试官》系列-分布式事务、重复消费、顺序消费
- 十一贝:航延险智能判定,公平消费环境惠及消费者
- Kafka消费者的偏移量和高级/简单消费者
- RocketMQ 实战:一个新的消费组初次启动时从何处开始消费呢?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
具体数学(英文版第2版)
[美] Ronald L. Graham、Donald E. Knuth、Oren Patashnik / 机械工业出版社 / 2002-8 / 49.00元
This book introduces the mathematics that supports advanced computer Programming and the analysis of algorithms. The primary aim of its well-known authors is to provide a solid and relevant base of ma......一起来看看 《具体数学(英文版第2版)》 这本书的介绍吧!