内容简介:消息超时处理,比如订单 超过24小时不处理,系统取消订单。这个需求如果用 数据库去轮 就不好了。这种场景: 延迟任务知识:消息的TTL和死信Exchange
消息超时处理,比如订单 超过24小时不处理,系统取消订单。这个需求如果用 数据库去轮 就不好了。
这种场景: 延迟任务
知识:消息的TTL和死信Exchange
RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。
rabbitmq-delayed-message-exchange这个插件也可以实现,看C#实现。
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。
超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。
所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。
这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。
Dead Letter Exchanges
定义消息死亡
1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
2. 上面的消息的TTL到了,消息过期了。
3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
就是个普通的Exchange,存放死掉的消息
两个控制台:
P端(以后生产者只说P了,消费者是 C端):
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MQ.Product2
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
using (var connection = factory.CreateConnection())
{
while (Console.ReadLine() != null)
{
using (var channel = connection.CreateModel())
{
Dictionary<string, object> dic = new Dictionary<string, object>();
dic.Add("x-expires", 10000);
dic.Add("x-message-ttl", 8000);//队列上消息过期时间,应小于队列过期时间
dic.Add("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由
dic.Add("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey
//创建一个名叫"deadQueue"的消息队列
channel.QueueDeclare(queue: "deadQueue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: dic);
var message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
//向该消息队列发送消息message
channel.BasicPublish(exchange: "",
routingKey: "deadQueue",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
Console.ReadKey();
}
}
}
运行项目,按任意键,创建1个队列,刷新web管理
然后过了8秒,队列就删除了。
这里队列有个TTL Exp DLX DLK属性,正好上面3个属性的设置
我们也可以在管理端看到这些消息参数:
单击的时候,就可以设置了,
在网上找了一些资料,下面代码可能 java 的设置,.net简单如上键值就行了
Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于 redis 中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));
Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp
Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于 mongodb 中的固定集合,例如保存最新的100条消息, Feature=Lim
Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B
Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX
Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK
Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高( 数值更大的 )的消息先被消费,
Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中
Master locator(x-queue-master-locator) 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。
(设置“x-queue-master-locator”参数。)
整理下: 1个 Exchange 可以多个 Queue 1个Queue可以多个Message Message可以设置优先级,Queue可以设置最多多少个Message,多大字节的存储,也可以设置优先级。
上次持久化的问题理解:
设置消息持久化必须先设置队列持久化,要不然 队列不持久化 , 消息持久化 , 队列都不存在了,消息存在还有什么意义 。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的。
上面代码,我们看到了,消息过期了,就会被扔到一个叫exchange-direct的 邮箱,真好,那么等他过期了,那么消费者 监听这个邮箱,是不是8秒过后就收到了消息,达到了 延迟的效果。跟RPC的设计异曲同工之妙。如果没有指定邮箱,8秒后消息就会被删除了。 10秒后 队列就会被删除。
接下来,
消费者,就是创建个 被转发的 exchange的监听,
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace AyTestMQ2
{
class Program
{
static void Main(string[] args)
{
Console.Title = "AY 2019 2018-12-5";
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "exchange-direct", type: "direct");
string name = channel.QueueDeclare().QueueName;
//string name = "deadQueue";
channel.QueueBind(queue: name, exchange: "exchange-direct", routingKey: "routing-delay");
//回调,当consumer收到消息后会执行该函数
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(ea.RoutingKey);
Console.WriteLine(" [x] Received {0}", message);
};
//Console.WriteLine("name:" + name);
//消费队列"hello"中的消息
channel.BasicConsume(queue: name,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
Console.ReadKey();
}
}
}
打开生产端,按下回车,进入循环,然后打开消费端,8秒后消费者就收到了消息。
如果8秒后,消息都被转发了,消费者打开,是收不到消息的。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
推荐您阅读更多有关于“RabbitMQ,”的文章
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- vue笔记3,计算笔记
- Mysql Java 驱动代码阅读笔记及 JDBC 规范笔记
- 【每日笔记】【Go学习笔记】2019-01-16 go网络编程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
ActionScript 3.0 Cookbook
Joey Lott、Darron Schall、Keith Peters / Adobe Dev Library / 2006-10-11 / GBP 28.50
Well before Ajax and Microsoft's Windows Presentation Foundation hit the scene, Macromedia offered the first method for building web pages with the responsiveness and functionality of desktop programs......一起来看看 《ActionScript 3.0 Cookbook》 这本书的介绍吧!