ENode 2.0 - 整体架构介绍

前言

今天是个开心的日子,又是周末,可以轻轻松松的写写文章了。去年,我写了ENode 1.0版本,那时我也写了一个分析系列。经过了大半年的时间,我对第一个版本做了很多架构上的改进,最重要的就是让ENode实现了分布式,通过新增一个分布式消息队列EQueue来实现。之所以要设计一个分布式的消息队列是因为在enode 1.0版本中,某个特定的消息队列只能被某个特定的消费者消费。这样就会导致一个问题,就是如果这个消费者挂了,那这个消费者对应的消息队列就不能自动被其他消费者消费了。这个问题会直接导致系统不可用。而ENode 2.0中,就不会有这个问题了,因为消息队列被设计为独立的,被消费者所共享的;一个消息队列可以被多个消费者集群消费或广播消费,如果一个消费者挂了,那其他的消费者会自动顶上。这里具体的细节,我会在后面详细介绍。

ENode框架简介

  1. 框架名称:ENode
  2. 框架特色:DDD+CQRS + EDA + Event Sourcing + In Memory
  3. 设计目标:让程序员只关注业务代码、高性能、分布式、可水平扩展
  4. 开源地址:https://github.com/tangxuehua/enode
  5. 基于enode实现的一个完成案例,一个论坛:https://github.com/tangxuehua/forum
  6. nuget包Id:ENode
  7. 一个独立的分布式消息队列EQueue,可以为ENode提供Command,Domain Event的发布和订阅:https://github.com/tangxuehua/equeue

ENode架构图

ENode 2.0 - 整体架构介绍

熟悉CQRS架构的人看到这图应该就再熟悉不过了,enode实现的是一个CQRS架构。基本的概念就不多介绍了,如果大家对上图中的一些概念还不太清楚,可以看一下我的博客里的其他相关文章,我应该都有写到。下面主要介绍一下enode 2.0在实现CQRS架构时的一些不一样的地方(由于篇幅的限制,先说三点吧):

Command Handler一次只处理一个Command

就是你不能在command handler中一次修改多个聚合根,我觉得这应该是enode对开发人员的最大约束,可能也是最让开发人员觉得不爽的地方。但我觉得这个不是约束,而是对数据强一致性和最终一致性的一个正确认识。在我学过ddd+cqrs+event sourcing这三个东西之后,我认识到,聚合内必须确保强一致性,聚合间最终一致性。传统三层开发,我们通过unit of work模式(简称uow,比如nhibernate的session, entity framework的dbcontext)可以轻易实现多个对象修改的强一致性事务;确实在传统三层模式开发中,这种利用uow的方式来实现跨聚合的强一致性事务的方式很实用,开发起来很方便,开发人员可以不必担心会出现数据不一致的问题了,因为所有修改总是在一个事务内保存。

但enode的设计目标不是为了支持传统三层开发,而是面向ddd+cqrs+eda+event sourcing架构的框架。曾经我也想让command handler支持修改多个聚合根,但这样做必须要面临一个很棘手的问题:command在发送到command queue时,无法根据聚合根ID来路由了。因为一个command会修改多个聚合根,也就是说一个command不会和一个聚合根一一对应了。这意味着同一个聚合根没办法总是被路由到同一个command queue里,这样就导致相同ID的聚合根可能会在两台服务器被同时修改,这就会导致整个系统可能会频繁的产生并发更新冲突。很多command就会不断的重试,整个系统的性能就会下降。而enode设计之初就是为了高性能,所以这点让我觉得很难接收。

相反,如果一个command总是只会创建或修改一个聚合根,那我们的command就能根据聚合根ID来路由到特定的消息队列,同一个聚合根ID总是会被路由到同一个queue,而一个queue的消费者服务器(command handler所在的服务器)同一时刻总是只有一个,那我们就能保证一个聚合根的修改不会有并发问题。当然光这样还不够,在这个command消费者服务器里,enode框架会用内存级别的queue对同一个聚合根的所有command再次进行排队(如果需要排队的话),之所以要这样是因为有时对一个聚合根的并发修改command可能1s内发送了很多过来,所以command handler肯定来不及在1s内全部处理掉这些command,所以需要在内存里再次排队(天猫双十一的时候,应用服务器内部也会有类似的对同一个聚合根设计一个相应的内存queue来避免对同一个聚合根的修改的并发冲突的问题)。通过这样的设计,我们可以做到绝大部分情况下,不会再有并发冲突的问题,也就是command不会再出现重试的情况。这样最后的效果就是:不同ID的聚合根的处理可以并行,同一个ID的聚合根的处理是串行,通过两级排队实现。前面说到,这样只能做到绝大部分情况下不会有并发冲突,那么什么时候还是会有并发冲突呢?就是在新增command消费者服务器的时候,比如我们发现最近系统繁忙,我们希望增加command消费者服务器来加快command的处理,那在新增服务器后,原来修改某个聚合根的command可能会被路由到新的服务器,但是这个聚合根的有些command可能还在原来的服务器上还没执行完,此时就会出现同一个聚合根在两台服务器上被同时修改的可能了;那这个怎么解决呢?我现在的想法是框架层面不必解决了,我们只需要在系统最空的时候(比如凌晨4点)的时候,增加服务器即可,因为那个时候消息队列里的消息是最少的,也就是不太可能会产生因为增加command handler服务器而导致并发冲突的问题,这样我们就可以最大限度的避免可能带来的并发冲突。

让Domain生活在In Memory中

相比一般的CQRS架构,enode每次在处理一个command,在获取聚合根时,不是从eventstore获取,而是从缓存获取。从上面的架构图可以看出,enode架构中有一个domain memory cache,目前用redis实现。这样做的好处是,将所有的聚合根都缓存在redis缓存中,这样就能提高聚合根的读取时间;有一个问题需要考虑,redis缓存服务器宕机了怎么办?宕机后缓存数据就没了,那如何恢复这些缓存数据呢?这也是我选择redis的一个主要理由,因为redis支持持久化,我们可以利用redis的aof或快照方式的持久化功能,来持久化缓存数据。从而可以在redis挂了后能最快的速度恢复缓存,重启redis服务器即可。那重启之前以及重启的过程中,因为无法从redis获取聚合根了,那只能从eventstore通过event sourcing的方式去获取,那样的话性能肯定会比较差,那怎么办呢?答案是通过定时为聚合根创建快照,这也是采用event sourcing架构的一个好处。我们可以定时对某些聚合跟创建快照(注意,我觉得只需要考虑那些对性能要求很高的模块所涉及到的聚合根创建快照即可),那怎么创建呢?可以开一个独立的进程,监听domain event,对需要创建快照的domain event做出判断,根据某种快照创建策略进行判断,如果认为需要创建快照,则从event store拿出该聚合根的相关事件,通过event sourcing还原得到某个版本的聚合根,这样就得到了某个聚合根的某个版本的快照了。然后持久化起来即可。然后,enode支持在从event store获取聚合根前,先检查是否有快照,如果有快照,则会先加载快照,再把快照之后的domain event从event store获取,再把这些快照之后的domain event一个个apply到当前聚合根,从而得到最新状态的聚合根。这个过程比获取该聚合根的所有领域事件在一个个通过event sourcing还原得到聚合根要快的多;尤其是在一个聚合根的domain event比较多的情况下就更有意义。因此,通过缓存的引入,我们可以提高command handler的处理速度。

Event Store的设计

关于重复的command的幂等处理和聚合根可能存在的并发冲突的判断

另外一点很重要的是,因为我们的command是会发送到分布式消息队列,然后队列中的command消息会被取出来执行;大家知道,我们很难保证一个消息不会被重复执行,也就是说,一个command可能会重复执行。因此,我们的应用要支持对command的密等处理。而对于使用enode框架的应用,因为整个command side的数据持久化就是持久化domain event,程序员不必关心domain event的持久化过程。所以enode很有必要能内置支持对command的重复处理的判断。那么如何做呢?我觉得最靠谱的做法是,在持久化domain event的时候就能绝对靠谱的检测出来某个command是否被重复执行了。那很自然就想到将被持久化的domain event和产生他的对应command关联起来。所以我设计了如下的结构,用来表示一个command在操作聚合根后所产生的领域事件的信息。

ENode 2.0 - 整体架构介绍
/// <summary>The commandId which generate this event stream.
/// </summary>
public string CommitId { get; private set; }
/// <summary>The aggregate root id.
/// </summary>
public string AggregateRootId { get; private set; }
/// <summary>The aggregate root type code.
/// </summary>
public int AggregateRootTypeCode { get; private set; }
/// <summary>The version of the event stream.
/// </summary>
public int Version { get; private set; }
/// <summary>The occurred time of the event stream.
/// </summary>
public DateTime Timestamp { get; private set; }
/// <summary>The domain events of the event stream.
/// </summary>
public IEnumerable<IDomainEvent> Events { get; private set; }
ENode 2.0 - 整体架构介绍
  • CommitId:就是当前的CommandId;
  • AggregateRootId:当前被操作的聚合根的全局唯一ID;
  • AggregateRootTypeCode:表示聚合根的类型的一个code,通过该code我们可以知道当前记录是哪个类型的聚合根的;
  • Version:一个版本号,表示聚合根产生领域事件后的新版本号,是产生事件前的版本号+1;也就是说,聚合根的版本是每次被修改一次,那Version就加1;
  • Timestamp:一个时间戳,用于记录产生domain event时的时间;
  • Events:表示当前command操作聚合根后所产生的领域事件,一次操作可以产生多个领域事件;

对于上面的结构体,我们可以实现两个重要的功能:1)为AggregateRootId和Version这两个字段建立唯一索引,这样我们就能实现判断某个聚合根是否被并发修改,因为如果有并发修改导致并发冲突,那保存到eventstore时,它们的Version肯定是相同的;2)为AggregateRootId和CommitId两个字段建立唯一索引,这样我们就能判断某个command是否被重复执行,因为一个command被实例化出来后,它所要修改的聚合根ID就不可能再修改了,所以如果该command被重复执行,那最后产生的领域事件(上面这个结构体)最后被持久化到eventstore时就会违反这个唯一索引,从而框架就能知道是否有command被重复执行了;

另外,上面这个结构体被保存到eventstore时,是以一条记录的方式被保存,Events集合会被序列化为一段二进制;所以,假如我们用关系型数据库来保存,那就是只有一条insert语句即可,这样就实现了一个聚合根的一次修改的事务持久化。然后因为上两个索引的存在,我们就能在保存时判断是否有并发冲突或command是否被重复执行。

关于Domain Event大数据量的考虑

在设计event store时,我考虑了很多。最后认为event store要解决的最大的两个问题是持久化性能和可水平扩展性。首先,因为每次command handler在处理完一个聚合根后,都会把产生的领域事件持久化到event store,没持久化完成则不能认为该command已处理完,所以持久化的性能对处理command的吞吐量至关重要。另外一点就是可水平扩展性,因为event store里保存的都是domain event,而enode又是为了实现高性能为目标的,所以event store里的数据肯定会非常多,比如1s中要持久化1K个domain event,那一天就会有8600W条记录要记录,一天就真么多,那1年就更多了,所以用单点存储所有的domain event显示不靠谱了。所以我们的event store必须要支持水平扩展。比如我们可以设计100个分区,那每个分区一天只需要保存86W条记录,一年也只需要保存3亿多条记录即可。之前我很追求单个存储节点的高性能,所以曾经想过要用leveldb,stsdb,甚至redis这种高性能的基于key,value的nosql存储。但后来发现这种nosql存储虽然性能很高,但因为只是key,value的存储结构,所以没办法支持二级索引,这样就没办法实现上面第一点中提到的command的幂等处理和聚合根并发冲突的检测。另一个重要的原因是,event store中的数据我们有时候是要被查询的。比如现在某个command遇到的并发冲突,那框架需要自动重试,但是重试之前需要先更新redis缓存,就是把eventstore里的最新的聚合根更新到redis缓存里,这样command在重试时才能拿到最新版本的聚合根,这样重试才能成功。那如何从eventstore里拿最新的聚合根呢?只能根据聚合根ID从eventstore里查询。而聚合根ID又不是key,value nosql的key,自然就没办法实现这个需求了;所以,我觉得合理的办法应该是用关系型数据库来实现eventstore。有人说关系型数据库的性能不行。我觉得只要关系型数据库支持水平扩展,也就是将domain event sharding(分片)到不同的分库分表中,那平均到每个库里的domain event的数量就不大了;这样整个eventstore的持久化性能就可以随着分库的数量的增加而线性增加;比如我现在单个db insert domain event的性能是1K tps(mysql配合ssd硬盘完全无压力,呵呵),那10个库的tps就能达到1W tps了。因为我们分库会根据聚合根ID的hash code来平均散列,这样能确保每个库中的聚合根的domain event数量是基本一样的;从而就能实现整个event store的持久化性能随着分库的增加而线性增加。所以,有了分库的优势,大数据量和性能都不是问题了。且因为关系型数据库支持二级索引和唯一索引,那查询domain event也不是问题了。

ENode物理部署结构图

ENode 2.0 - 整体架构介绍

上图是enode在实际项目中我目前认为的一个物理部署结构图。

首先客户端浏览器通过网络最后访问到我们的web服务器集群,当然web服务器前面肯定还有网关和负载均衡器,我这里为了突出重点就不画出来了。然后每个web服务器接受到httprequest后会生成command,然后通过enode框架发送到分布式消息队列服务器(message queue server),目前由我开发的equeue实现。然后消息队列服务器上的消息会被推送到command process servers,command process server就是执行command handler、完成domain logic,持久化domain event,以及publish domain event的服务器。command process server处理完之后,domain event会由enode框架自动发送到message queue server,然后会被event process server处理,event process server就是订阅domain event,然后根据domain event更新query db。对于查询,web server可以直接通过sql查询query db即可。

各种服务器的集群:

  • web server:无状态,可以任意增加服务器;
  • command process server:就是处理业务逻辑的服务器,也是无状态,可以任意增加服务器;但服务器的数目最好和command所对应的topic下的queue的数量保持一致,这点后续在写分布式消息队列equeue的文章时在详细谈吧;
  • redis server:就是缓存聚合根的服务器,属于缓存服务器;可以按需要存储的容量来规划需要开多少台redis server;目前我觉得最好的redis动态扩容方法就是pre sharding;
  • event db server:就是存储domain event的服务器,按照上的分析,我们采用的是关系型数据库,比如用mysql;mysql的分库分表技术已经很成熟,后续文章我们再详细讨论如何分库以及如何做数据迁移;
  • event process server:就是订阅domain event,根据domain event更新query db的服务器;可以根据需要来部署多少台,和command process server类似;这里有一点必须要先提一下,就是在更新query db时,因为每次更新都是针对某个domain event来更新query db的,而domain event只表示一个聚合根的修改,所以每次我们更新query db时,也只更新该聚合根所在范围的表;我们千万不要去更新超过该聚合根范围的表,否则就会产生并发冲突,导致event handler执行失败;这样就会是的cqrs的query db同步数据变的很慢。对于query side,如果我们觉得直接从query db查询数据太慢,可以考虑设计查询缓存,也就是不走query db来查询数据,而是走缓存。这种缓存就和我们平时的缓存设计类似了;利用domain event,我们先天就有优势可以让缓存非常及时的更新,呵呵。因为一旦有domain event过来,我们就能快速更新我们的query side缓存,而query db就可以异步更新即可。这样就可以解决query side同步更新数据慢的问题。
  • message queue server:就是消息队列服务器,目前equeue还不支持集群,只支持单机;这个以后有时间会考虑实现master-slave模式,类似于淘宝的rocketmq一样。

好了,就写这些吧,后续的再后续文章中补上,呵呵。


上一篇:17.跟金根回顾敏捷个人:技术研究之道


下一篇:【设计模式】 面向对象六大设计原则(一)