NServiceBus 结合 RabbitMQ 使用教程

栏目: Redis · 发布时间: 6年前

内容简介:NServiceBus 结合 RabbitMQ 使用教程

NServiceBus 结合 RabbitMQ 使用可以参考官方教程:

Step by Step Guide

新建4个项目:

  • A Console Application named  Client
  • A Console Application named  Server
  • A Console Application named  Subscriber
  • A Class Library named  Shared

Framework框架选择4.6及以上,后面有用到。

Client,Server,Subscriber引用Shared。

4个项目都安装NServiceBus包:

Install-Package NServiceBus

3个控制台项目安装NServiceBus.RabbitMQ包:

Install-Package NServiceBus.RabbitMQ

Share代码:

using NServiceBus;
public class PlaceOrder:ICommand
    {
        public Guid Id { get; set; }
        public string Product { get; set; } 
    }
public class OrderPlaced:IEvent
    {
        public Guid OrderId { get; set; }
    }
public class PlaceShipping:ICommand
    {
        public Guid Id { get; set; }
        public string Product { get; set; }
    }

Client代码:

namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncMain().GetAwaiter().GetResult();
        }
        static async Task AsyncMain()
        {
            Console.Title = "Sample.StepByStep.Client";
            var endpointConfiguration = new EndpointConfiguration(endpointName: "Sample.StepByStep.Client");
            endpointConfiguration.SendFailedMessagesTo("error");
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
            endpointConfiguration.UseSerialization<JsonSerializer>();
            endpointConfiguration.EnableInstallers();
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
            try
            {
                await SendOrder(endpointInstance);
            }
            catch (Exception)
            {
                await endpointInstance.Stop().ConfigureAwait(false);
            }
        }

        private static async Task SendOrder(IEndpointInstance endpointInstance)
        {
            Console.WriteLine("Press enter to send a message");
            Console.WriteLine("Press any key to exit");
            while(true)
            {
                var key = Console.ReadKey();
                Console.WriteLine();
                if(key.Key!=ConsoleKey.Enter)
                {
                    return;
                }
                var id = Guid.NewGuid();
                var id2 = Guid.NewGuid();
                var placeOrder = new PlaceOrder
                {
                    Product = "New shoes",
                    Id = id
                };
                var placeShipping = new PlaceShipping
                {
                    Product = "A-->B",
                    Id = id2
                };
                await endpointInstance.Send("Samples.StepByStep.Server", placeOrder);
                await endpointInstance.Send("Samples.StepByStep.Server", placeShipping);
                Console.WriteLine($"Sent a PlaceOrder messge with id:{id:N}");
                Console.WriteLine($"Sent a PlaceShipping messge with id:{id2:N}");
            }
        }
    }
}

Server代码:

namespace Server
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncMain().GetAwaiter().GetResult();
        }
        static async Task AsyncMain()
        {
            Console.Title = "Samples.StepByStep.Server";
            var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Server");
            endpointConfiguration.UseSerialization<JsonSerializer>();
            endpointConfiguration.EnableInstallers();
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            endpointConfiguration.SendFailedMessagesTo("error");

            var endpointInstance = await Endpoint.Start(endpointConfiguration)
                .ConfigureAwait(false);
            try
            {
                Console.WriteLine("Press any key to exit");
                Console.ReadKey();
            }
            finally
            {
                await endpointInstance.Stop()
                    .ConfigureAwait(false);
            }
        }
    }
}
namespace Server
{
    public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
    {
        static ILog log = LogManager.GetLogger<PlaceOrderHandler>();

        public Task Handle(PlaceOrder message, IMessageHandlerContext context)
        {
            log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
            log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");

            var orderPlaced = new OrderPlaced
            {
                OrderId = message.Id
            };
            return context.Publish(orderPlaced);
        }
    }
}
namespace Server
{
    public class PlaceShippingHandler : IHandleMessages<PlaceShipping>
    {
        static ILog log = LogManager.GetLogger<PlaceShippingHandler>();

        public Task Handle(PlaceShipping message, IMessageHandlerContext context)
        {
            log.Info($"Shipping for Product:{message.Product} placed with id: {message.Id}");
            return Task.CompletedTask;
        }
    }
}

为什么要选4.6以上,原因就在Task.CompletedTask需要4.6以上。

SubScribe代码:

namespace Subscriber
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncMain().GetAwaiter().GetResult();
        }
        static async Task AsyncMain()
        {
            Console.Title = "Samples.StepByStep.Subscriber";
            var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Subscriber");
            endpointConfiguration.UseSerialization<JsonSerializer>();
            endpointConfiguration.EnableInstallers();
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            endpointConfiguration.SendFailedMessagesTo("error");

            var endpointInstance = await Endpoint.Start(endpointConfiguration)
                .ConfigureAwait(false);
            try
            {
                Console.WriteLine("Press any key to exit");
                Console.ReadKey();
            }
            finally
            {
                await endpointInstance.Stop()
                    .ConfigureAwait(false);
            }
        }
    }
}
namespace Subscriber
{
    public class OrderCreatedHandler : IHandleMessages<OrderPlaced>
    {
        static ILog log = LogManager.GetLogger<OrderCreatedHandler>();

        public Task Handle(OrderPlaced message, IMessageHandlerContext context)
        {
            log.Info($"Handling: OrderPlaced for Order Id: {message.OrderId}");
            return Task.CompletedTask;
        }
    }
}

选择多启动项目:

NServiceBus 结合 RabbitMQ 使用教程

启动项目,在Client端按回车,可以看到Server端和Subscribe端的接收信息:

NServiceBus 结合 RabbitMQ 使用教程

同时查看http://10.255.20.44:15672/#/queues:

NServiceBus 结合 RabbitMQ 使用教程

CentOS 7.2 下 RabbitMQ 集群搭建 http://www.linuxidc.com/Linux/2016-12/137812.htm

CentOS7环境安装使用专业的消息队列产品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm

RabbitMQ入门教程 http://www.linuxidc.com/Linux/2015-02/113983.htm

在CentOS7上安装RabbitMQ 详解 http://www.linuxidc.com/Linux/2017-05/143765.htm

RabbitMQ 的详细介绍 请点这里

RabbitMQ 的下载地址 请点这里

本文永久更新链接地址 http://www.linuxidc.com/Linux/2017-05/143787.htm


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

博客秘诀:超人气博客是怎样炼成的

博客秘诀:超人气博客是怎样炼成的

Darren Rowse、Chris Garrett / 向怡宁 / 人民邮电出版社 / 201005 / 39.00元

作为Web 2.0的新生事物的博客,如今已蓬勃发展,呈燎原之势,业已成为许多人的一种生活方式。中国从事博客写作的人数已达千万级,各类博客网站不可胜数。 然而,为什么有的博客人气鼎盛,拥趸众多,有的博客却门前冷落,少人问津呢?究竟应该怎样写好自己的博客,才能让它吸引更多访客的关注呢?博客网站还能为我做什么呢? 本书的两位作者长期主持知名博客站点ProBlogger.net,指导了成千上万......一起来看看 《博客秘诀:超人气博客是怎样炼成的》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

SHA 加密
SHA 加密

SHA 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换