RabbitMQ 消费端的限流策略

栏目: 后端 · 发布时间: 7年前

内容简介:假设一个场景,由于我们的消费端突然全部不可用了,导致 rabbitMQ 服务器上有上万条未处理的消息,这时候如果没做任何现在,随便开启一个消费端客户端,就会导致巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多的数据,就会导致消费端变得巨卡,有可能直接崩溃不可用了。所以在实际生产中,限流保护是很重要的。rabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 QOS 的值)未被确认前,不进行

假设一个场景,由于我们的消费端突然全部不可用了,导致 rabbitMQ 服务器上有上万条未处理的消息,这时候如果没做任何现在,随便开启一个消费端客户端,就会导致巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多的数据,就会导致消费端变得巨卡,有可能直接崩溃不可用了。所以在实际生产中,限流保护是很重要的。

rabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 QOS 的值)未被确认前,不进行消费新的消息。关键代码就是在声明消费者代码里面的

void basicQos(unit prefetchSize , ushort prefetchCount, bool global )
复制代码
  1. prefetchSize:0

  2. prefetchCount:会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack

  3. global:true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别

备注:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。

代码演示:

代码地址:    https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 项目下
复制代码

生产端代码基本没变化,改了 exchange 和 routingKey 而已

public class Procuder {

	private static final Logger log = LoggerFactory.getLogger(Procuder.class);

	public static void main(String[] args) throws IOException, TimeoutException {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
		connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
		connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);

		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();

		String msg = "Hello RabbitMQ limit Message";
        for(int i = 0; i < 5; i ++){
            log.info("生产端发送:{}", msg + i);
            channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
        }
	}
}
复制代码
消费端代码需要修改一下

autoAck 设置为 false **

增加 ** channel.basicQos(0, 1, false);

完整的消费端代码如下

/**
 * 使用自定义消费者
 */
public class Consumer {

	private static final Logger log = LoggerFactory.getLogger(Consumer.class);
	
	public static final String EXCHANGE_NAME = "test_qos_exchange";
	public static final String EXCHANGE_TYPE = "topic";
	public static final String ROUTING_KEY_TYPE = "qos.#";
	public static final String ROUTING_KEY = "cqos.save";
	public static final String QUEUE_NAME = "test_qos_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		//1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 获取C	onnection
        Connection connection = connectionFactory.newConnection();
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
        
        /**
         * prefetchSize:0
         prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,限速N个
            即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack 回来,你再发送 N 个过来
         global:true\false 是否将上面设置应用于channel级别,false是consumer级别
         prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究
         */
        channel.basicQos(0, 1, false);

        //使用自定义消费者
        //1 限流方式  第一件事就是 autoAck设置为 false
      //使用自定义消费者
        channel.basicConsume(QUEUE_NAME, false, new MyConsumer(channel));
        log.info("消费端启动成功");
	}
}

复制代码

自定义消费者

public class MyConsumer extends DefaultConsumer {

	private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
	
	 private Channel channel;
	 
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}

	@Override
    public void handleDelivery(String consumerTag,  //消费者标签
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        
        log.info("------limit-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
        //一定要手动ACK回去
       //channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

复制代码

然后启动消费端,上管控台查看 test_qos_exchange 和 test_qos_queue 是否生成了

RabbitMQ 消费端的限流策略

确认 test_qos_exchange 上绑定了 test_qos_queue

RabbitMQ 消费端的限流策略

启动生产端发送 5 条消息

RabbitMQ 消费端的限流策略

发现消费端只打印了一条消息

RabbitMQ 消费端的限流策略

从管控台上也看到总共 5 条消息,有 4 条等待着,一条消费了但是没有 ack 回去

RabbitMQ 消费端的限流策略

修改自定义消费者里面的代码,如下所示

public class MyConsumer extends DefaultConsumer {

	private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
	
	 private Channel channel;
	 
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}

	@Override
    public void handleDelivery(String consumerTag,  //消费者标签
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        
        log.info("------limit-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
        //一定要手动ACK回去
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
复制代码

重启消费端,看到消费端就按照一条一条消费,并且 ACK 回去了

RabbitMQ 消费端的限流策略
RabbitMQ 消费端的限流策略

如上所示就是简单的RabbitMQ消费端的限流策略


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Test Driven Development

Test Driven Development

Kent Beck / Addison-Wesley Professional / 2002-11-18 / USD 49.99

Quite simply, test-driven development is meant to eliminate fear in application development. While some fear is healthy (often viewed as a conscience that tells programmers to "be careful!"), the auth......一起来看看 《Test Driven Development》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

MD5 加密
MD5 加密

MD5 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具