前言
简单介绍一下EventBus.
正文
EventBus 也就是集成事件,用于服务与服务之间的通信。
比如说我们的订单处理事件,当订单处理完毕后,我们如果通过api马上去调用后续接口。
比如说订单完成给用户通知的话,如果是大量订单,即使我们使用异步async await 这种模式,在这个订单服务中将会大量占用资源,因为async await 本身是线程池。
因为里面的资源是有限的,如果创建订单还有完成订单占用大量资源的话,发送邮件还加入到竞争中,那么可以想象,机器掉入资源用尽就更大了。如果是订单完成之后要调用多个服务,那么可想而知,压力多大。
那么EventBus 就是通过发布订阅这种模式来缓存起来,EventBus 发布一个事件,然后其他微服务可以进行订阅,这样就缓解了机器的负担。
.net core 实现EventBus 的框架如下:https://github.com/dotnetcore/CAP
可以去看下上面这个地址,这里就不详细介绍了,因为没有什么比文档更加详细了。
在.net core 中加入EventBus:
// 注册 EventBus
services.AddEventBus(Configuration);
AddEventBus 如下:
/// <summary>
/// 注册EventBus(集成事件处理服务)
/// </summary>
/// <param name="services"></param>
/// <param name="configuration"></param>
/// <returns></returns>
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
// 注入集成事件订阅服务
services.AddTransient<ISubscriberService, SubscriberService>();
// 注入CAP服务
services.AddCap(options =>
{
// 指定CAP组件所使用的数据库上下文,当前设置表示EventBus与领域驱动共享数据库链接
options.UseEntityFramework<DomainContext>();
// package: DotNetCore.CAP.RabbitMQ
// 指定RabbitMQ作为我们EventBus的消息队列的存储,并注入配置
options.UseRabbitMQ(options =>
{
configuration.GetSection("RabbitMQ").Bind(options);
});
// options.UseDashboard();
});
return services;
}
可以看到上面使用RabbitMQ作为Eventbus的消息队列处理。当然除了RabbitMq,这个框架还支持其他的,如Kafka, AzureService, AmazonSQS等,看公司或者项目需要什么就行。
因为使用了RabbitMq,那么上面的写了加载RabbitMq:configuration.GetSection("RabbitMQ").Bind(options),配置RabbitMQ:
"RabbitMQ": {
"HostName": "127.0.0.1",
"UserName": "mq",
"Password": "123456",
"VirtualHost": "blog",
"ExchangeName": "blog_queue"
}
那么看一下EventBus发布:
/// <summary>
/// 创建Order领域事件处理
/// </summary>
public class OrderCreatedDomainEventHandler : IDomainEventHandler<OrderCreatedDomainEvent>
{
ICapPublisher _capPublisher;
public OrderCreatedDomainEventHandler(ICapPublisher capPublisher)
{
_capPublisher = capPublisher;
}
/// <summary>
/// 领域事件处理
/// </summary>
/// <param name="notification"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task Handle(OrderCreatedDomainEvent notification, CancellationToken cancellationToken)
{
// 当创建新订单时,向 EventBus 发布一个事件
await _capPublisher.PublishAsync("OrderCreated", new OrderCreatedIntegrationEvent(notification.Order.Id));
}
}
前面订单创建完毕的时候,经过调用相关的领域事件,然后向EventBus 发送了一个事件。那么先看下这个事件。
/// <summary>
/// 集成事件:订单创建完成
/// </summary>
public class OrderCreatedIntegrationEvent
{
public long OrderId { get; }
public OrderCreatedIntegrationEvent(long orderId)
{
OrderId = orderId;
}
}
这样就创建了一个集成事件,并且发布出去了。
然后创建订阅服务:
/// <summary>
/// 集成事件订阅服务
/// </summary>
public class SubscriberService : ISubscriberService, ICapSubscribe
{
IMediator _mediator;
public SubscriberService(IMediator mediator)
{
_mediator = mediator;
}
/// <summary>
/// 订阅订单创建成功集成事件
/// </summary>
/// <param name="event"></param>
[CapSubscribe("OrderCreated")]
public void OrderCreatedSucceeded(OrderCreatedIntegrationEvent @event)
{
// do something...
}
/// <summary>
/// 订阅订单支付成功集成事件
/// </summary>
/// <param name="event"></param>
[CapSubscribe("OrderPaymentSucceeded")]
public void OrderPaymentSucceeded(OrderPaymentSucceededIntegrationEvent @event)
{
// do something...
}
}
这样即可,框架会帮我们处理。
总结
-
集成事件是跨服务的事件,在领域模式中,集成事件也是跨服务的领域事件
-
在领域模型中,集成事件一般由领域事件驱动触发
-
集成事件可以实现一致性,补充事务不能跨服务
-
集成事件会有一些额外的负担,使用的时候思考是否有必要
结
因为这个文档比较清晰,直接看文档更加通常,就不演示具体运行。
再次放一下文档:https://github.com/dotnetcore/CAP
下一节HttpClientFactory,以上只是个人整理一下,如有错误,望请指出。