内容简介:实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。实现推模式推荐的方式是继承推模式是最常用的,但是有些情况下推模式并不适用的,比如说:
实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。
实现推模式推荐的方式是继承 DefaultConsumer
基类,也可以使用Spring AMQP的 SimpleMessageListenerContainer
。
推模式是最常用的,但是有些情况下推模式并不适用的,比如说:
- 由于某些限制,消费者在某个条件成立时才能消费消息
- 需要批量拉取消息进行处理
实现拉模式
RabbitMQ的Channel提供了 basicGet
方法用于拉取消息。
/** * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get} * @see com.rabbitmq.client.AMQP.Basic.Get * @see com.rabbitmq.client.AMQP.Basic.GetOk * @see com.rabbitmq.client.AMQP.Basic.GetEmpty * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @return a {@link GetResponse} containing the retrieved message data * @throws java.io.IOException if an error is encountered */ GetResponse basicGet(String queue, boolean autoAck) throws IOException;
basicGet
返回 GetResponse
类。
public class GetResponse { private final Envelope envelope; private final BasicProperties props; private final byte[] body; private final int messageCount; // ...
rabbitmq-client版本4.0.3
使用 basicGet
拉取消息需要注意:
basicGet DefaultConsumer
示例代码:
private void consume(Channel channel) throws IOException, InterruptedException { while (true) { if (!isConditionSatisfied()) { TimeUnit.MILLISECONDS.sleep(1); continue; } GetResponse response = channel.basicGet(CAOSH_TEST_QUEUE, false); if (response == null) { TimeUnit.MILLISECONDS.sleep(1); continue; } String data = new String(response.getBody()); logger.info("Get message <= {}", data); channel.basicAck(response.getEnvelope().getDeliveryTag(), false); } }
批量拉取消息
RabbitMQ支持客户端批量拉取消息,客户端可以连续调用 basicGet
方法拉取多条消息,处理完成之后一次性ACK。需要注意:
basicGet basicAck
示例代码:
String bridgeQueueName = extractorProperties.getBridgeQueueName(); int batchSize = extractorProperties.getBatchSize(); List<GetResponse> responseList = Lists.newArrayListWithCapacity(batchSize); long tag = 0; while (responseList.size() < batchSize) { GetResponse getResponse = channel.basicGet(bridgeQueueName, false); if (getResponse == null) { break; } responseList.add(getResponse); tag = getResponse.getEnvelope().getDeliveryTag(); } if (responseList.isEmpty()) { TimeUnit.MILLISECONDS.sleep(1); } else { logger.info("Get <{}> responses this batch", responseList.size()); // handle messages channel.basicAck(tag, true); }
关于QueueingConsumer
QueueingConsumer
在客户端本地使用 BlockingQueue
缓冲消息,其nextDelivery方法也可以用于实现拉模式(其本质上是 BlockingQueue.take
),但是 QueueingConsumer
现在已经标记为Deprecated。
根据 官方文档
的解释, QueueingConsumer
有两个坑:
Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel the client would deadlock.
建议不要使用 QueueingConsumer
。
以上所述就是小编给大家介绍的《RabbitMQ拉模式批量消费消息》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 玩转KCP(2)-流模式和消息模式
- RabbitMQ消息交换模式简介
- 消息中间件的四种投递模式对比
- 重学 Java 设计模式:实战观察者模式「模拟类似小客车指标摇号过程,监听消息通知用户中签场景」
- WebSocket的故事(二)—— Spring中如何利用STOMP快速构建WebSocket广播式消息模式
- WebSocket的故事(三)—— Springboot中,如何利用WebSocket和STOMP快速构建点对点的消息模式(1)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
URL 编码/解码
URL 编码/解码
XML、JSON 在线转换
在线XML、JSON转换工具