内容简介:NServiceBus 结合 RabbitMQ 使用教程
NServiceBus 结合 RabbitMQ 使用可以参考官方教程:
新建4个项目:
- A Console Application named
Client - A Console Application named
Server - A Console Application named
Subscriber - A Class Library named
Shared
Framework框架选择4.6及以上,后面有用到。
Client,Server,Subscriber引用Shared。
4个项目都安装NServiceBus包:
Install-Package NServiceBus
3个控制台项目安装NServiceBus.RabbitMQ包:
Install-Package NServiceBus.RabbitMQ
Share代码:
using NServiceBus;
public class PlaceOrder:ICommand
{
public Guid Id { get; set; }
public string Product { get; set; }
}
public class OrderPlaced:IEvent
{
public Guid OrderId { get; set; }
}
public class PlaceShipping:ICommand
{
public Guid Id { get; set; }
public string Product { get; set; }
}
Client代码:
namespace Client
{
class Program
{
static void Main(string[] args)
{
AsyncMain().GetAwaiter().GetResult();
}
static async Task AsyncMain()
{
Console.Title = "Sample.StepByStep.Client";
var endpointConfiguration = new EndpointConfiguration(endpointName: "Sample.StepByStep.Client");
endpointConfiguration.SendFailedMessagesTo("error");
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
endpointConfiguration.UseSerialization<JsonSerializer>();
endpointConfiguration.EnableInstallers();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
try
{
await SendOrder(endpointInstance);
}
catch (Exception)
{
await endpointInstance.Stop().ConfigureAwait(false);
}
}
private static async Task SendOrder(IEndpointInstance endpointInstance)
{
Console.WriteLine("Press enter to send a message");
Console.WriteLine("Press any key to exit");
while(true)
{
var key = Console.ReadKey();
Console.WriteLine();
if(key.Key!=ConsoleKey.Enter)
{
return;
}
var id = Guid.NewGuid();
var id2 = Guid.NewGuid();
var placeOrder = new PlaceOrder
{
Product = "New shoes",
Id = id
};
var placeShipping = new PlaceShipping
{
Product = "A-->B",
Id = id2
};
await endpointInstance.Send("Samples.StepByStep.Server", placeOrder);
await endpointInstance.Send("Samples.StepByStep.Server", placeShipping);
Console.WriteLine($"Sent a PlaceOrder messge with id:{id:N}");
Console.WriteLine($"Sent a PlaceShipping messge with id:{id2:N}");
}
}
}
}
Server代码:
namespace Server
{
class Program
{
static void Main(string[] args)
{
AsyncMain().GetAwaiter().GetResult();
}
static async Task AsyncMain()
{
Console.Title = "Samples.StepByStep.Server";
var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Server");
endpointConfiguration.UseSerialization<JsonSerializer>();
endpointConfiguration.EnableInstallers();
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.SendFailedMessagesTo("error");
var endpointInstance = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);
try
{
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
finally
{
await endpointInstance.Stop()
.ConfigureAwait(false);
}
}
}
}
namespace Server
{
public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
static ILog log = LogManager.GetLogger<PlaceOrderHandler>();
public Task Handle(PlaceOrder message, IMessageHandlerContext context)
{
log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");
var orderPlaced = new OrderPlaced
{
OrderId = message.Id
};
return context.Publish(orderPlaced);
}
}
}
namespace Server
{
public class PlaceShippingHandler : IHandleMessages<PlaceShipping>
{
static ILog log = LogManager.GetLogger<PlaceShippingHandler>();
public Task Handle(PlaceShipping message, IMessageHandlerContext context)
{
log.Info($"Shipping for Product:{message.Product} placed with id: {message.Id}");
return Task.CompletedTask;
}
}
}
为什么要选4.6以上,原因就在Task.CompletedTask需要4.6以上。
SubScribe代码:
namespace Subscriber
{
class Program
{
static void Main(string[] args)
{
AsyncMain().GetAwaiter().GetResult();
}
static async Task AsyncMain()
{
Console.Title = "Samples.StepByStep.Subscriber";
var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Subscriber");
endpointConfiguration.UseSerialization<JsonSerializer>();
endpointConfiguration.EnableInstallers();
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.SendFailedMessagesTo("error");
var endpointInstance = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);
try
{
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
finally
{
await endpointInstance.Stop()
.ConfigureAwait(false);
}
}
}
}
namespace Subscriber
{
public class OrderCreatedHandler : IHandleMessages<OrderPlaced>
{
static ILog log = LogManager.GetLogger<OrderCreatedHandler>();
public Task Handle(OrderPlaced message, IMessageHandlerContext context)
{
log.Info($"Handling: OrderPlaced for Order Id: {message.OrderId}");
return Task.CompletedTask;
}
}
}
选择多启动项目:
启动项目,在Client端按回车,可以看到Server端和Subscribe端的接收信息:
同时查看http://10.255.20.44:15672/#/queues:
CentOS 7.2 下 RabbitMQ 集群搭建 http://www.linuxidc.com/Linux/2016-12/137812.htm
CentOS7环境安装使用专业的消息队列产品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm
RabbitMQ入门教程 http://www.linuxidc.com/Linux/2015-02/113983.htm
在CentOS7上安装RabbitMQ 详解 http://www.linuxidc.com/Linux/2017-05/143765.htm
RabbitMQ 的详细介绍 : 请点这里
RabbitMQ 的下载地址 : 请点这里
本文永久更新链接地址 : http://www.linuxidc.com/Linux/2017-05/143787.htm
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 结合案例使用 Java 注解和反射
- Apache使用fcgi方式与PHP结合
- Redux 基础教程以及结合 React 使用方式
- Android:你该如何结合Retrofit 与 RxJava使用(含使用教程)
- laravel-admin 与 vue 结合使用
- 使用socket.io与express结合,体验websocket
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
HTML 编码/解码
HTML 编码/解码
XML、JSON 在线转换
在线XML、JSON转换工具