内容简介:Routing 路由 集中处理 数据 然后 按照 约定/规则 正确的 广播到 消费者在本教程中,我们将为其添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。Direct Exchange 以前我写的博客
Routing 路由 集中处理 数据 然后 按照 约定/规则 正确的 广播到 消费者
在本教程中,我们将为其添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
Direct Exchange 以前我写的博客
我们上一个教程中的日志记录系统向所有消费者广播所有消息。 我们希望扩展它以允许根据消息的严重性过滤消息。 例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。
我们使用的是fanout,它没有给我们太大的灵活性 - 它只能进行无意识的广播。
我们将使用direct。 direct背后的路由算法很简单 - 消息进入队列,binding的key和route key一致就行了。
using System; using System.Linq; using RabbitMQ.Client; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var severity = "info"; var message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
direct类型的,然后写个 路由的规则,RouteKey 这里直接 给个名字。
消费者代码:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var queueName = channel.QueueDeclare().QueueName; var severity = "info"; channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
路由key 一致 就接收消息了。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
接下来Topic,直接在这个demo改
using System; using System.Linq; using RabbitMQ.Client; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var routingKey = "anonymous.info"; var message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message); } Console.ReadKey(); } }
消费者
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var queueName = channel.QueueDeclare().QueueName; var severity = "anonymous.*"; channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: severity); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
跟上面区别是 RouteKey 带了 * 号或者 #号
*号 代表1个单词
#号代表 0个以上的单词
服务端 消息路由规则 anonymous.info
换成 var severity = "a.*";
肯定收不到消息
换成# 肯定可以
*.info也可以
a# 收不到消息的
*.* 可以收到,然后把生产换成anonymous 就收不到了,因为路由规则 1个单词.第二个单词
以上内容是 AY做过测试了。
========================================================================
讲一下 RPC ,稍微有点绕,你理解一个既是消费端也是生产者,双方都是的,也可以配合 http请求响应稍微配合理解。
但是也是有点不一样。 我还是把上面代码注释了,还在那2个 控制台改。
场景:
如果我们需要在远程计算机上运行一个函数并等待结果呢? 嗯,这是一个不同的故事。 此模式通常称为Remote Procedure Call 或者 RPC.
在本教程中,我们将使用RabbitMQ构建RPC系统:客户端和可伸缩的RPC服务器。 由于我们没有任何值得分发的耗时任务,我们将创建一个返回Fibonacci 斐波那契 数字的虚拟RPC服务
为了说明如何使用RPC服务,我们将创建一个简单的客户端类。 它将公开一个名为Call的方法,该方法发送一个RPC请求并阻塞,直到收到答案为止。
请求服务器(生产者,返回一个斐波那契数字)
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class RPCServer { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); consumer.Received += (model, ea) => { string response = null; var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); } catch (Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } private static int fib(int n) { if (n == 0 || n == 1) { return n; } return fib(n - 1) + fib(n - 2); } }
代码比较好理解的,fib是一个 返回斐波那契数字的,这里不考虑数字是否是正整数了。
自己创建一个接收请求的队列,名字叫rpc_queue,手动应答,处理完成,再应答完成。
然后收到一个消息后,处理,中间有个约定的CorrelationId 写上去,
然后 在往这个RouteKey写上 返回值的一些信息。
消费者:
using System; using System.Collections.Concurrent; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RpcClient() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; props.ReplyTo = replyQueueName; consumer.Received += (model, ea) => { var body = ea.Body; var response = Encoding.UTF8.GetString(body); if (ea.BasicProperties.CorrelationId == correlationId) { respQueue.Add(response); } }; } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish( exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true); return respQueue.Take(); } public void Close() { connection.Close(); } } public class Rpc { public static void Main() { var rpcClient = new RpcClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close(); Console.ReadLine(); } }
以上的调用没有进行类型判断响应,比如输入的是否合法,如果服务器没有运行等,这里是最简单的调用示例。
客户端是否应该有超时设计等。
比如服务器处理发生异常,是否应该转发回客户端,说没处理好。
此处介绍的设计并不是RPC服务的唯一可能实现,但它具有一些重要优势:
如果RPC服务器太慢,您可以通过运行另一个服务器来扩展。 尝试在新控制台中运行第二个RPCServer。
在客户端,RPC只需要发送和接收一条消息。 不需要像QueueDeclare这样的同步调用。 因此,对于单个RPC请求,RPC客户端只需要一次网络往返。
测试:
先运行 服务端,服务端等待 消费者的请求,然后接收到,处理返回给 消费者。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
到此,基本的类库调用 AY讲解完了。
后面讲解点运维方面的。基本场景。
推荐您阅读更多有关于“RabbitMQ,”的文章
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- vue笔记3,计算笔记
- Mysql Java 驱动代码阅读笔记及 JDBC 规范笔记
- 【每日笔记】【Go学习笔记】2019-01-16 go网络编程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Visual Thinking
Colin Ware / Morgan Kaufmann / 2008-4-18 / USD 49.95
Increasingly, designers need to present information in ways that aid their audiences thinking process. Fortunately, results from the relatively new science of human visual perception provide valuable ......一起来看看 《Visual Thinking》 这本书的介绍吧!