Tip: 此篇已加入.NET Core微服务基础系列文章索引
一、案例结构与说明
在上一篇中,我们了解了MassTransit这个开源组件的基本用法,这一篇我们结合一个小案例来了解在ASP.NET Core中如何借助MassTransit+Quartz.Net来实现数据的最终一致性。当然,实现数据的最终一致性有很多方案,这里只是举一种我所学到的比较简单易于学习的实现方式而已。
假设我们有一个XX保险微信商城(WechatShop,简称WS)系统,根据服务的划分,针对下订单的这个场景,我们划分了四个微服务,他们分别是订单服务(OrderService),库存服务(StorageService)、配送服务(DeliveryService)以及一个只用于后台监控事件消息的事件后台服务(EventService)。
我们来看看这个场景下的一些基本业务逻辑流程(可能并不一定正确或者覆盖全面):
(1)当终端User在微信商城中看中一个保单产品,并且下单之后,订单服务会接收订单信息并更新订单数据库(这里假设只更新一张订单表)。
(2)然后事件后台服务会定期(比如每隔30秒)检查订单数据库的事件状态表(比如一张单独的Events表,里面有某个订单的2行记录,每行记录分别代表与订单相关的服务(这里就是库存和配送服务)的处理状态),如果发现相关服务的事件状态为未处理,则会向事件总线(假设这里基于RabbitMQ)发送消息告知对应订阅者要处理这个订单。
(3)这里对应的订阅者就是库存服务和配送服务,他们收到消息之后,会进行各自的业务逻辑处理。比如,库存服务会根据订单信息去更新库存数据库并做一些逻辑处理比如更新保单促销活动的记录,配送服务会根据订单信息更新配送数据库并做一些逻辑处理比如打印纸质保单并进行物流预约登记,当他们各自处理完成之后便会向事件总线发送一个处理完毕的消息。
(4)事件后台服务也会作为订阅者,接收库存和配送服务发送过来的消息,如果接收到某个服务的处理完毕消息,便会根据接收到的消息去更新前面事件状态表中的对应的事件记录记录行。比如:接收到库存服务发送的消息,会更新时间状态表中这个OrderID相关的库存事件状态的那一行记录的状态为已处理。
(5)事件后台服务的定时任务中(这里假设每隔30秒一次),会Check事件是否还有未处理完毕的事件消息,如果没有则休眠,否则会检查其创建记录的时间与现在的系统时间的间隔是否超过了最大容忍值(这里假设1小时),如果没有超过则继续向事件总线发送消息,如果超过了则进行一些事务回滚逆操作和向管理员发送一些警告信息以便进行人工干预等操作。
Talk is cheap, Show me the code。下面我们来看看如何实现,由于篇幅原因可能只会列出关键性代码,详细代码请自行去GitHub上下载或Clone。
下面是这次实验的项目结构,需要准备如下五个项目(四个ASP.NET Core WebAPI和一个.NET Core类库)
数据库这里实验采用的是MSSQL,只创建了一个Order数据库,两张表的数据如下:
可以看到,在Events表的设计中,通过EventType来区分事件类型,这里是订单创建(CreateOrder)这个事件的两个具体消息(StorageService和DeliveryService),状态(StatusValue)为1代表未处理,为2则代表已处理。
二、OrderService的实现
2.1 准备工作
其中,Controllers中主要用于与终端用户(比如WebForm、MVC、SPA等)交互,Models下主要用于定义DTO、EF DbContext与Entity,Repositories目录下主要用于定义Repository与数据库进行交互。
2.2 具体实现
(1)OrderController
[Route("api/Order")]
public class OrderController : Controller
{
public IOrderRepository OrderRepository { get; } public OrderController(IOrderRepository OrderRepository)
{
this.OrderRepository = OrderRepository;
} [HttpPost]
public string Post([FromBody]OrderDTO orderDTO)
{
var result = OrderRepository.CreateOrder(orderDTO).GetAwaiter().GetResult(); return result ? "Post Success" : "Post Failed";
}
}
这里就是简单的调用OrderRepository进行订单的创建操作。
(2)OrderRepository
public class OrderRepository : IOrderRepository
{
public OrderDbContext OrderContext { get; } public OrderRepository(OrderDbContext OrderContext)
{
this.OrderContext = OrderContext;
} public async Task<bool> CreateOrder(IOrder order)
{
var orderEntity = new Order()
{
ID = GenerateOrderID(),
OrderUserID = order.OrderUserID,
OrderTime = order.OrderTime,
OrderItems = null,
}; OrderContext.Orders.Add(orderEntity); // patch data
order.ID = orderEntity.ID;
order.StatusKey = EventConstants.EVENT_STATUS_KEY_STORAGE; var eventEntityA = new Event()
{
ID = GenerateEventID(),
OrderID = orderEntity.ID,
Type = EventConstants.EVENT_TYPE_CREATE_ORDER,
StatusKey = EventConstants.EVENT_STATUS_KEY_STORAGE,
StatusValue = EventStatusEnum.UNHANDLE,
CreatedTime = DateTime.Now,
EntityJson = JsonHelper.SerializeObject(order)
}; order.StatusKey = EventConstants.EVENT_STATUS_KEY_DELIVERY; var eventEntityB = new Event()
{
ID = GenerateEventID(),
OrderID = orderEntity.ID,
Type = EventConstants.EVENT_TYPE_CREATE_ORDER,
StatusKey = EventConstants.EVENT_STATUS_KEY_DELIVERY,
StatusValue = EventStatusEnum.UNHANDLE,
CreatedTime = DateTime.Now,
EntityJson = JsonHelper.SerializeObject(order)
}; OrderContext.Events.Add(eventEntityA);
OrderContext.Events.Add(eventEntityB); int count = await OrderContext.SaveChangesAsync(); return count == ;
} private string GenerateOrderID()
{
// TODO: Some business logic to generate Order ID
return Guid.NewGuid().ToString();
} private string GenerateEventID()
{
// TODO: Some business logic to generate Order ID
return Guid.NewGuid().ToString();
}
}
这里主要是通过EF Core向订单数据库中的Orders表和Events表添加数据。可以看到,这里向Events表中添加了两个记录,分别通过StatusKey进行区分。这里的StatusKey其实是一个冗余字段,只是为了后面在不同的服务之间区分是否是自己需要处理的消息。
三、StorageService与DeliveryService的实现
3.1 StorageService的实现
(1)通过NuGet安装MassTransit、MassTransit.RabbitMQ、MassTransit.Extensions.DependencyInjection
(2)在StartUp类中注入MassTransit的IBusControl实例,加入了熔断、重试与限流,具体看注释
public class Startup
{
...... // This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
...... // EFCore
services.AddDbContextPool<OrderDbContext>(
options => options.UseSqlServer(Configuration["DB:OrderDB"])); // Repository
services.AddScoped<IStorageRepository, StorageRepsitory>();
services.AddScoped<StoreageOrderEventHandler>(); // MassTransit
services.AddMassTransit(c =>
{
c.AddConsumer<StoreageOrderEventHandler>();
});
} public static IBusControl BusControl { get; private set; } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IServiceProvider serviceProvider, IHostingEnvironment env, IApplicationLifetime lifetime)
{
...... // Register Bus
BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(Configuration["MQ:Host"]), hst =>
{
hst.Username(Configuration["MQ:UserName"]);
hst.Password(Configuration["MQ:Password"]);
}); cfg.ReceiveEndpoint(host, Configuration["MQ:Queues:Storage"], e =>
{
// Retry
e.UseRetry(ret =>
{
ret.Interval(, TimeSpan.FromSeconds()); // 每隔10s重试一次,最多重试3次
}); // RateLimit
e.UseRateLimit(, TimeSpan.FromSeconds()); // 100s内限1000次访问请求 // CircuitBreaker
e.UseCircuitBreaker(cb =>
{
cb.TrackingPeriod = TimeSpan.FromMinutes(); // 跟踪周期:1min
cb.TripThreshold = ; // 失败比例达到15%后才会打开熔断器
cb.ActiveThreshold = ; // 至少发生5次请求后才会打开熔断器
cb.ResetInterval = TimeSpan.FromMinutes(); // 熔断时间间隔:5mins
}); e.LoadFrom(serviceProvider);
});
}); // Register Start & Stop for Bus
lifetime.ApplicationStarted.Register(BusControl.Start);
lifetime.ApplicationStarted.Register(BusControl.Stop);
}
}
这里需要注意的就是,在注入时我们指定了接收消息所要进行处理的类对象:StorageOrderEventHandler。
(3)StorageOrderEventHandler
public class StoreageOrderEventHandler : IConsumer<IOrder>
{
public IStorageRepository StorageRepository { get; }
public IBusControl EventBus { get; } public StoreageOrderEventHandler(IStorageRepository StorageRepository)
{
this.StorageRepository = StorageRepository;
this.EventBus = Startup.BusControl;
} public async Task Consume(ConsumeContext<IOrder> context)
{
var order = context.Message;
if (order.StatusKey != EventConstants.EVENT_STATUS_KEY_STORAGE)
{
// 如果不是StorageService要处理的Event则忽略该消息
return;
} var result = StorageRepository.CreateStorage(order).GetAwaiter().GetResult();
if (result)
{
IOrderEventEntity orderEventEntity = new OrderEventEntity
{
OrderID = order.ID,
EventType = EventConstants.EVENT_TYPE_CREATE_ORDER,
StatusKey = EventConstants.EVENT_STATUS_KEY_STORAGE,
StatusValue = EventStatusEnum.HANDLED
}; await EventBus.Publish(orderEventEntity);
}
}
这里首先判断接收到的消息是否是需要自己处理的,不是则忽略,是则调用StorageRepository进行库存记录的更新及其他业务逻辑(与DB有关的)操作,处理完毕后向事件总线发送一个消息,这里主要是告知哪个OrderID的哪个事件类型(EventType)的哪个具体服务(StatusKey)要更新到哪个状态(StatusValue)。
(4)StorageRepository:假装自己是一个真实的仓储
public class StorageRepsitory : IStorageRepository
{
public OrderDbContext OrderContext{ get; } public StorageRepsitory(OrderDbContext OrderContext)
{
this.OrderContext = OrderContext;
} public async Task<bool> CreateStorage(IOrder order)
{
var eventList = await OrderContext.Events.Where(e => e.OrderID == order.ID
&& e.Type == EventConstants.EVENT_TYPE_CREATE_ORDER
&& e.StatusKey == EventConstants.EVENT_STATUS_KEY_STORAGE).ToListAsync(); if (eventList == null)
{
return false;
} foreach (var eventItem in eventList)
{
try
{
// TODO : Add record to Storage DB
Console.WriteLine($"Add one record to Storage DB : { JsonHelper.SerializeObject(order) }");
}
catch (Exception ex)
{
// TODO : Exception log
Console.WriteLine($"Exception: {ex.Message}");
return false;
}
} return true;
}
}
这里假设会做一些业务处理,比如向库存数据库添加一条记录等。由于时间和精力,这里我只向控制台输出一条消息已进行验证。
3.2 DeliveryService的实现
与StorageService高度类似,篇幅关系,不再赘述,请自行浏览示例源代码。这里之展示一下DeliveryOrderEventHandler类:
public class DeliveryOrderEventHandler : IConsumer<IOrder>
{
public IDeliveryRepository DeliveryRepository { get; }
public IBusControl EventBus { get; } public DeliveryOrderEventHandler(IDeliveryRepository StorageRepository)
{
this.DeliveryRepository = StorageRepository;
this.EventBus = Startup.BusControl;
} public async Task Consume(ConsumeContext<IOrder> context)
{
var order = context.Message;
if (order.StatusKey != EventConstants.EVENT_STATUS_KEY_DELIVERY)
{
// 如果不是DeliveryService要处理的Event则忽略该消息
return;
} var result = DeliveryRepository.CreateDelivery(order).GetAwaiter().GetResult();
if (result)
{
IOrderEventEntity orderEventEntity = new OrderEventEntity
{
OrderID = order.ID,
EventType = EventConstants.EVENT_TYPE_CREATE_ORDER,
StatusKey = EventConstants.EVENT_STATUS_KEY_DELIVERY,
StatusValue = EventStatusEnum.HANDLED
}; await EventBus.Publish(orderEventEntity);
}
}
}
可以看出,这里的业务逻辑和StorageService一样,只是StatusKey不同而已。
四、EventService的实现
4.1 项目结构
在EventService中,除了安装MassTransit相关的package之外,还要安装Quartz.Net的package。
4.2 StartUp类
(1)ConfigureService方法
public void ConfigureServices(IServiceCollection services)
{
//services.AddMvc(); // EFCore
services.AddDbContextPool<OrderDbContext>(
options => options.UseSqlServer(Configuration["DB:OrderDB"])); // Dapper-ConnString
services.AddSingleton(Configuration["DB:OrderDB"]); // Repository
services.AddSingleton<IEventRepository<IOrderEventEntity>, OrderEventDapperRepository>(); // Quartz
services.UseQuartz(typeof(OrderEventJob)); // MassTransit
services.AddMassTransit(c =>
{
c.AddConsumer<OrderEventHandler>();
});
}
这里指定了Quartz.Net要处理的定时任务(通过自定义的扩展方法)以及MassTransit要处理的事件处理程序。有关Quartz.Net的内容不在本篇的重点,下面看看OrderEventHandler类,它主要就是根据收到的消息去更新某个类别某个项的事件状态记录。
public class OrderEventHandler : IConsumer<IOrderEventEntity>
{
public IEventRepository<IOrderEventEntity> EventRepository { get; } public OrderEventHandler(IEventRepository<IOrderEventEntity> EventRepository)
{
this.EventRepository = EventRepository;
} public async Task Consume(ConsumeContext<IOrderEventEntity> context)
{
var eventResult = await EventRepository.UpdateEventStatus(EventConstants.EVENT_TYPE_CREATE_ORDER, context.Message);
}
}
在EventRepository具体类中,这里主要是通过Dapper去操作数据库:
public class OrderEventDapperRepository : IEventRepository<IOrderEventEntity>
{
private string connStr; public OrderEventDapperRepository(string connStr)
{
this.connStr = connStr;
} public async Task<IEnumerable<IOrderEventEntity>> GetEvents(string eventType)
{
using (var conn = new SqlConnection(connStr))
{
string sqlCommand = @"SELECT [EventID],
[EventType],
[OrderID],
[CreatedTime],
[StatusKey],
[StatusValue],
[EntityJson]
FROM [dbo].[Events]
WHERE EventType = @EventType
AND StatusValue = @StatusValue"; var result = await conn.QueryAsync<OrderEventEntity>(sqlCommand, param: new
{
EventType = EventConstants.EVENT_TYPE_CREATE_ORDER,
StatusValue = EventStatusEnum.UNHANDLE
}); return result;
}
} public async Task<bool> UpdateEventStatus(string eventType, IOrderEventEntity orderEvent)
{
using (var conn = new SqlConnection(connStr))
{
string sqlCommand = @"UPDATE [dbo].[Events]
SET StatusValue = @StatusValue
WHERE OrderID = @OrderID
AND EventType = @EventType
AND StatusKey = @StatusKey"; var result = await conn.ExecuteAsync(sqlCommand, param: new
{
OrderID = orderEvent.OrderID,
EventType = EventConstants.EVENT_TYPE_CREATE_ORDER,
StatusKey = orderEvent.StatusKey,
StatusValue = EventStatusEnum.HANDLED
}); return result > ;
}
}
}
*Problem: 在coding过程中,本来想用EF的,结果发现DbContext默认注入的周期是Scoped,而我们的定时Job又是Singleton的,无法正常使用,所以就改用了Dapper。在这个类中,未完成的方法是进行事务回滚逆操作的一系列方法。
(2)Configure方法
public static IBusControl BusControl { get; private set; } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IServiceProvider serviceProvider, IHostingEnvironment env, IApplicationLifetime lifetime, IScheduler scheduler)
{
//app.UseMvc(); // Register EventBus
BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(Configuration["MQ:Host"]), hst =>
{
hst.Username(Configuration["MQ:UserName"]);
hst.Password(Configuration["MQ:Password"]);
}); // Retry
cfg.UseRetry(ret =>
{
ret.Interval(, TimeSpan.FromSeconds()); // 每隔10s重试一次,最多重试3次
}); // RateLimit
cfg.UseRateLimit(, TimeSpan.FromSeconds()); // 100s内限1000次访问请求 // CircuitBreaker
cfg.UseCircuitBreaker(cb =>
{
cb.TrackingPeriod = TimeSpan.FromMinutes(); // 跟踪周期:1min
cb.TripThreshold = ; // 失败比例达到15%后才会打开熔断器
cb.ActiveThreshold = ; // 至少发生5次请求后才会打开熔断器
cb.ResetInterval = TimeSpan.FromMinutes(); // 熔断时间间隔:5mins
}); cfg.ReceiveEndpoint(host, Configuration["MQ:Queues:Order"], e =>
{
e.LoadFrom(serviceProvider);
});
}); // Register Start & Stop for Bus
lifetime.ApplicationStarted.Register(BusControl.Start);
lifetime.ApplicationStarted.Register(BusControl.Stop); // Scheduler Job
QuartzServiceUtil.StartJob<OrderEventJob>(scheduler, TimeSpan.FromSeconds());
}
由于无法在ConfigureService方法中正常的注入MassTransit,所以采用了一个折中的方式:静态变量,还好我们只需要一个单例的BusControl即可。这里我们在启动时,开启了一个定时任务,这个定时任务的逻辑如下,它每隔30s执行一次。
public class OrderEventJob: IJob
{
public IEventRepository<IOrderEventEntity> OrderEventRepository { get; }
public IBusControl EventBus { get; }
private int MaxHours; public OrderEventJob(IEventRepository<IOrderEventEntity> OrderEventRepository, IConfiguration configuration)
{
this.OrderEventRepository = OrderEventRepository;
this.EventBus = Startup.BusControl;
this.MaxHours = Convert.ToInt32(configuration["Service:MaxHours"]);
} public async Task Execute(IJobExecutionContext context)
{
var events = OrderEventRepository.GetEvents(EventConstants.EVENT_TYPE_CREATE_ORDER).GetAwaiter().GetResult(); if (events == null)
{
await Console.Out.WriteLineAsync($"[Tip] There's no pending to process Order events.");
return;
} foreach (var eventItem in events)
{
if (GetDifferenceInHours(eventItem.CreatedTime) >= MaxHours)
{
// TODO:
// 1.Rollback previous transaction by send rollback message
// 2.Send Email/Messages to administrator
// ...... break;
} IOrder order = JsonHelper.DeserializeJsonToObject<Order>(eventItem.EntityJson);
await EventBus.Publish(order);
}
} private double GetDifferenceInHours(DateTime createdTime)
{
DateTime currentTime = DateTime.Now;
TimeSpan ts = currentTime.Subtract(createdTime);
double differenceInDays = ts.TotalHours; return differenceInDays;
}
}
这里的MaxHours(最大容忍小时)在配置文件中设置的是1,即1小时。在每个定时任务中,系统会去首先check未处理的事件消息的创建时间和现在系统时间的间隔时间是否超过了1小时,超过了则会进行一系列的回滚逆操作和发送邮件/短信等操作告知人工干预,这一部分由于时间和精力未实现,有兴趣的可以自己去实现。如果没超过,则会将事件状态表记录行中的EntityJson(这里主要是订单表的序列化后的JSON字符串)反序列化并作为消息进行发送给事件总线从而通知订阅者。
五、快速测试
5.1 向OrderService发送一个订单请求
首先,清空测试的订单数据库表,此时无一条记录。
然后,通过PostMan工具向OrderService发送一条订单请求(前提是你得同时把这四个服务一起启动起来):
5.2 Check此时的订单数据库
此时已经有了对应的数据,可以看到DeliveryService和StorageService的两个事件状态记录行的状态是1(代表待处理)
5.3 一定时间后的订单数据库
经过两个服务的处理之后,状态变为了2(代表已处理),再看看两个服务的控制台信息,分别在处理事件消息时输出了一行记录:
在标准情况下,当所有相关的事件消息状态都变成已处理时,这时数据就达到了最终一致性。当然,还有一些重试的补偿和事务的回滚逆操作,没有做演示,有兴趣可以自行研究。
六、小结
本篇主要基于一个小案例(订单业务处理场景),首先介绍了其业务场景与基本业务流程,然后通过介绍相关的每个服务的代码实现,最后通过一个快速的测试演示了数据如何达到最终一致性。当然,这个小案例并不完整,没有对重试的补偿机制以及失败后的回滚机制进行演示和测试,不过有兴趣的朋友可以自行改代码实现。最后,再次强调实现数据的最终一致性有很多方案,这里只是举一种我从桂素伟老师那里所学到的比较简单易于学习的实现方式而已。对MassTransit感兴趣想应用于生产环境的朋友,可以去了解了解saga,建议看看这篇文章:《MassTransit&Sage分布式服务开发PPT分享》
示例代码
Click Here => 点我下载
参考资料
(1)桂素伟,《基于.NET Core的微服务架构》
(2)richieyangs(张阳),《如何优雅的使用RabbitMQ》,《使用Masstransit开发基于消息传递的分布式应用》
(3)青客宝团队,《MassTransit&Sagas分布式服务开发ppt分享》
(4)成天,《MassTransit实现应用程序间的交互》
(5)娃娃都会打酱油了,《MassTransit学习记录》
(6)MassTransit 官方文档,http://masstransit-project.com/MassTransit/