RabbitMQ拉模式批量消费消息

栏目: 后端 · 发布时间: 6年前

内容简介:实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。实现推模式推荐的方式是继承推模式是最常用的,但是有些情况下推模式并不适用的,比如说:

实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。

实现推模式推荐的方式是继承 DefaultConsumer 基类,也可以使用Spring AMQP的 SimpleMessageListenerContainer

推模式是最常用的,但是有些情况下推模式并不适用的,比如说:

  • 由于某些限制,消费者在某个条件成立时才能消费消息
  • 需要批量拉取消息进行处理

实现拉模式

RabbitMQ的Channel提供了 basicGet 方法用于拉取消息。

/**
 * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
 * @see com.rabbitmq.client.AMQP.Basic.Get
 * @see com.rabbitmq.client.AMQP.Basic.GetOk
 * @see com.rabbitmq.client.AMQP.Basic.GetEmpty
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @return a {@link GetResponse} containing the retrieved message data
 * @throws java.io.IOException if an error is encountered
 */
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

basicGet 返回 GetResponse 类。

public class GetResponse {
    private final Envelope envelope;
    private final BasicProperties props;
    private final byte[] body;
    private final int messageCount;
    
    // ...

rabbitmq-client版本4.0.3

使用 basicGet 拉取消息需要注意:

basicGet
DefaultConsumer

示例代码:

private void consume(Channel channel) throws IOException, InterruptedException {
    while (true) {
        if (!isConditionSatisfied()) {
            TimeUnit.MILLISECONDS.sleep(1);
            continue;
        }
        GetResponse response = channel.basicGet(CAOSH_TEST_QUEUE, false);
        if (response == null) {
            TimeUnit.MILLISECONDS.sleep(1);
            continue;
        }
        String data = new String(response.getBody());
        logger.info("Get message <= {}", data);
        channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
    }
}

批量拉取消息

RabbitMQ支持客户端批量拉取消息,客户端可以连续调用 basicGet 方法拉取多条消息,处理完成之后一次性ACK。需要注意:

basicGet
basicAck

示例代码:

String bridgeQueueName = extractorProperties.getBridgeQueueName();
int batchSize = extractorProperties.getBatchSize();
List<GetResponse> responseList = Lists.newArrayListWithCapacity(batchSize);
long tag = 0;
while (responseList.size() < batchSize) {
    GetResponse getResponse = channel.basicGet(bridgeQueueName, false);
    if (getResponse == null) {
        break;
    }
    responseList.add(getResponse);
    tag = getResponse.getEnvelope().getDeliveryTag();
}
if (responseList.isEmpty()) {
    TimeUnit.MILLISECONDS.sleep(1);
} else {
    logger.info("Get <{}> responses this batch", responseList.size());
    // handle messages
    channel.basicAck(tag, true);
}

关于QueueingConsumer

QueueingConsumer 在客户端本地使用 BlockingQueue 缓冲消息,其nextDelivery方法也可以用于实现拉模式(其本质上是 BlockingQueue.take ),但是 QueueingConsumer 现在已经标记为Deprecated。

根据 官方文档 的解释, QueueingConsumer 有两个坑:

Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel the client would deadlock.

建议不要使用 QueueingConsumer


以上所述就是小编给大家介绍的《RabbitMQ拉模式批量消费消息》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

艾伦•图灵传

艾伦•图灵传

(英)安德鲁·霍奇斯 / 孙天齐 / 湖南科学技术出版社 / 2012-8-1 / 68.00元

《艾伦·图灵传:如谜的解谜者》是图灵诞辰100周年纪念版,印制工艺更为精美。本书是世界共认的最权威的图灵传记。艾伦?图灵是现代人工智能的鼻祖,在24岁时奠定了计算机的理论基础。二战期间,他为盟军破译密码,为结束战争做出巨大贡献。战后,他开创性地提出人工智能的概念,并做了大量的前期工作。因同性恋问题事发,被迫注射激素,后来吃毒苹果而死。作者是一名数学家,也是一名同性恋者。他对图灵的生平有切身的体会,......一起来看看 《艾伦•图灵传》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

MD5 加密
MD5 加密

MD5 加密工具