RabbitMQ入门学习系列(三).消息发送接收

栏目: 后端 · 发布时间: 5年前

内容简介:​ 用Rabitmq的队列管理,以及如何保证消息在队列中不丢失。通过ack的消息确认和持久化进行操作。 以及Rabbit中如何用Web面板进行管理队列。消费者如何处理耗时的任务创建链接=》创建信道=》声明队列 。连续生产10条消息供消费者消费创建链接=》创建信道=》声明队列 =>创建EventingBasicConsumer=》接收消息进行处理。

快速阅读

​ 用Rabitmq的队列管理,以及如何保证消息在队列中不丢失。通过ack的消息确认和持久化进行操作。 以及Rabbit中如何用Web面板进行管理队列。消费者如何处理耗时的任务

生产者代码

创建链接=》创建信道=》声明队列 。连续生产10条消息供消费者消费

static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "hello",
                             durable: false,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
        for (var i = 0; i < 10; i++) //连续生产10条消息,让消费者消费
        {
            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: "",
                                 routingKey: "hello",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }


    }

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}

消费者代码

创建链接=》创建信道=》声明队列 =>创建EventingBasicConsumer=》接收消息进行处理。

如果挂断,消息会丢失。

static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false, exclusive: false, autoDelete: false, arguments: null);

            //以下是区别生产者的
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                var body = e.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Received {0}", message);
                Thread.Sleep(3000);//模拟耗时任务 ,
                Console.WriteLine("Received over");

            };
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine("");
            Console.ReadLine();
        }

    }

}

测试结果

RabbitMQ入门学习系列(三).消息发送接收

从中我们可以看到,消费者每3秒消费一个任务 。

消息确认

如果一个消费者挂掉以后,怎么办呢?

正常逻辑是RabbitMq把消费发送给消费者以后,会把消费从队列中删除 。

但是如果消费者挂掉以后怎么办呢?因为这个时候消息已经发送出去,

假如这个消息 在被消费者处理前挂掉了,我们就会丢失这个消费,

为了避免这种问题的出现, 我们要用到消息确认机制, 就是当消费者处理完消息以后,再给rabbitmq一个确认信息,告诉他我已经处理好了,你可以删除了,RabbitMQ接收到以后,会从队列中把这个消息删除, 这就保证了消息会不会因消费者挂掉而丢失没有处理的消息如果Rabbit没有接收到消息确认的通知(在超时之前) ,则会把这个消息再放到队列中,发送给另外的消费者。 **

我们把你代码改一下

消费者代码中,加入ack发送的标志

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine("Received {0}", message);
    Thread.Sleep(3000);//模拟耗时任务 ,
    Console.WriteLine("Received over");
    channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
};

发送者代码中加入发送的消息标识

for (var i = 0; i < 10; i++)
 {
     string message = "Hello World!this is message "+i;
     var body = Encoding.UTF8.GetBytes(message);
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true;

     channel.BasicPublish(exchange: "",
                          routingKey: "hello",
                          basicProperties: null,
                          body: body);
     Console.WriteLine(" [x] Sent {0},id={1}", message,i);
     Thread.Sleep(1000);
 }

启动了三个消费者进程 ,但是发现队列中的任务 没有被消费完

RabbitMQ入门学习系列(三).消息发送接收

还有id为6,7,8,9没有被消费, 这个时候是再重启一个消费者才可以消费完。

RabbitMQ入门学习系列(三).消息发送接收

有点奇怪了。先放这里吧,做一个问题记录一下

=》更新下进展

晚上的时候查了一下。

经常测试发现 要把autoAck设置为false才可以。

channel.BasicConsume(queue: "HelloDurable1", autoAck: false, consumer: consumer);  //这个是正常的
channel.BasicConsume(queue: "HelloDurable1", autoAck: true, consumer: consumer); //这个只能消费一部分,还需要重启才可以再消费
  • 经查autoAck 是否自动确认消息,true自动确认,false 不自动要手动调用,建立设置为false

启动三个消费者测试发现正常 。

RabbitMQ入门学习系列(三).消息发送接收

消息持久性

我们还需要考虑到当RabbitMq.server挂掉的时候,消息也会丢失。

为了避免此类问题:需要把消息和队列都标识为持久性。

当我们标识为以后,重启程序时,发现报错了。

RabbitMQ入门学习系列(三).消息发送接收

根据提示可以看出, 队列hello先前没有被标记为持久化,但已经存在了,我们不能改变他的属性,

我们可以新建一个新的队列 。比如HelloDurable,就可以了。

生产者和消费者两端都要修改。

或者打开Rabbitmq的监控把队列进行删除

RabbitMq监控

先开始管理程序

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin>rabbitmq-plugins en
able rabbitmq_management

RabbitMQ入门学习系列(三).消息发送接收

查看安装

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin>rabbitmq-plugins.bat list

RabbitMQ入门学习系列(三).消息发送接收

输入管理面板地址

http://127.0.0.1 :15672/

用户名:guest ;密码 guest

RabbitMQ入门学习系列(三).消息发送接收

登陆进去以后,找到队列列表,删除相应的队列就可以了。

RabbitMQ入门学习系列(三).消息发送接收

RabbitMQ入门学习系列(三).消息发送接收

队列持久化的声明

channel.QueueDeclare(queue: "HelloDurable",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

消费持久化的声明

var properties = channel.CreateBasicProperties();
 properties.Persistent = true;

这样即使服务器重启消息也不会丢失的。

消息负载均衡

为了避免有些消费者不能获得资源,有些消费者获得资源过多的情况,我们要做如下配置

在消费者代码中增加

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

表示每次取一个消息。

通过使用消息确认标识和配置消息持久性,让我们的消息可以持久化和不会被丢失。

友情提示

我对我的文章负责,发现好多网上的文章 没有实践,都发出来的,让人走很多弯路,如果你在我的文章中遇到无法实现,或者无法走通的问题。可以直接在公众号《爱 码农 爱生活 》留言。必定会再次复查原因。让每一篇 文章都能顺利实现。道理讲明白 。原理讲清楚。代码必实现


以上所述就是小编给大家介绍的《RabbitMQ入门学习系列(三).消息发送接收》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Python Algorithms

Python Algorithms

Magnus Lie Hetland / Apress / 2010-11-24 / USD 49.99

Python Algorithms explains the Python approach to algorithm analysis and design. Written by Magnus Lie Hetland, author of Beginning Python, this book is sharply focused on classical algorithms, but it......一起来看看 《Python Algorithms》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具