技术分享 RabbitMQ 学习笔记 -- 03 多消费者

duke · 2020-10-24 10:18:44 · 热度: 42

一、简介

在上一章,我们编写了从命名队列发送和接收信息的程序,在本例中,我们将创建一个工作队列,用于在多个消费者分发耗时的任务

工作队列(也称为任务队列)背后的主要思想是避免立即执行资源密集型任务,而必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当运行许多消费者时,任务将在他们之间共享。

二、消费者

这里我们编写两个消费者监听同一个队列 work-queues 的方法,

@Component
public class consumer {
    @RabbitListener(queues = "work-queues")
    public void receive(String msg) {
        System.out.println("receive 消费者消费信息:" + msg);
    }
    @RabbitListener(queues = "work-queues")
    public void receive2(String msg) {
        System.out.println("receive2 消费者消费信息:" + msg);
    }
}

三、生产者

@Test
public void producer() {
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("work-queues", "信息【" + i + "】");
    }
}

控制台输出,消息平均分配给了消费者

receive 消费者消费信息:信息【1】
receive2 消费者消费信息:信息【0】
receive 消费者消费信息:信息【5】
receive 消费者消费信息:信息【9】
receive2 消费者消费信息:信息【4】
receive2 消费者消费信息:信息【8】
receive 消费者消费信息:信息【3】
receive2 消费者消费信息:信息【2】
receive 消费者消费信息:信息【7】
receive2 消费者消费信息:信息【6】

发现:

如果队列里没有消息,两个消费者同时监听 work-queues队列,当队列来消息时,进行公平分配给所有消费者,会根据消息的顺序,分配给消费者进行处理。

如果事先队列中存储了消息,则启动时,@RabbitListener 方法会接收完整的消息批次,而不是一次一次获取它们,其中最多一次性接收处理 Prefetch count个消息,然后队列平均分配给另一个消费者进行处理。如队列中有400条数据,第一个消费者会直接获取并占用250条消息处理,第二个消费者就占取剩下的150条消息进行处理。

“公平派发”是Spring AMQP的默认配置,@RabbitListener 注解的默认监听容器为:AbstractMessageListenerContainer 里面默认预取计数 DEFAULT_PREFETCH_COUNT 的值定义为250,赋值给 prefetchCount

如果默认的预取计数设置为1,则行为将是循环的传递,告诉 RabbitMQ 不要在同一时间给一个消费者超过一条消息。换句话说,只有在消费者空闲的时候才会发送下一条信息。

  • 消费者取走消息后没有及时做消息确认,对于开启手动确认机制的,不进行ack则消息会一直以unacked状态留在队列中。
  • 消费者处理能力不足。生产者投放消息的速度较快,当消费者按照prefetch_count设置的值取走相应数量的消息时,这些消息都会暂时处于unacked状态。

为您推荐与 rabbitmq 相关的帖子:

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册