基于Abp VNext框架设计 - Masstransit分布式消息

栏目: IT技术 · 发布时间: 4年前

内容简介:abp 通过abp 默认实现基于RabbitMq消息队列

abp 通过 IDistributedEventBus 接口集成自IEventBus实现分布式事件消息的发布订阅。

IEventBus 在什么时机触发 PublishAsync ?

  1. 当前UnitOfWork完成时,触发 IEventBusPublishAsync

  2. 在没有事务环境下,同步调用 IEventBusPublishAsync

abp 默认实现基于RabbitMq消息队列 Volo.Abp.EventBus.RabbitMQ 实现分布式消息的发布与订阅。

消息治理核心问题:

  1. 生产端如何保证投递成功的消息不能丢失。

  2. Mq自身如何保证消息不丢失。

  3. 消费段如何保证消费端的消息不丢失。

基于abp 默认实现的DistributedEventBus不能满足以下场景:

  1. Publisher 生产者无法保证消息一定能投递到MQ。

  2. Consumer 消费端在消息消费时,出现异常时,没有异常错误处理机制(确保消费失败的消息能重新被消费)。

我们引入Masstransit,来提升abp对消息治理能力。

Masstransit提供以下开箱即用功能:

  1. Publish/Send/Request-Response等几种消息投递机制。

  2. 多种IOC容器支持。

  3. 异常机制。

  4. Saga事务管理。

  5. 事务活动补偿机制(Courier)

  6. 消息审计

  7. 消息管道处理机制

Abp 框架下事件消息集成

  1. 使用MassTransit重新实现 IDistributedEventBus

  2. 在消费端Consumer传递用户身份信息。

  3. 使用Asp.Net Core Web Host 作消费端Consumer宿主。

集成MassTransit

在Module初始化时,注入MassTransit实例,并启动。

Copy

/// <summary>
/// 配置DistributedEventBus
/// </summary>
/// <param name="context"></param>
/// <param name="configuration"></param>
/// <param name="hostingEnvironment"></param>
private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment)
{
var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();

var mqConnectionString = "rabbitmq://" + options.ConnectionString;


context.Services.AddMassTransit(mtConfig =>
{
//inject consumers into IOC from assembly
mtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));


mtConfig.AddBus(provider =>
{
var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>
{
var host = mqConfig.Host(new Uri(mqConnectionString), h =>
{
h.Username(options.UserName);
h.Password(options.Password);
});
// set special message serializer
mqConfig.UseBsonSerializer();

// integrated existed logger compontent
mqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());

mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>
{
//set rabbitmq prefetch count
q.PrefetchCount = 200;

//set message retry policy
q.UseMessageRetry(r => r.Interval(3, 100));

q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);
EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);

});

mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>
{
//set rabbitmq prefetch count
q.PrefetchCount = 50;
//q.UseRateLimit(100, TimeSpan.FromSeconds(1));
//q.UseConcurrencyLimit(2);

//set message retry policy
q.UseMessageRetry(r => r.Interval(3, 100));

q.Consumer<UserSyncEventConsumer>(provider);
EndpointConvention.Map<UserSyncEvent>(q.InputAddress);
});

mqConfig.ConfigureEndpoints(provider);

mqConfig.UseAuditingFilter(provider, o =>
{
o.ReplaceAuditing = true;
});
});

// set authtication middleware for user identity
bus.ConnectAuthenticationObservers(provider);

return bus;
});
});
}

在MassTransit中,使用 IBusControl 接口  StartAsync 或  StopAsync 来启动或停止。

使用 IPublishEndpoint 重新实现 IDistributedEventBus 接口,实现与abp分布式事件总线集成。

Copy

public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency
{
private readonly IPublishEndpoint _publishEndpoint;


//protected IHybridServiceScopeFactory ServiceScopeFactory { get; }
protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }

public MassTransitDistributedEventBus(
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
IPublishEndpoint publishEndpoint)
{
//ServiceScopeFactory = serviceScopeFactory;
_publishEndpoint = publishEndpoint;
DistributedEventBusOptions = distributedEventBusOptions.Value;
//Subscribe(distributedEventBusOptions.Value.Handlers);
}

/*
* Not Implementation
*/



public Task PublishAsync<TEvent>(TEvent eventData)
where TEvent : class
{
return _publishEndpoint.Publish(eventData);
}

public Task PublishAsync(Type eventType, object eventData)
{
return _publishEndpoint.Publish(eventData, eventType);
}
}

到此,我们实现了MassTransit与Abp集成。

事件消息传递User Claims

在实际业务实现过程中,我们会用消息队列实现“削峰填谷”的效果。异步消息队列中传递用户身份信息如何实现呢?

我们先看看abp在WebApi中,如何确定当前用户?

ICurrentUser 提供当前User Claims抽象。而 ICurrentUser 依赖于 ICurrentPrincipalAccessor ,在Asp.Net core中利用HttpContext User 来记录当前用户身份。

在MassTransit中,利用 IPublishObserverIConsumeObserver 生产者/消费端的观察者,来实现传递已认证的用户Claims。

Copy

/// <summary>
/// 生产者传递当前用户Principal
/// </summary>
public class AuthPublishObserver : IPublishObserver
{
private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;
private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;

public AuthPublishObserver(
ICurrentPrincipalAccessor currentPrincipalAccessor,
IClaimsPrincipalFactory claimsPrincipalFactory)
{
_currentPrincipalAccessor = currentPrincipalAccessor;
_claimsPrincipalFactory = claimsPrincipalFactory;
}

public Task PrePublish<T>(PublishContext<T> context) where T : class
{
var claimsPrincipal = _claimsPrincipalFactory
.CreateClaimsPrincipal(
_currentPrincipalAccessor.Principal
);

if (claimsPrincipal != null)
{
context.Headers.SetAuthenticationHeaders(claimsPrincipal);
}


return TaskUtil.Completed;

}
public Task PostPublish<T>(PublishContext<T> context) where T : class => TaskUtil.Completed;
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => TaskUtil.Completed;

}

Copy


/// <summary>
/// 消费端从MqMessage Heads 中获取当前用户Principal,并赋值给HttpContext
/// </summary>
public class AuthConsumeObserver : IConsumeObserver
{
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly IServiceScopeFactory _factory;


public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory)
{
_httpContextAccessor = httpContextAccessor;
_factory = factory;
}

public Task PreConsume<T>(ConsumeContext<T> context) where T : class
{
if (_httpContextAccessor.HttpContext == null)
{
_httpContextAccessor.HttpContext = new DefaultHttpContext
{
RequestServices = _factory.CreateScope().ServiceProvider
};
}

var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();

if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated)
{
var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();

_httpContextAccessor.HttpContext.User = claimsPrincipal;
Thread.CurrentPrincipal = claimsPrincipal;
}

return TaskUtil.Completed;
}

public Task PostConsume<T>(ConsumeContext<T> context) where T : class
{
_httpContextAccessor.HttpContext = null;

return TaskUtil.Completed;
}

public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
{
_httpContextAccessor.HttpContext = null;

return TaskUtil.Completed;
}
}

使用Asp.Net Core Web Host 作消费端Consumer宿主

基于以下几点原因,我们使用Asp.Net Core Web Host 作为消息端Consumer宿主

  1. 部署在 Linux 环境下,Asp.Net Core Web Host 通常使用守护进程来启动服务实例,这样可以保证服务不被中断。

  2. 根据abp vnext DDD 项目分层,最大程度利用Application层应用方法,复用abp vnext 框架机制。

MassTransit 深入研究

  1. 延迟消息

  2. 限流、熔断降级

  3. 批量消费

  4. Saga

References

  1. abp vnext disctributed event bus

  2. MassTransit

基于Abp VNext框架设计 - Masstransit分布式消息


以上所述就是小编给大家介绍的《基于Abp VNext框架设计 - Masstransit分布式消息》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

世界因你不同:李开复自传(纪念版)

世界因你不同:李开复自传(纪念版)

李开复,范海涛 著作 / 中信出版社 / 2015-7-10 / 39.00

编辑推荐 1.李开复唯一一部描写全面生平事迹的传记:《世界因你不同:李开复自传》书中讲述了家庭教育培育的“天才少年”;学校教育塑造的“创新青年”,走入世界顶级大公司,苹果、微软、谷歌等亲历的风云内幕,岁月30载不懈奋斗、追求事业成功的辉煌历程。 2.娓娓道来、字字珠玑、可读性和故事性皆佳。李开复博士是青少年成长成才的励志偶像,年轻家长、学校教师阅读后也能从中得到感悟和启发。 3.......一起来看看 《世界因你不同:李开复自传(纪念版)》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具