内容简介:Routing 路由 集中处理 数据 然后 按照 约定/规则 正确的 广播到 消费者在本教程中,我们将为其添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。Direct Exchange 以前我写的博客
Routing 路由 集中处理 数据 然后 按照 约定/规则 正确的 广播到 消费者
在本教程中,我们将为其添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
Direct Exchange 以前我写的博客
我们上一个教程中的日志记录系统向所有消费者广播所有消息。 我们希望扩展它以允许根据消息的严重性过滤消息。 例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。
我们使用的是fanout,它没有给我们太大的灵活性 - 它只能进行无意识的广播。
我们将使用direct。 direct背后的路由算法很简单 - 消息进入队列,binding的key和route key一致就行了。
using System; using System.Linq; using RabbitMQ.Client; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var severity = "info"; var message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
direct类型的,然后写个 路由的规则,RouteKey 这里直接 给个名字。
消费者代码:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var queueName = channel.QueueDeclare().QueueName; var severity = "info"; channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
路由key 一致 就接收消息了。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
接下来Topic,直接在这个demo改
using System; using System.Linq; using RabbitMQ.Client; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var routingKey = "anonymous.info"; var message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message); } Console.ReadKey(); } }
消费者
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var queueName = channel.QueueDeclare().QueueName; var severity = "anonymous.*"; channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: severity); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
跟上面区别是 RouteKey 带了 * 号或者 #号
*号 代表1个单词
#号代表 0个以上的单词
服务端 消息路由规则 anonymous.info
换成 var severity = "a.*";
肯定收不到消息
换成# 肯定可以
*.info也可以
a# 收不到消息的
*.* 可以收到,然后把生产换成anonymous 就收不到了,因为路由规则 1个单词.第二个单词
以上内容是 AY做过测试了。
========================================================================
讲一下 RPC ,稍微有点绕,你理解一个既是消费端也是生产者,双方都是的,也可以配合 http请求响应稍微配合理解。
但是也是有点不一样。 我还是把上面代码注释了,还在那2个 控制台改。
场景:
如果我们需要在远程计算机上运行一个函数并等待结果呢? 嗯,这是一个不同的故事。 此模式通常称为Remote Procedure Call 或者 RPC.
在本教程中,我们将使用RabbitMQ构建RPC系统:客户端和可伸缩的RPC服务器。 由于我们没有任何值得分发的耗时任务,我们将创建一个返回Fibonacci 斐波那契 数字的虚拟RPC服务
为了说明如何使用RPC服务,我们将创建一个简单的客户端类。 它将公开一个名为Call的方法,该方法发送一个RPC请求并阻塞,直到收到答案为止。
请求服务器(生产者,返回一个斐波那契数字)
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class RPCServer { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); consumer.Received += (model, ea) => { string response = null; var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); } catch (Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } private static int fib(int n) { if (n == 0 || n == 1) { return n; } return fib(n - 1) + fib(n - 2); } }
代码比较好理解的,fib是一个 返回斐波那契数字的,这里不考虑数字是否是正整数了。
自己创建一个接收请求的队列,名字叫rpc_queue,手动应答,处理完成,再应答完成。
然后收到一个消息后,处理,中间有个约定的CorrelationId 写上去,
然后 在往这个RouteKey写上 返回值的一些信息。
消费者:
using System; using System.Collections.Concurrent; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RpcClient() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; props.ReplyTo = replyQueueName; consumer.Received += (model, ea) => { var body = ea.Body; var response = Encoding.UTF8.GetString(body); if (ea.BasicProperties.CorrelationId == correlationId) { respQueue.Add(response); } }; } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish( exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true); return respQueue.Take(); } public void Close() { connection.Close(); } } public class Rpc { public static void Main() { var rpcClient = new RpcClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close(); Console.ReadLine(); } }
以上的调用没有进行类型判断响应,比如输入的是否合法,如果服务器没有运行等,这里是最简单的调用示例。
客户端是否应该有超时设计等。
比如服务器处理发生异常,是否应该转发回客户端,说没处理好。
此处介绍的设计并不是RPC服务的唯一可能实现,但它具有一些重要优势:
如果RPC服务器太慢,您可以通过运行另一个服务器来扩展。 尝试在新控制台中运行第二个RPCServer。
在客户端,RPC只需要发送和接收一条消息。 不需要像QueueDeclare这样的同步调用。 因此,对于单个RPC请求,RPC客户端只需要一次网络往返。
测试:
先运行 服务端,服务端等待 消费者的请求,然后接收到,处理返回给 消费者。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
到此,基本的类库调用 AY讲解完了。
后面讲解点运维方面的。基本场景。
推荐您阅读更多有关于“RabbitMQ,”的文章
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- vue笔记3,计算笔记
- Mysql Java 驱动代码阅读笔记及 JDBC 规范笔记
- 【每日笔记】【Go学习笔记】2019-01-16 go网络编程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
ACM国际大学生程序设计竞赛
俞勇 编 / 2012-12 / 29.00元
《ACM国际大学生程序设计竞赛:知识与入门》适用于参加ACM国际大学生程序设计竞赛的本科生和研究生,对参加青少年信息学奥林匹克竞赛的中学生也很有指导价值。同时,作为程序设计、数据结构、算法等相关课程的拓展与提升,《ACM国际大学生程序设计竞赛:知识与入门》也是难得的教学辅助读物。一起来看看 《ACM国际大学生程序设计竞赛》 这本书的介绍吧!