Node.js结合RabbitMQ延迟队列实现定时任务

栏目: Node.js · 发布时间: 5年前

内容简介:通过这种方式实现对于简单的定时任务是ok的,过于复杂的、可用性要求较高的系统就会存在以下缺点。RabbitMQ本身是不支持的,可以通过它提供的两个特性

通过这种方式实现对于简单的定时任务是ok的,过于复杂的、可用性要求较高的系统就会存在以下缺点。

  • 存在的一些问题

    1. 消耗系统内存,如果定时任务很多,长时间得不到释放,将会一直占用系统进程耗费内存。
    2. 单线程如何保障出现系统崩溃后之前的定时任务不受影响?多进程集群模式下一致性的保证?
    3. setTimeout、setInterval会存在时间误差,对于时间精度要求较高的是不行的。
  • RabbitMQ TTL+DLX 实现定时任务

RabbitMQ本身是不支持的,可以通过它提供的两个特性 Time-To-Live and ExpirationDead Letter Exchanges 来实现,通过以下泳道图可以看到一个消息从发布到消费的整个过程。

Node.js结合RabbitMQ延迟队列实现定时任务

死信队列

死信队列全称 Dead-Letter-Exchange 简称 DLX 是 RabbitMQ 中交换器的一种类型,消息在一段时间之后没有被消费就会变成死信被重新 publish 到另一个 DLX 交换器队列中,因此称为死信队列。

  • 死信队列产生几种情况

    • 消息被拒绝
    • 消息TTL过期
    • 队列达到最大长度
  • 设置DLX的两个参数:

    deadLetterExchange
    deadLetterRoutingKey
    

注意 :Dead-Letter-Exchange也是一种普通的Exchange

消息TTL

消息的TTL指的是消息的存活时间,RabbitMQ支持消息、队列两种方式设置TTL,分别如下:

  • 消息设置TTL :对消息的设置是在发送时进行TTL设置,通过 x-message-ttlexpiration 字段设置,单位为毫秒,代表消息的过期时间,每条消息的TTL可不同。
  • 队列设置TTL :对队列的设置是在消息入队列时计算,通过 x-expires 设置,队列中的所有消息都有相同的过期时间,当超过了队列的超时设置,消息会自动的清除。

注意 :如果以上两种方式都做了设置,消息的TTL则以两者之中最小的那个为准。

Nodejs操作RabbitMQ实现延迟队列

推荐采用 amqplib 库,一个Node.js实现的RabbitMQ客户端。

  • 初始化RabbitMQ

rabbitmq.js

// npm install amqplib
const amqp = require('amqplib');

let connection = null;

module.exports = {
    connection,

    init: () => amqp.connect('amqp://localhost:5672').then(conn => {
        connection = conn;

        console.log('rabbitmq connect success');

        return connection;
    })
}
  • 生产者
/**
 * 路由一个死信队列
 * @param { Object } connnection 
 */
async function producerDLX(connnection) {
    const testExchange = 'testEx';
    const testQueue = 'testQu';
    const testExchangeDLX = 'testExDLX';
    const testRoutingKeyDLX = 'testRoutingKeyDLX';
    
    const ch = await connnection.createChannel();
    await ch.assertExchange(testExchange, 'direct', { durable: true });
    const queueResult = await ch.assertQueue(testQueue, {
        exclusive: false,
        deadLetterExchange: testExchangeDLX,
        deadLetterRoutingKey: testRoutingKeyDLX,
    });
    await ch.bindQueue(queueResult.queue, testExchange);
    const msg = 'hello world!';
    console.log('producer msg:', msg);
    await ch.sendToQueue(queueResult.queue, new Buffer(msg), {
        expiration: '10000'
    });
    
    ch.close();
}
  • 消费者

consumer.js

const rabbitmq = require('./rabbitmq.js');

/**
 * 消费一个死信队列
 * @param { Object } connnection 
 */
async function consumerDLX(connnection) {
    const testExchangeDLX = 'testExDLX';
    const testRoutingKeyDLX = 'testRoutingKeyDLX';
    const testQueueDLX = 'testQueueDLX';

    const ch = await connnection.createChannel();
    await ch.assertExchange(testExchangeDLX, 'direct', { durable: true });
    const queueResult = await ch.assertQueue(testQueueDLX, {
        exclusive: false,
    });
    await ch.bindQueue(queueResult.queue, testExchangeDLX, testRoutingKeyDLX);
    await ch.consume(queueResult.queue, msg => {
        console.log('consumer msg:', msg.content.toString());
    }, { noAck: true });
}

// 消费消息
rabbitmq.init().then(connection => consumerDLX(connection));
  • 运行查看

分别执行消费者和生产者,可以看到 producer 在44秒发布了消息,consumer 是在54秒接收到的消息,实现了定时10秒种执行

$ node consumer # 执行消费者
[2019-05-07T08:45:23.099] [INFO] default - rabbitmq connect success
[2019-05-07T08:45:54.562] [INFO] default - consumer msg: hello world!
$ node producer # 执行生产者
[2019-05-07T08:45:43.973] [INFO] default - rabbitmq connect success
[2019-05-07T08:45:44.000] [INFO] default - producer msg: hello world!
  • 管理控制台查看

testQu 队列为我们定义的正常队列消息过期,会变成死信,会被路由到 testQueueDLX 队列,形成一个死信队列。

Node.js结合RabbitMQ延迟队列实现定时任务

作者:五月君

链接: https://www.imooc.com/article...

来源:慕课网

Github: Node.js技术栈


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

函数式算法设计珠玑

函数式算法设计珠玑

Richard Bird / 苏统华、孙芳媛、郝文超、徐琴 / 机械工业出版社 / 2017-4-1 / 69.00

本书采用完全崭新的方式介绍算法设计。全书由30个珠玑构成,每个珠玑单独列为一章,用于解决一个特定编程问题。这些问题的出处五花八门,有的来自游戏或拼图,有的是有趣的组合任务,还有的是散落于数据压缩及字串匹配等领域的更为熟悉的算法。每个珠玑以使用函数式编程语言Haskell对问题进行描述作为开始,每个解答均是诉诸于函数式编程法则从问题表述中计算得到。本书适用于那些喜欢学习算法设计思想的函数式编程人员、......一起来看看 《函数式算法设计珠玑》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换