内容简介:abp 通过abp 默认实现基于RabbitMq消息队列
abp 通过 IDistributedEventBus
接口集成自IEventBus实现分布式事件消息的发布订阅。
IEventBus
在什么时机触发 PublishAsync
?
-
当前UnitOfWork完成时,触发
IEventBus
的PublishAsync
-
在没有事务环境下,同步调用
IEventBus
的PublishAsync
abp 默认实现基于RabbitMq消息队列 Volo.Abp.EventBus.RabbitMQ
实现分布式消息的发布与订阅。
消息治理核心问题:
-
生产端如何保证投递成功的消息不能丢失。
-
Mq自身如何保证消息不丢失。
-
消费段如何保证消费端的消息不丢失。
基于abp 默认实现的DistributedEventBus不能满足以下场景:
-
Publisher 生产者无法保证消息一定能投递到MQ。
-
Consumer 消费端在消息消费时,出现异常时,没有异常错误处理机制(确保消费失败的消息能重新被消费)。
我们引入Masstransit,来提升abp对消息治理能力。
Masstransit提供以下开箱即用功能:
-
Publish/Send/Request-Response等几种消息投递机制。
-
多种IOC容器支持。
-
异常机制。
-
Saga事务管理。
-
事务活动补偿机制(Courier)
-
消息审计
-
消息管道处理机制
Abp 框架下事件消息集成
-
使用MassTransit重新实现
IDistributedEventBus
。 -
在消费端Consumer传递用户身份信息。
-
使用Asp.Net Core Web Host 作消费端Consumer宿主。
集成MassTransit
在Module初始化时,注入MassTransit实例,并启动。
Copy
/// <summary>
/// 配置DistributedEventBus
/// </summary>
/// <param name="context"></param>
/// <param name="configuration"></param>
/// <param name="hostingEnvironment"></param>
private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment)
{
var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();
var mqConnectionString = "rabbitmq://" + options.ConnectionString;
context.Services.AddMassTransit(mtConfig =>
{
//inject consumers into IOC from assembly
mtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));
mtConfig.AddBus(provider =>
{
var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>
{
var host = mqConfig.Host(new Uri(mqConnectionString), h =>
{
h.Username(options.UserName);
h.Password(options.Password);
});
// set special message serializer
mqConfig.UseBsonSerializer();
// integrated existed logger compontent
mqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>
{
//set rabbitmq prefetch count
q.PrefetchCount = 200;
//set message retry policy
q.UseMessageRetry(r => r.Interval(3, 100));
q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);
EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);
});
mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>
{
//set rabbitmq prefetch count
q.PrefetchCount = 50;
//q.UseRateLimit(100, TimeSpan.FromSeconds(1));
//q.UseConcurrencyLimit(2);
//set message retry policy
q.UseMessageRetry(r => r.Interval(3, 100));
q.Consumer<UserSyncEventConsumer>(provider);
EndpointConvention.Map<UserSyncEvent>(q.InputAddress);
});
mqConfig.ConfigureEndpoints(provider);
mqConfig.UseAuditingFilter(provider, o =>
{
o.ReplaceAuditing = true;
});
});
// set authtication middleware for user identity
bus.ConnectAuthenticationObservers(provider);
return bus;
});
});
}
在MassTransit中,使用 IBusControl
接口 StartAsync
或 StopAsync
来启动或停止。
使用 IPublishEndpoint
重新实现 IDistributedEventBus
接口,实现与abp分布式事件总线集成。
Copy
public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency
{
private readonly IPublishEndpoint _publishEndpoint;
//protected IHybridServiceScopeFactory ServiceScopeFactory { get; }
protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }
public MassTransitDistributedEventBus(
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
IPublishEndpoint publishEndpoint)
{
//ServiceScopeFactory = serviceScopeFactory;
_publishEndpoint = publishEndpoint;
DistributedEventBusOptions = distributedEventBusOptions.Value;
//Subscribe(distributedEventBusOptions.Value.Handlers);
}
/*
* Not Implementation
*/
public Task PublishAsync<TEvent>(TEvent eventData)
where TEvent : class
{
return _publishEndpoint.Publish(eventData);
}
public Task PublishAsync(Type eventType, object eventData)
{
return _publishEndpoint.Publish(eventData, eventType);
}
}
到此,我们实现了MassTransit与Abp集成。
事件消息传递User Claims
在实际业务实现过程中,我们会用消息队列实现“削峰填谷”的效果。异步消息队列中传递用户身份信息如何实现呢?
我们先看看abp在WebApi中,如何确定当前用户?
ICurrentUser
提供当前User Claims抽象。而 ICurrentUser
依赖于 ICurrentPrincipalAccessor
,在Asp.Net core中利用HttpContext User 来记录当前用户身份。
在MassTransit中,利用 IPublishObserver
> IConsumeObserver
生产者/消费端的观察者,来实现传递已认证的用户Claims。
Copy
/// <summary>
/// 生产者传递当前用户Principal
/// </summary>
public class AuthPublishObserver : IPublishObserver
{
private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;
private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;
public AuthPublishObserver(
ICurrentPrincipalAccessor currentPrincipalAccessor,
IClaimsPrincipalFactory claimsPrincipalFactory)
{
_currentPrincipalAccessor = currentPrincipalAccessor;
_claimsPrincipalFactory = claimsPrincipalFactory;
}
public Task PrePublish<T>(PublishContext<T> context) where T : class
{
var claimsPrincipal = _claimsPrincipalFactory
.CreateClaimsPrincipal(
_currentPrincipalAccessor.Principal
);
if (claimsPrincipal != null)
{
context.Headers.SetAuthenticationHeaders(claimsPrincipal);
}
return TaskUtil.Completed;
}
public Task PostPublish<T>(PublishContext<T> context) where T : class => TaskUtil.Completed;
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => TaskUtil.Completed;
}
Copy
/// <summary>
/// 消费端从MqMessage Heads 中获取当前用户Principal,并赋值给HttpContext
/// </summary>
public class AuthConsumeObserver : IConsumeObserver
{
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly IServiceScopeFactory _factory;
public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory)
{
_httpContextAccessor = httpContextAccessor;
_factory = factory;
}
public Task PreConsume<T>(ConsumeContext<T> context) where T : class
{
if (_httpContextAccessor.HttpContext == null)
{
_httpContextAccessor.HttpContext = new DefaultHttpContext
{
RequestServices = _factory.CreateScope().ServiceProvider
};
}
var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();
if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated)
{
var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();
_httpContextAccessor.HttpContext.User = claimsPrincipal;
Thread.CurrentPrincipal = claimsPrincipal;
}
return TaskUtil.Completed;
}
public Task PostConsume<T>(ConsumeContext<T> context) where T : class
{
_httpContextAccessor.HttpContext = null;
return TaskUtil.Completed;
}
public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
{
_httpContextAccessor.HttpContext = null;
return TaskUtil.Completed;
}
}
使用Asp.Net Core Web Host 作消费端Consumer宿主
基于以下几点原因,我们使用Asp.Net Core Web Host 作为消息端Consumer宿主
-
部署在 Linux 环境下,Asp.Net Core Web Host 通常使用守护进程来启动服务实例,这样可以保证服务不被中断。
-
根据abp vnext DDD 项目分层,最大程度利用Application层应用方法,复用abp vnext 框架机制。
MassTransit 深入研究
-
延迟消息
-
限流、熔断降级
-
批量消费
-
Saga
References
-
abp vnext disctributed event bus
-
MassTransit
以上所述就是小编给大家介绍的《基于Abp VNext框架设计 - Masstransit分布式消息》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 如何选型一个合适的框架:分布式任务调度框架选型
- 分布式计算框架MapReduce
- 分布式应用框架 Dapr
- 作者访谈:分布式敏捷框架指南
- TensorFlow分布式深度学习框架
- 设计一个分布式RPC框架
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。