内容简介:通过这种方式实现对于简单的定时任务是ok的,过于复杂的、可用性要求较高的系统就会存在以下缺点。RabbitMQ本身是不支持的,可以通过它提供的两个特性
通过这种方式实现对于简单的定时任务是ok的,过于复杂的、可用性要求较高的系统就会存在以下缺点。
-
存在的一些问题
- 消耗系统内存,如果定时任务很多,长时间得不到释放,将会一直占用系统进程耗费内存。
- 单线程如何保障出现系统崩溃后之前的定时任务不受影响?多进程集群模式下一致性的保证?
- setTimeout、setInterval会存在时间误差,对于时间精度要求较高的是不行的。
- RabbitMQ TTL+DLX 实现定时任务
RabbitMQ本身是不支持的,可以通过它提供的两个特性 Time-To-Live and Expiration 、 Dead Letter Exchanges 来实现,通过以下泳道图可以看到一个消息从发布到消费的整个过程。
死信队列
死信队列全称 Dead-Letter-Exchange 简称 DLX 是 RabbitMQ 中交换器的一种类型,消息在一段时间之后没有被消费就会变成死信被重新 publish 到另一个 DLX 交换器队列中,因此称为死信队列。
-
死信队列产生几种情况
- 消息被拒绝
- 消息TTL过期
- 队列达到最大长度
-
设置DLX的两个参数:
deadLetterExchange deadLetterRoutingKey
注意
:Dead-Letter-Exchange也是一种普通的Exchange
消息TTL
消息的TTL指的是消息的存活时间,RabbitMQ支持消息、队列两种方式设置TTL,分别如下:
- 消息设置TTL :对消息的设置是在发送时进行TTL设置,通过
x-message-ttl
或expiration
字段设置,单位为毫秒,代表消息的过期时间,每条消息的TTL可不同。 - 队列设置TTL :对队列的设置是在消息入队列时计算,通过
x-expires
设置,队列中的所有消息都有相同的过期时间,当超过了队列的超时设置,消息会自动的清除。
注意
:如果以上两种方式都做了设置,消息的TTL则以两者之中最小的那个为准。
Nodejs操作RabbitMQ实现延迟队列
推荐采用 amqplib 库,一个Node.js实现的RabbitMQ客户端。
- 初始化RabbitMQ
rabbitmq.js
// npm install amqplib const amqp = require('amqplib'); let connection = null; module.exports = { connection, init: () => amqp.connect('amqp://localhost:5672').then(conn => { connection = conn; console.log('rabbitmq connect success'); return connection; }) }
- 生产者
/** * 路由一个死信队列 * @param { Object } connnection */ async function producerDLX(connnection) { const testExchange = 'testEx'; const testQueue = 'testQu'; const testExchangeDLX = 'testExDLX'; const testRoutingKeyDLX = 'testRoutingKeyDLX'; const ch = await connnection.createChannel(); await ch.assertExchange(testExchange, 'direct', { durable: true }); const queueResult = await ch.assertQueue(testQueue, { exclusive: false, deadLetterExchange: testExchangeDLX, deadLetterRoutingKey: testRoutingKeyDLX, }); await ch.bindQueue(queueResult.queue, testExchange); const msg = 'hello world!'; console.log('producer msg:', msg); await ch.sendToQueue(queueResult.queue, new Buffer(msg), { expiration: '10000' }); ch.close(); }
- 消费者
consumer.js
const rabbitmq = require('./rabbitmq.js'); /** * 消费一个死信队列 * @param { Object } connnection */ async function consumerDLX(connnection) { const testExchangeDLX = 'testExDLX'; const testRoutingKeyDLX = 'testRoutingKeyDLX'; const testQueueDLX = 'testQueueDLX'; const ch = await connnection.createChannel(); await ch.assertExchange(testExchangeDLX, 'direct', { durable: true }); const queueResult = await ch.assertQueue(testQueueDLX, { exclusive: false, }); await ch.bindQueue(queueResult.queue, testExchangeDLX, testRoutingKeyDLX); await ch.consume(queueResult.queue, msg => { console.log('consumer msg:', msg.content.toString()); }, { noAck: true }); } // 消费消息 rabbitmq.init().then(connection => consumerDLX(connection));
- 运行查看
分别执行消费者和生产者,可以看到 producer 在44秒发布了消息,consumer 是在54秒接收到的消息,实现了定时10秒种执行
$ node consumer # 执行消费者 [2019-05-07T08:45:23.099] [INFO] default - rabbitmq connect success [2019-05-07T08:45:54.562] [INFO] default - consumer msg: hello world!
$ node producer # 执行生产者 [2019-05-07T08:45:43.973] [INFO] default - rabbitmq connect success [2019-05-07T08:45:44.000] [INFO] default - producer msg: hello world!
- 管理控制台查看
testQu 队列为我们定义的正常队列消息过期,会变成死信,会被路由到 testQueueDLX 队列,形成一个死信队列。
作者:五月君
链接: https://www.imooc.com/article...
来源:慕课网
Github: Node.js技术栈
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- RabbitMQ实现延迟队列
- 基于 Redis 实现的延迟消息队列
- 你真的知道怎么实现一个延迟队列吗?
- Spring Boot(十四)RabbitMQ延迟队列
- 【RabbitMQ】一文带你搞定RabbitMQ延迟队列
- 灵感来袭,基于Redis的分布式延迟队列
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
TensorFlow实战
黄文坚、唐源 / 电子工业出版社 / 2017-2-1 / 79
Google近日发布了TensorFlow 1.0候选版,这个稳定版将是深度学习框架发展中的里程碑的一步。自TensorFlow于2015年底正式开源,距今已有一年多,这期间TensorFlow不断给人以惊喜,推出了分布式版本,服务框架TensorFlow Serving,可视化工具TensorFlow,上层封装TF.Learn,其他语言(Go、Java、Rust、Haskell)的绑定、Wind......一起来看看 《TensorFlow实战》 这本书的介绍吧!