NServiceBus 结合 RabbitMQ 使用教程

栏目: Redis · 发布时间: 8年前

内容简介:NServiceBus 结合 RabbitMQ 使用教程

NServiceBus 结合 RabbitMQ 使用可以参考官方教程:

Step by Step Guide

新建4个项目:

  • A Console Application named  Client
  • A Console Application named  Server
  • A Console Application named  Subscriber
  • A Class Library named  Shared

Framework框架选择4.6及以上,后面有用到。

Client,Server,Subscriber引用Shared。

4个项目都安装NServiceBus包:

Install-Package NServiceBus

3个控制台项目安装NServiceBus.RabbitMQ包:

Install-Package NServiceBus.RabbitMQ

Share代码:

using NServiceBus;
public class PlaceOrder:ICommand
    {
        public Guid Id { get; set; }
        public string Product { get; set; } 
    }
public class OrderPlaced:IEvent
    {
        public Guid OrderId { get; set; }
    }
public class PlaceShipping:ICommand
    {
        public Guid Id { get; set; }
        public string Product { get; set; }
    }

Client代码:

namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncMain().GetAwaiter().GetResult();
        }
        static async Task AsyncMain()
        {
            Console.Title = "Sample.StepByStep.Client";
            var endpointConfiguration = new EndpointConfiguration(endpointName: "Sample.StepByStep.Client");
            endpointConfiguration.SendFailedMessagesTo("error");
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
            endpointConfiguration.UseSerialization<JsonSerializer>();
            endpointConfiguration.EnableInstallers();
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
            try
            {
                await SendOrder(endpointInstance);
            }
            catch (Exception)
            {
                await endpointInstance.Stop().ConfigureAwait(false);
            }
        }

        private static async Task SendOrder(IEndpointInstance endpointInstance)
        {
            Console.WriteLine("Press enter to send a message");
            Console.WriteLine("Press any key to exit");
            while(true)
            {
                var key = Console.ReadKey();
                Console.WriteLine();
                if(key.Key!=ConsoleKey.Enter)
                {
                    return;
                }
                var id = Guid.NewGuid();
                var id2 = Guid.NewGuid();
                var placeOrder = new PlaceOrder
                {
                    Product = "New shoes",
                    Id = id
                };
                var placeShipping = new PlaceShipping
                {
                    Product = "A-->B",
                    Id = id2
                };
                await endpointInstance.Send("Samples.StepByStep.Server", placeOrder);
                await endpointInstance.Send("Samples.StepByStep.Server", placeShipping);
                Console.WriteLine($"Sent a PlaceOrder messge with id:{id:N}");
                Console.WriteLine($"Sent a PlaceShipping messge with id:{id2:N}");
            }
        }
    }
}

Server代码:

namespace Server
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncMain().GetAwaiter().GetResult();
        }
        static async Task AsyncMain()
        {
            Console.Title = "Samples.StepByStep.Server";
            var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Server");
            endpointConfiguration.UseSerialization<JsonSerializer>();
            endpointConfiguration.EnableInstallers();
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            endpointConfiguration.SendFailedMessagesTo("error");

            var endpointInstance = await Endpoint.Start(endpointConfiguration)
                .ConfigureAwait(false);
            try
            {
                Console.WriteLine("Press any key to exit");
                Console.ReadKey();
            }
            finally
            {
                await endpointInstance.Stop()
                    .ConfigureAwait(false);
            }
        }
    }
}
namespace Server
{
    public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
    {
        static ILog log = LogManager.GetLogger<PlaceOrderHandler>();

        public Task Handle(PlaceOrder message, IMessageHandlerContext context)
        {
            log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
            log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");

            var orderPlaced = new OrderPlaced
            {
                OrderId = message.Id
            };
            return context.Publish(orderPlaced);
        }
    }
}
namespace Server
{
    public class PlaceShippingHandler : IHandleMessages<PlaceShipping>
    {
        static ILog log = LogManager.GetLogger<PlaceShippingHandler>();

        public Task Handle(PlaceShipping message, IMessageHandlerContext context)
        {
            log.Info($"Shipping for Product:{message.Product} placed with id: {message.Id}");
            return Task.CompletedTask;
        }
    }
}

为什么要选4.6以上,原因就在Task.CompletedTask需要4.6以上。

SubScribe代码:

namespace Subscriber
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncMain().GetAwaiter().GetResult();
        }
        static async Task AsyncMain()
        {
            Console.Title = "Samples.StepByStep.Subscriber";
            var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Subscriber");
            endpointConfiguration.UseSerialization<JsonSerializer>();
            endpointConfiguration.EnableInstallers();
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            endpointConfiguration.SendFailedMessagesTo("error");

            var endpointInstance = await Endpoint.Start(endpointConfiguration)
                .ConfigureAwait(false);
            try
            {
                Console.WriteLine("Press any key to exit");
                Console.ReadKey();
            }
            finally
            {
                await endpointInstance.Stop()
                    .ConfigureAwait(false);
            }
        }
    }
}
namespace Subscriber
{
    public class OrderCreatedHandler : IHandleMessages<OrderPlaced>
    {
        static ILog log = LogManager.GetLogger<OrderCreatedHandler>();

        public Task Handle(OrderPlaced message, IMessageHandlerContext context)
        {
            log.Info($"Handling: OrderPlaced for Order Id: {message.OrderId}");
            return Task.CompletedTask;
        }
    }
}

选择多启动项目:

NServiceBus 结合 RabbitMQ 使用教程

启动项目,在Client端按回车,可以看到Server端和Subscribe端的接收信息:

NServiceBus 结合 RabbitMQ 使用教程

同时查看http://10.255.20.44:15672/#/queues:

NServiceBus 结合 RabbitMQ 使用教程

CentOS 7.2 下 RabbitMQ 集群搭建 http://www.linuxidc.com/Linux/2016-12/137812.htm

CentOS7环境安装使用专业的消息队列产品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm

RabbitMQ入门教程 http://www.linuxidc.com/Linux/2015-02/113983.htm

在CentOS7上安装RabbitMQ 详解 http://www.linuxidc.com/Linux/2017-05/143765.htm

RabbitMQ 的详细介绍 请点这里

RabbitMQ 的下载地址 请点这里

本文永久更新链接地址 http://www.linuxidc.com/Linux/2017-05/143787.htm


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Learn Python the Hard Way

Learn Python the Hard Way

Zed A. Shaw / Addison-Wesley Professional / 2013-10-11 / USD 39.99

Master Python and become a programmer-even if you never thought you could! This breakthrough book and CD can help practically anyone get started in programming. It's called "The Hard Way," but it's re......一起来看看 《Learn Python the Hard Way》 这本书的介绍吧!

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具