内容简介:艳杰。擅长 Python 与 JAVA , 现任饿了么物流团队资深 Python 工程师,负责分流核心链路, 专注于系统业务分析及稳定性建设。上次我们分享了我们团队spring-rabbit 版本变更至 1.6.2.RELEASE
艳杰。擅长 Python 与 JAVA , 现任饿了么物流团队资深 Python 工程师,负责分流核心链路, 专注于系统业务分析及稳定性建设。
上次我们分享了我们团队 Java应用 Docker 化部署GC变长的踩坑经历 ,发现还真的帮助很多同学解决了他们项目中同样的问题。这对我们来说真的是很大的一个激励,所以我们决定后面会不定期分享一些我们团队的踩坑经历,今天分享的是 Spring-RabbitMQ consumer 的两个坑,希望能对大家有所帮助。
坑 No.1: consumer 假死,无真正的消费能力
背景
spring-rabbit 版本变更至 1.6.2.RELEASE
现象
consumer 数量正常,mq 控制面板的 prefetch
参数始终是1, 消息无法正常 ack, 队列处于假死状态, 系统报异常 org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException
[2018-09-09 10:31:27.27]RuntimeException-org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException [ ERROR] [ Elog ] Execution of Rabbit message listener failed. org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) at java.lang.Thread.run(Thread.java:748) Caused by: org.springframework.amqp.AmqpIllegalStateException: No default listener method specified: Either specify a non-null value for the 'defaultListenerMethod' property or override the 'getListenerMethodName' method. at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:291) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ... 10 more 复制代码
异常原因
ListenerExecutionFailedException
根据官方文档说明,是 consumer 在消息消费时发生了异常, 默认情况下,该消息会被 reject, 并重新回到队列中, 但如果异常发生在到达用户代码之前的异常,此消息将会一直投递。
org.springframework.amqp.AmqpIllegalStateException
该异常抛出,RabbitMQ无法收到消息回应,将一直处于等待状态。
No default listener method specified: Either specify a non-null value ...
找不到 consumer OnMessage
的方法, 猜测是类加载错误,于是重新开始检查 consumer 类定义,最终发现 rabbit:listener
的 ID 和 真实 consumer 的 ID 冲突,导致真实的 consumer Bean 无效,找不到接受消息的方法,而 rabbitmq 在与 client 端通信过程中发生异常,会停止消费。
<bean id="consumer_1" class="me.ele.Consumer1"/> <rabbit:listener-container connection-factory="galaxyConnectionFactory" acknowledge="manual" concurrency="16" > <rabbit:listener queues="queue_1" ref="consumer_1" id="consumer_1" /> </rabbit:listener-container> 复制代码
解决办法
删除 rabbit:listener
的 ID 属性
坑 No.2:consumer 数量异常
背景
原服务有 6 个队列,每个队列起 10 个 consumer, task-executor
线程池设置为 90, 后因数据量增加,发现消费能力不足,决定增加 consumer 数量,调整 listener-container 的 concurrency 为 20,重启服务器。
现象
部分队列 consumer 数量不足,缺失项始终为 xml 中声明在后的队列
异常定位
回退 concurrency 参数为 10 异常消失,观察异常现场,发现 consumer 消失队列有先后顺序之分,且 consumer 数量存在上限值为 90,猜测是收参数限制,检查配置参数如下:
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" id="taskExecutor"> <!--核心线程数 --> <property name="corePoolSize" value="16"/> <!--最大线程数 --> <property name="maxPoolSize" value="16"/> <property name="queueCapacity" value="500"/> <!--线程池维护线程所允许的空闲时间 --> <property name="keepAliveSeconds" value="60"/> <!--线程池对拒绝任务(无线程可用)的处理策略 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> <property name="WaitForTasksToCompleteOnShutdown" value="true"/> </bean> <rabbit:listener-container connection-factory="monitorConnectionFactory" acknowledge="manual" task-executor="taskExecutor" prefetch="10" concurrency="20"> <rabbit:listener queues="queue_1" ref="consumer_1"/> <rabbit:listener queues="queue_2" ref="consumer_2"/> <rabbit:listener queues="queue_3" ref="consumer_3"/> <rabbit:listener queues="queue_4" ref="consumer_4"/> <rabbit:listener queues="queue_5" ref="consumer_5"/> <rabbit:listener queues="queue_6" ref="consumer_6"/> </rabbit:listener-container> 复制代码
异常原因
经排查对比,发现上限值与 task-executor ThreadPoolTaskExecutor 参数 corePoolSize
、 maxPoolSize
极为接近,查询相关资料发现,多个 queue 的 consumer 会共用 taskExecutor 的线程池数量,如果线程池数量不足,consumer 无法创建。 后发现官方文档已有明确说明.
解决办法
增大 task-executor corePoolSize
和 maxPoolSize
的值为 200,重启服务,解决。
以上所述就是小编给大家介绍的《等等!这两个 Spring-RabbitMQ 的坑我们已经替你踩了》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Chrome 76 稳定版发布:禁用 Flash、监听扩展等等
- 开源 Go, Py 项目找队友、顾问、贡献者、赞助等等
- 从 “等等” 到 “秒开” 再到 “直开”,是什么让闲鱼社区相见恨晚?
- GoFrame v1.12 发布,数据库驱动开发、日志滚动切分等等新特性
- 最全的常用正则表达式大全——包括校验数字、字符、一些特殊的需求等等
- 等等,谷歌这款新产品是传说中的 Fuchsia OS 设备?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。