RabbitMQ入门学习系列(四) 发布订阅模式

栏目: 后端 · 发布时间: 5年前

内容简介:发布订阅模式把消息发送给多个订阅者。也就是有多个消费端都完整的接收生产者的消息换句话说 把消息广播给多个消费者

发布订阅模式

什么时发布订阅模式

把消息发送给多个订阅者。也就是有多个消费端都完整的接收生产者的消息

换句话说 把消息广播给多个消费者

消息模型的核心

RabbitMQ不发送消息给队列,生产者也不知道消息发送到队列

生产者只发送消息到exchange 交换器,

exchange一方面从生产者接收消息,另一方面把消息推送到队列中。

exchange必须知道如何处理接收到的消息 。是加到特定队列中,还是添加到多个队列中,还是放弃。这个是由他的类型来决定 。

而消息的类型有四种,分别是

Direct,Topic,headers,fanout

fanout消息类型

fanout消息类型可以把消息广播到所有队列中。

指定exchange的名称为logs,routingkey设置为空。

channel.BasicPublish(exchange: "logs",
                                         routingKey: "",
                                         basicProperties: null,
                                         body: body);

为队列的名称指定一个随机数

var queueName = channel.QueueDeclare().QueueName;
channel.QueueDeclare(queue: queueName,
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

经过对之前代码的改造

我们定义了一个可以广播类型的exchange和一个随机名字的队列 ,

现在我们需要把他们绑定起来。

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");

生产者的代码

  1. 创建连接
  2. 创建信道
  3. 声明类型为fanout的消息
  4. publish发送消息
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare(exchange: "logs", type: "fanout");
        for (var i = 0; i < 10; i++)
        {
            string message = "Hello World!this is message "+i;
            var body = Encoding.UTF8.GetBytes(message);
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: "logs",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);

            Console.WriteLine(" [x] Sent {0},id={1}", message,i);
            Thread.Sleep(1000);
        }
    }

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}

消费者代码

  1. 创建连接
  2. 创建信道
  3. 声明类型为fanout的消息
  4. 声明一个队列
  5. 把队列然后绑定到信道上
  6. 接收消息
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
            //以下是区别生产者的
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                var body = e.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Received {0}", message);
                Thread.Sleep(3000);//模拟耗时任务 ,
                Console.WriteLine("Received over");
                channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
            Console.WriteLine("");
            Console.ReadLine();
        }

    }

}

测试结果

启了一个生产者,两个消费者,生产者发送10条消息 ,两个消费者都收到了10条消息

RabbitMQ入门学习系列(四) 发布订阅模式

友情提示

我对我的文章负责,发现好多网上的文章 没有实践,都发出来的,让人走很多弯路,如果你在我的文章中遇到无法实现,或者无法走通的问题。可以直接在公众号《爱 码农 爱生活 》留言。必定会再次复查原因。让每一篇 文章的流程都能顺利实现。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

R语言实战

R语言实战

卡巴科弗 (Robert I.Kabacoff) / 高涛、肖楠、陈钢 / 人民邮电出版社 / 2013-1 / 79.00元

数据时代已经到来,但数据分析、数据挖掘人才却十分短缺。由于“大数据”对每个领域的决定性影响, 相对于经验和直觉,在商业、经济及其他领域中基于数据和分析去发现问题并作出科学、客观的决策越来越重要。开源软件R是世界上最流行的数据分析、统计计算及制图语言,几乎能够完成任何数据处理任务,可安装并运行于所有主流平台,为我们提供了成千上万的专业模块和实用工具,是从大数据中获取有用信息的绝佳工具。  本书从解决......一起来看看 《R语言实战》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具