内容简介:发送消息,生产者 接收消息 消费者 RabbitMQ是Erlang语言开发1对多发布订阅(下篇讲,这篇让你更了解队列)
发送消息,生产者 接收消息 消费者 RabbitMQ是Erlang语言开发
实际场景Exchange用的多
1对多发布订阅(下篇讲,这篇让你更了解队列)
==============开始DEMO
2个控制台
发布者2
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MQ.Product2
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: true,
arguments: null);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
//channel.QueueDeclare(queue: "hello",
// durable: true,
// exclusive: false,
// autoDelete: true,
// arguments: null);
//string message = "Hello World!";
//var body = Encoding.UTF8.GetBytes(message);
//channel.BasicPublish(exchange: "",
// routingKey: "hello",
// basicProperties: null,
// body: body);
//Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
}
}
这里基于上一个DEMO改的,这里我们设置了一个properties了。
运行项目。
然后消费者修改代码(基于DEMO1的消费者 代码)
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace AyTestMQ2
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
//channel.QueueDeclare(queue: "task_queue",
// durable: true,
// exclusive: false,
// autoDelete: true,
// arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
//var consumer = new EventingBasicConsumer(channel);
//consumer.Received += (model, ea) =>
//{
// var body = ea.Body;
// var message = Encoding.UTF8.GetString(body);
// Console.WriteLine(" [x] Received {0}", message);
//};
//channel.BasicConsume(queue: "hello",
// autoAck: true,
// consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
Console.ReadKey();
}
}
}
主要接收消息,处理,模拟耗时工作。
发的消息一个 点号 停顿1秒
生产端消息改下
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello.World.AY.2019");
}
消费端改改
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
var _3 = message.Split('.');
//int dots = message.Split('.').Length - 1;
foreach (var item in _3)
{
Console.WriteLine(item);
Thread.Sleep(1000);
}
运行生产端,然后消费端效果如下
测试2,
开启生产者,然后开启消费者,如上所示,不要关闭,关掉生产者在打开,消费者那段又收到消息了。
同样的,如果有2个消费者, rabbitmq会发给下一个消费者,这种分发消息叫做 round-robin(循环调度)
一个消息只给一个消费者处理。
场景:其实我们可以做 用户的请求,每个请求放入消息队列,然后让消息队列给空闲的 消费者去消费处理。1个消费者不够处理,可以运行多个来吃完任务。
任务会耗时间的。
您可能想知道如果其中一个消费者开始执行长任务并且仅在部分完成时死亡会发生什么。
上面的代码,一旦RabbitMQ向客户发送消息,它立即将其标记为删除。
在这种情况下,如果当前的消费者挂了,我们将丢失它刚刚处理的消息。
我还将丢失分发给这个消费者的 还未处理的所有消息。
但是我不想丢失任何的消息(1个消息一个任务),如果消费者处理挂了,我当然更想把消息给其他的消费者处理。
为了确保消息永不丢失,RabbitMQ 提供了一个 ack机制, 手动应答,处理完了,告诉兔子,我处理完了,等兔子空闲时候就删除该消息了。
定义 消费者死了,就是 channel关闭,connection关闭,tcp断开了,没网络了。
当消费者还没发送 ack,兔子那边就会认为 消息没有被处理,又会恢复回去了。如果同一时间,还有其他消费者在线,兔子会把这烫手山芋给其他的消费者。
恩,所以啊,你的程序没死,他的消息一直存在兔子那的,除非你手动应答。如果你挂了没应答,会看有没有其他的消费者处理。
接下来模拟这个场景
生产者生产个消息,然后修改消费者代码
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
//channel.QueueDeclare(queue: "task_queue",
// durable: true,
// exclusive: false,
// autoDelete: true,
// arguments: null);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
throw new NotImplementedException();
Console.WriteLine(" [x] Received {0}", message);
var _3 = message.Split('.');
foreach (var item in _3)
{
Console.WriteLine(item);
Thread.Sleep(2000);
}
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
设置了autoack:false了
然后received里面设置了 ea.DeliveryTag
测试1
这里处理消息,停留了2秒1个字段,我们再BasicAck应答之前关闭程序,看消息会不会被删除了。
由于抛出异常Unacked 为1了。
把程序关了
消息还是删除了。。
我怀疑服务端设置了 自动删除导致的。我改为false测试,这样生产了1个不会自动删除的消息。
测试2
运行修改后的生产者
消费者代码不改,让抛出异常
然后关闭程序,过一会,消息恢复正常了。这次就对了。也就是生产者自动删除我觉得大部分都是关闭的。
测试3
正确处理,看消息会不会删除,移除抛弃异常的代码
ready终于是0了
然后关闭客户端,断开连接(执行完using,释放连接),队列被处理了,没删除哦
那如果想要删除呢,暂时先这样吧,因为用的最多的还是Exchange的Topic
注意:
忘记写 BasicAck这行代码, 这是一个简单的错误,但后果是严重的。 当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经处理的消息。
假如忘了unack
测试4
注释掉代码,然后生产个消息,然后运行消费者
再运行消费者,当然 连接不要释放,不然任务客户端死了,又恢复回去了
这里我们打开命令行
rabbitmqctl list_queues name messages_ready messages_unacknowledged
貌似超时了 这里就列出名字了。算了,遇到再看。
=============================================================
持久性,如果兔子挂了,消息还是会丢丢失了。
hannel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
设置持久化,就会不丢失了。 但是兔子不允许你重新定义一个已经存在的队列,然后更改属性
你可以换个名字重新定义一个。
对了,如果服务器重启,我们在上篇博客说到 消息恢复了,但是不可再被消费了,但是如果生产消息时候,加上下面代码就好了,终于解决了 durable=true也无效的问题了。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
将消息标记为持久性并不能完全保证消息不会丢失。 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接s收消息并且尚未保存消息时,仍然有一个短时间窗口。 此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。 持久性保证不强,但对于我们简单的任务队列来说已经足够了。 如果您需要更强的保证,那么您可以使用 发布者确认 (publisher confirms)。
公平调度 Fair Dispatch
2个消费者,一个很忙,一个几乎不做事,兔子不知道谁忙谁不忙的,还是均匀的发消息的。
发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。
它不会查看消费者未确认消息的数量。
它只是盲目地向第n个消费者发送每个第n个消息
为了改变这种行为,我们可以使用BasicQos方法,shezhi PrefetchCount=1
这会告诉兔子,不要同一时间给超过一个消息以上给一个消费者,因为它很忙,可能还没处理完,你又来了。
换句话说, 在处理并确认前一个消息之前,不要向该工作程序发送新消息。 相反,它会将它发送给下一个不忙的 消费者。
channel.BasicQos(0, 1, false);
这里注意队列的 size
如果所有的 消费者都很忙,并且你的queue填满了。你就要考虑是否添加更多的消费者,或者换个思路去解决问题。
消费者修改后的代码如下:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace AyTestMQ2
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
var _3 = message.Split('.');
foreach (var item in _3)
{
Console.WriteLine(item);
}
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
Console.ReadKey();
}
}
}
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
关于IModel内的方法和IBasicProperties你想了解的,可以查看 RabbitMQ .NET client API reference online
特别推荐以下指南
particularly recommend the following guides: Publisher Confirms and Consumer Acknowledgements , Production Checklist and Monitoring .
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
推荐您阅读更多有关于“”的文章
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- vue笔记3,计算笔记
- Mysql Java 驱动代码阅读笔记及 JDBC 规范笔记
- 【每日笔记】【Go学习笔记】2019-01-16 go网络编程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Zen of CSS Design
Dave Shea、Molly E. Holzschlag / Peachpit Press / 2005-2-27 / USD 44.99
Proving once and for all that standards-compliant design does not equal dull design, this inspiring tome uses examples from the landmark CSS Zen Garden site as the foundation for discussions on how to......一起来看看 《The Zen of CSS Design》 这本书的介绍吧!