内容简介:消息超时处理,比如订单 超过24小时不处理,系统取消订单。这个需求如果用 数据库去轮 就不好了。这种场景: 延迟任务知识:消息的TTL和死信Exchange
消息超时处理,比如订单 超过24小时不处理,系统取消订单。这个需求如果用 数据库去轮 就不好了。
这种场景: 延迟任务
知识:消息的TTL和死信Exchange
RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。
rabbitmq-delayed-message-exchange这个插件也可以实现,看C#实现。
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。
超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。
所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。
这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。
Dead Letter Exchanges
定义消息死亡
1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
2. 上面的消息的TTL到了,消息过期了。
3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
就是个普通的Exchange,存放死掉的消息
两个控制台:
P端(以后生产者只说P了,消费者是 C端):
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace MQ.Product2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) { while (Console.ReadLine() != null) { using (var channel = connection.CreateModel()) { Dictionary<string, object> dic = new Dictionary<string, object>(); dic.Add("x-expires", 10000); dic.Add("x-message-ttl", 8000);//队列上消息过期时间,应小于队列过期时间 dic.Add("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由 dic.Add("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey //创建一个名叫"deadQueue"的消息队列 channel.QueueDeclare(queue: "deadQueue", durable: true, exclusive: false, autoDelete: false, arguments: dic); var message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); //向该消息队列发送消息message channel.BasicPublish(exchange: "", routingKey: "deadQueue", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } } } Console.ReadKey(); } } }
运行项目,按任意键,创建1个队列,刷新web管理
然后过了8秒,队列就删除了。
这里队列有个TTL Exp DLX DLK属性,正好上面3个属性的设置
我们也可以在管理端看到这些消息参数:
单击的时候,就可以设置了,
在网上找了一些资料,下面代码可能 java 的设置,.net简单如上键值就行了
Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于 redis 中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));
Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp
Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于 mongodb 中的固定集合,例如保存最新的100条消息, Feature=Lim
Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B
Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX
Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK
Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高( 数值更大的 )的消息先被消费,
Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中
Master locator(x-queue-master-locator) 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。
(设置“x-queue-master-locator”参数。)
整理下: 1个 Exchange 可以多个 Queue 1个Queue可以多个Message Message可以设置优先级,Queue可以设置最多多少个Message,多大字节的存储,也可以设置优先级。
上次持久化的问题理解:
设置消息持久化必须先设置队列持久化,要不然 队列不持久化 , 消息持久化 , 队列都不存在了,消息存在还有什么意义 。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的。
上面代码,我们看到了,消息过期了,就会被扔到一个叫exchange-direct的 邮箱,真好,那么等他过期了,那么消费者 监听这个邮箱,是不是8秒过后就收到了消息,达到了 延迟的效果。跟RPC的设计异曲同工之妙。如果没有指定邮箱,8秒后消息就会被删除了。 10秒后 队列就会被删除。
接下来,
消费者,就是创建个 被转发的 exchange的监听,
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { Console.Title = "AY 2019 2018-12-5"; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "exchange-direct", type: "direct"); string name = channel.QueueDeclare().QueueName; //string name = "deadQueue"; channel.QueueBind(queue: name, exchange: "exchange-direct", routingKey: "routing-delay"); //回调,当consumer收到消息后会执行该函数 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(ea.RoutingKey); Console.WriteLine(" [x] Received {0}", message); }; //Console.WriteLine("name:" + name); //消费队列"hello"中的消息 channel.BasicConsume(queue: name, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } Console.ReadKey(); } } }
打开生产端,按下回车,进入循环,然后打开消费端,8秒后消费者就收到了消息。
如果8秒后,消息都被转发了,消费者打开,是收不到消息的。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
推荐您阅读更多有关于“RabbitMQ,”的文章
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- vue笔记3,计算笔记
- Mysql Java 驱动代码阅读笔记及 JDBC 规范笔记
- 【每日笔记】【Go学习笔记】2019-01-16 go网络编程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。