内容简介:发布订阅模式把消息发送给多个订阅者。也就是有多个消费端都完整的接收生产者的消息换句话说 把消息广播给多个消费者
发布订阅模式
什么时发布订阅模式
把消息发送给多个订阅者。也就是有多个消费端都完整的接收生产者的消息
换句话说 把消息广播给多个消费者
消息模型的核心
RabbitMQ不发送消息给队列,生产者也不知道消息发送到队列
生产者只发送消息到exchange 交换器,
exchange一方面从生产者接收消息,另一方面把消息推送到队列中。
exchange必须知道如何处理接收到的消息 。是加到特定队列中,还是添加到多个队列中,还是放弃。这个是由他的类型来决定 。
而消息的类型有四种,分别是
Direct,Topic,headers,fanout
fanout消息类型
fanout消息类型可以把消息广播到所有队列中。
指定exchange的名称为logs,routingkey设置为空。
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
为队列的名称指定一个随机数
var queueName = channel.QueueDeclare().QueueName; channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
经过对之前代码的改造
我们定义了一个可以广播类型的exchange和一个随机名字的队列 ,
现在我们需要把他们绑定起来。
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
生产者的代码
- 创建连接
- 创建信道
- 声明类型为fanout的消息
- publish发送消息
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); for (var i = 0; i < 10; i++) { string message = "Hello World!this is message "+i; var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0},id={1}", message,i); Thread.Sleep(1000); } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
消费者代码
- 创建连接
- 创建信道
- 声明类型为fanout的消息
- 声明一个队列
- 把队列然后绑定到信道上
- 接收消息
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); //以下是区别生产者的 var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var body = e.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received {0}", message); Thread.Sleep(3000);//模拟耗时任务 , Console.WriteLine("Received over"); channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.WriteLine(""); Console.ReadLine(); } } }
测试结果
启了一个生产者,两个消费者,生产者发送10条消息 ,两个消费者都收到了10条消息
友情提示
我对我的文章负责,发现好多网上的文章 没有实践,都发出来的,让人走很多弯路,如果你在我的文章中遇到无法实现,或者无法走通的问题。可以直接在公众号《爱 码农 爱生活 》留言。必定会再次复查原因。让每一篇 文章的流程都能顺利实现。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 设计模式之发布订阅模式(2) Redis实现发布订阅模式
- 设计模式之发布订阅模式(1) 一文搞懂发布订阅模式
- 使用并解析 OPML 格式的订阅列表来转移自己的 RSS 订阅(解析篇)
- Redis订阅与发布
- 消息队列和发布订阅
- JavaScript 发布-订阅模式
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。