重新整理 .net core 实践篇—————微服务的桥梁EventBus[三十一]

前言

简单介绍一下EventBus.

正文

EventBus 也就是集成事件,用于服务与服务之间的通信。

比如说我们的订单处理事件,当订单处理完毕后,我们如果通过api马上去调用后续接口。

比如说订单完成给用户通知的话,如果是大量订单,即使我们使用异步async await 这种模式,在这个订单服务中将会大量占用资源,因为async await 本身是线程池。

因为里面的资源是有限的,如果创建订单还有完成订单占用大量资源的话,发送邮件还加入到竞争中,那么可以想象,机器掉入资源用尽就更大了。如果是订单完成之后要调用多个服务,那么可想而知,压力多大。

那么EventBus 就是通过发布订阅这种模式来缓存起来,EventBus 发布一个事件,然后其他微服务可以进行订阅,这样就缓解了机器的负担。

.net core 实现EventBus 的框架如下:https://github.com/dotnetcore/CAP

可以去看下上面这个地址,这里就不详细介绍了,因为没有什么比文档更加详细了。

在.net core 中加入EventBus:

// 注册 EventBus  
services.AddEventBus(Configuration);

AddEventBus 如下:

/// <summary>
/// 注册EventBus(集成事件处理服务)
/// </summary>
/// <param name="services"></param>
/// <param name="configuration"></param>
/// <returns></returns>
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
	// 注入集成事件订阅服务
	services.AddTransient<ISubscriberService, SubscriberService>();

	// 注入CAP服务
	services.AddCap(options =>
	{
		// 指定CAP组件所使用的数据库上下文,当前设置表示EventBus与领域驱动共享数据库链接
		options.UseEntityFramework<DomainContext>();

		// package: DotNetCore.CAP.RabbitMQ
		// 指定RabbitMQ作为我们EventBus的消息队列的存储,并注入配置
		options.UseRabbitMQ(options =>
		{
			configuration.GetSection("RabbitMQ").Bind(options);
		});
		// options.UseDashboard();
	});
	return services;
}

可以看到上面使用RabbitMQ作为Eventbus的消息队列处理。当然除了RabbitMq,这个框架还支持其他的,如Kafka, AzureService, AmazonSQS等,看公司或者项目需要什么就行。

因为使用了RabbitMq,那么上面的写了加载RabbitMq:configuration.GetSection("RabbitMQ").Bind(options),配置RabbitMQ:

"RabbitMQ": {
"HostName": "127.0.0.1",
"UserName": "mq",
"Password": "123456",
"VirtualHost": "blog",
"ExchangeName": "blog_queue"
}

那么看一下EventBus发布:

/// <summary>
/// 创建Order领域事件处理
/// </summary>
public class OrderCreatedDomainEventHandler : IDomainEventHandler<OrderCreatedDomainEvent>
{
	ICapPublisher _capPublisher;
	public OrderCreatedDomainEventHandler(ICapPublisher capPublisher)
	{
		_capPublisher = capPublisher;
	}

	/// <summary>
	/// 领域事件处理
	/// </summary>
	/// <param name="notification"></param>
	/// <param name="cancellationToken"></param>
	/// <returns></returns>
	public async Task Handle(OrderCreatedDomainEvent notification, CancellationToken cancellationToken)
	{
		// 当创建新订单时,向 EventBus 发布一个事件
		await _capPublisher.PublishAsync("OrderCreated", new OrderCreatedIntegrationEvent(notification.Order.Id));
	}
}

前面订单创建完毕的时候,经过调用相关的领域事件,然后向EventBus 发送了一个事件。那么先看下这个事件。

/// <summary>
/// 集成事件:订单创建完成
/// </summary>
public class OrderCreatedIntegrationEvent
{
	public long OrderId { get; }
	public OrderCreatedIntegrationEvent(long orderId)
	{
		OrderId = orderId;
	}
}

这样就创建了一个集成事件,并且发布出去了。

然后创建订阅服务:

/// <summary>
/// 集成事件订阅服务
/// </summary>
public class SubscriberService : ISubscriberService, ICapSubscribe
{
	IMediator _mediator;
	public SubscriberService(IMediator mediator)
	{
		_mediator = mediator;
	}

	/// <summary>
	/// 订阅订单创建成功集成事件
	/// </summary>
	/// <param name="event"></param>
	[CapSubscribe("OrderCreated")]
	public void OrderCreatedSucceeded(OrderCreatedIntegrationEvent @event)
	{
		// do something...
	}

	/// <summary>
	/// 订阅订单支付成功集成事件
	/// </summary>
	/// <param name="event"></param>
	[CapSubscribe("OrderPaymentSucceeded")]
	public void OrderPaymentSucceeded(OrderPaymentSucceededIntegrationEvent @event)
	{
		// do something...
	}
}

这样即可,框架会帮我们处理。

总结

  1. 集成事件是跨服务的事件,在领域模式中,集成事件也是跨服务的领域事件

  2. 在领域模型中,集成事件一般由领域事件驱动触发

  3. 集成事件可以实现一致性,补充事务不能跨服务

  4. 集成事件会有一些额外的负担,使用的时候思考是否有必要

因为这个文档比较清晰,直接看文档更加通常,就不演示具体运行。

再次放一下文档:https://github.com/dotnetcore/CAP

下一节HttpClientFactory,以上只是个人整理一下,如有错误,望请指出。

上一篇:vue 组件间 8 大通讯方式 之三 eventBus


下一篇:窗口滚动顶部栏隐藏