内容简介:flinksql从kafka读取数据,异步函数加载Mysql数据进行维表关联,最后将数据写入到mysql中。任务在启动时会消费kafka数据,一段时间后不读kafka或者能够持续读kafka数据但是异步函数不发送给下游算子。打印执行线程堆栈信息,虽然BLOCKED状态的线程很多,但大部分是第三方类的执行线程,都比较正常。突然发现和我们程序有关的代码阻塞线程。
问题描述
flinksql从kafka读取数据,异步函数加载 Mysql 数据进行维表关联,最后将数据写入到mysql中。任务在启动时会消费kafka数据,一段时间后不读kafka或者能够持续读kafka数据但是异步函数不发送给下游算子。
-
不读kafka数据:kafka读取线程像卡住一样,从kafka中读取不到数据,以为是网络原因,但是计算节点和工作节点在同一台机器中,于是排除网络原因。
-
持续读kafka数据,但是异步函数不下发数据:以为是设置的异步超时间超时,默认是10s,增大超时时间后依然不下发。
Jstack 排查
打印执行线程堆栈信息,虽然BLOCKED状态的线程很多,但大部分是第三方类的执行线程,都比较正常。突然发现和我们程序有关的代码阻塞线程。
原来是调用我们的timeout函数出现了阻塞。
public void timeout(Row input, ResultFuture<Row> resultFuture) {
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
try {
// 阻塞等待
if (null == future.get()) {
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
}
} catch (Exception e) {
resultFuture.completeExceptionally(new Exception(e));
}
}
阻塞原因
在flink异步函数asyncInvoke中,只处理了正常逻辑。也就是匹配上调用 resultFuture.complete(rowList); 但是fillData里面进行数据类型转换时很容易发生异常,当发生异常时,resultFuture并没有结果输出,从而导致整个链路阻塞。
List<Row> rowList = Lists.newArrayList();
for (Object jsonArray : (List) val.getContent()) {
Row row = fillData(input, jsonArray);
rowList.add(row);
}
resultFuture.complete(rowList);
解决以及注意事项
fillData进行try-catch捕获发生异常时调用 resultFuture.completeExceptionally(exception);
在flink异步函数中, resultFuture.complete() 只会被调用一次,complete一个集合需要先在填充然后一次性发送,而不是通过遍历调用多次 resultFuture.complete()
使用异步Future一定要记得有输出值。
堆栈信息重点关注有没有我们自己的逻辑 。
欢迎点赞+收藏+转发朋友圈素质三连
文章不错? 点个【 在看 】吧! :point_down:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Kafka消费者自动提交配置会导致潜在的重复或数据丢失!
- 消费端如何保证消息队列MQ的有序消费
- 《吊打面试官》系列-分布式事务、重复消费、顺序消费
- 十一贝:航延险智能判定,公平消费环境惠及消费者
- Kafka消费者的偏移量和高级/简单消费者
- RocketMQ 实战:一个新的消费组初次启动时从何处开始消费呢?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Data Structures and Algorithms in Java
Robert Lafore / Sams / 2002-11-06 / USD 64.99
Data Structures and Algorithms in Java, Second Edition is designed to be easy to read and understand although the topic itself is complicated. Algorithms are the procedures that software programs use......一起来看看 《Data Structures and Algorithms in Java》 这本书的介绍吧!