rabbitmq实现延时队列(死信队列)

栏目: 后端 · 发布时间: 6年前

内容简介:TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。队列中的消息在以下三种情况下会变成死信 (1)消息被拒绝(basic.reject

基于队列和基于消息的TTL

TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。

死信交换机DLX

队列中的消息在以下三种情况下会变成死信 (1)消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false; (2)消息的过期时间到期了; (3)队列长度限制超过了。 当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key.

@Bean
  public Queue lindQueue() {
    return QueueBuilder.durable(LIND_QUEUE)
        .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//设置死信交换机
        .withArgument("x-message-ttl", makeCallExpire)
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//设置死信routingKey
        .build();
  }

实现的过程

graph TD publisher-->DLX DLX-->dead.queue dead.queue-->正常queue 正常queue-->subscriber

完整的代码

@Component
public class AmqpConfig {
  /**
   * 主要测试一个死信队列,功能主要实现延时消费,原理是先把消息发到正常队列,
   * 正常队列有超时时间,当达到时间后自动发到死信队列,然后由消费者去消费死信队列里的消息.
   */
  public static final String LIND_EXCHANGE = "lind.exchange";
  public static final String LIND_DL_EXCHANGE = "lind.dl.exchange";
  public static final String LIND_QUEUE = "lind.queue";
  public static final String LIND_DEAD_QUEUE = "lind.queue.dead";

  public static final String LIND_FANOUT_EXCHANGE = "lindFanoutExchange";
  /**
   * 单位为微秒.
   */
  @Value("${tq.makecall.expire:60000}")
  private long makeCallExpire;

  /**
   * 创建普通交换机.
   */
  @Bean
  public TopicExchange lindExchange() {
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_EXCHANGE).durable(true)
        .build();
  }

  /**
   * 创建死信交换机.
   */
  @Bean
  public TopicExchange lindExchangeDl() {
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true)
        .build();
  }

  /**
   * 创建普通队列.
   */
  @Bean
  public Queue lindQueue() {
    return QueueBuilder.durable(LIND_QUEUE)
        .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//设置死信交换机
        .withArgument("x-message-ttl", makeCallExpire)
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//设置死信routingKey
        .build();
  }

  /**
   * 创建死信队列.
   */
  @Bean
  public Queue lindDelayQueue() {
    return QueueBuilder.durable(LIND_DEAD_QUEUE).build();
  }

  /**
   * 绑定死信队列.
   */
  @Bean
  public Binding bindDeadBuilders() {
    return BindingBuilder.bind(lindDelayQueue())
        .to(lindExchangeDl())
        .with(LIND_DEAD_QUEUE);
  }

  /**
   * 绑定普通队列.
   *
   * @return
   */
  @Bean
  public Binding bindBuilders() {
    return BindingBuilder.bind(lindQueue())
        .to(lindExchange())
        .with(LIND_QUEUE);
  }

  /**
   * 广播交换机.
   *
   * @return
   */
  @Bean
  public FanoutExchange fanoutExchange() {
    return new FanoutExchange(LIND_FANOUT_EXCHANGE);
  }
}


//-----------------

@Component
public class Publisher {
  @Autowired
  private RabbitTemplate rabbitTemplate;


  public void publish(String message) {
    try {
      rabbitTemplate
          .convertAndSend(AmqpConfig.LIND_EXCHANGE, AmqpConfig.LIND_DELAY_QUEUE,
              message);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

//-----------------

@Component
@Slf4j
public class Subscriber {
  @RabbitListener(queues = AmqpConfig.LIND_QUEUE)
  public void customerSign(String data) {
    try {

      log.info("从队列拿到数据 :{}", data);

    } catch (Exception ex) {
          e.printStackTrace();
    }
  }
}

以上所述就是小编给大家介绍的《rabbitmq实现延时队列(死信队列)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

智能主义

智能主义

周鸿祎 / 中信出版集团股份有限公司 / 2016-11-1 / CNY 49.00

大数据和人工智能迅猛发展,对社会和商业的影响日益深刻,从学术界到企业界,智能化时代必将来临,已经成为共识。而此次变革,将会开启新一轮的发展浪潮。企业家、互联网以及传统企业、个人,应当如何理解这一轮的发展,如何行动以抓住智能化所带来的众多机遇,成为所有人持之以恒的关注热点。 周鸿祎作为最具洞察力的互联网老兵、人工智能领域成功的先行者,通过总结360公司的战略布局、产品规划、方法论实践,从思想到......一起来看看 《智能主义》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具