设计目标
- 尽量快的处理命令和事件,保证吞吐量;
- 处理完一个命令后不需要等待命令产生的事件持久化完成就能处理下一个命令,从而保证领域内的业务逻辑处理不依赖于持久化IO,实现真正的in-memory;
- 保证命令、事件处理的顺序性,先来的先处理,先产生的先处理;
- 保证一个聚合根的事件只有一个线程在持久化,并按事件产生的顺序持久化;
- 持久化事件时如果遇到并发冲突时(聚合根ID+事件版本号出现重复)的处理代价要轻;
- 要能利用多核的优势;
总体设计思路
- 先将命令根据聚合根ID路由到CommandMailBox里;
- 单线程处理CommandMailBox中的命令,由于聚合根在in-memory本地内存,所以处理非常快;
- 处理成功后更新聚合根的in-memory内存;
- 内存更新后将聚合根产生的事件同样原理路由到EventMailBox里;
- 单线程批量处理EventMailBox中的事件;由于是批量,所以持久化的吞吐量也可以保证;
- 处理完成一批事件后,把这一批事件对应的命令从CommandMailBox中移除;
详细设计思路
- 设计N个存放命令的CommandMailBox,命令首先按聚合根ID的hashcode取摸路由到对应的CommandMailBox;
- 每个CommandMailBox都有一个maxOffset, consumeOffset,以及一个CommandProcessor(单线程)在不停的处理;maxOffset表示最后一个命令的位置;consumeOffset表示当前正在处理的命令的位置;
- CommandProcessor的处理逻辑;
- 创建、修改聚合根;
- 更新聚合根的in-memory缓存;
- 将聚合根产生的事件按聚合根ID的hashcode取摸路由到对应的EventMailBox;EventMailBox的个数也是程序启动时配置;
- 每个EventMailBox都有一个maxOffset, consumeOffset,以及一个EventProcessor(单线程)在不停的处理;maxOffset表示最后一个事件的位置;consumeOffset表示当前正在处理的事件的位置;
- EventProcessor的处理逻辑:
- 按次序批量获取一批要处理的事件;
- 批量持久化事件到EventStore,采用SqlBulkCopy;
- 如果持久化一切顺利,则publish这一批事件(publish如果遇到网络IO异常,则重试,直到成功为止),然后继续持久化下一批,并同时将当前这一批事件对应的命令从CommandMailBox中删除;.如果持久化遇到并发冲突(事件的aggregateRootId+Version重复),则对当前这一批事件一个个按顺序持久化。如果当前事件持久化成功,则同样publish该事件,当然遇到IO异常时也要不断重试,直到成功为止;成功后通知CommandMailBox移除当前事件对应的命令;如果当前事件持久化出现并发冲突,就做如下处理:
- 先通知当前事件对应聚合根暂停处理后续的命令;
- 用Event Sourcing技术将in-memory中的聚合根的状态还原到最新状态,确保下次执行command时基于的聚合根的状态是最新的;
- 把这一批里该聚合根的所有事件移除,把EventMailBox中的该聚合根的所有事件移除;
- 将CommandMailBox的处理位置重置为当前事件对应的命令的offset;从而可以确保产生并发冲突的事件对应的命令以及后续的命令能再重新被处理一遍;
- 通知当前事件对应聚合根继续处理后续的命令(从哪个位置开始处理,在上面第4步已经重置过了);
- 这一批的所有事件都一个个处理完之后,按同样的逻辑继续处理下一批事件;
其他说明
- 上面的设计基于一个前提,就是一个聚合根几乎不会同时在两台服务器上同时存在并处理命令,否则就会出现并发冲突,而并发冲突的处理的代价还是比较复杂的,应该尽量避免;这点可以通过EQueue保证;
- 当聚合根处理了命令,尝试更新in-memory内存时,可能有一种情况会失败。就是如果这个命令是创建聚合根的,而有可能并发的时候这个聚合根可能在内存中已经有了,则创建完聚合根添加到内存时,应该能检测出来并记录错误日志,然后该命令产生的事件也不必放入EventMailBox,然后认为该命令处理成功即可。
- 上面的设计中没有谈到当遇到命令重复执行时的设计思路,大家可以自己想想:)