关于实现一个基于文件持久化的EventStore的核心构思

大家知道enode框架的架构是基于ddd+event sourcing的思想。我们持久化的不是聚合根的最新状态,而是聚合根产生的领域事件。最近我在思考如何实现一个基于文件的eventstore。目标有两个:

1.必须要高性能;
2.支持聚合根事件的并发持久化,要确保单个聚合根实例不会保存版本号相同的事件;

事件持久化高性能

经过了一番调研,发现用文件存储事件非常合适。要确保高性能,我们可以顺序写文件(append),然后随机读文件。之所以要随机读文件是因为在当某些command由于操作同一个聚合根而遇到并发冲突的时候,框架需要获取该聚合根的所有最新的事件,然后通过event sourcing重建出最新的聚合根,然后再重试这些遇到并发冲突的command。经过测试,顺序写文件和随机读文件都非常高效,每秒100W次顺序写和每秒10W次随机读在我的笔记本上不是问题;因为在enode中,domain是基于in-memory架构的,所以我们很少会从eventstore读取事件。所以重点是要优化持久化事件的性能。而读事件只有在command遇到并发冲突的时候或系统重启的时候,才有可能需要从eventstore读取事件。所以每秒10W次随机读取应该不是问题。当然,关于文件如何写,见下面的遗留问题的分析。

另外一个就是刷磁盘的问题。我们知道,通过文件流写入数据到文件后,如果不Flush文件流,那数据有可能还没刷到磁盘。所以必须定时Flush文件流,出于性能和可靠性的权衡,选择定时1s刷一次磁盘,通过异步线程刷盘。实际上,大部分NoSQL产品都是如此,比如Redis的fsync可以指定为每隔1s刷一次AOF日志到磁盘。这样做唯一的问题是断电后可能丢失1s的数据,但这个可以通过在服务器上配置UPS备用电源确保断电后服务器还能工作,来确保断电后还能支持足够的时间确保我们把文件流的数据刷到磁盘。这样既解决性能问题,也能保证不丢失数据。

事件并发控制

首先,每个聚合根实例有多个事件,每个时刻,每个聚合根可能都会产生多个事件然后要保存到eventstore中。为什么呢?因为我们的domain model所在的应用服务器一般是集群部署的,所以完全有可能同一个聚合根在不同的机器上在被同时在做不同的修改,然后产生的事件的版本号是相同的,从而就会导致并发修改同一个聚合根的情况了。

因此,我们主要要确保的是,对同一个聚合根实例,产生的事件如果版本号相同,则只能有一个事件能保存成功,其他的认为并发冲突,需要告诉外部有并发冲突了,然后由外部决定接下来该如何做。那么如何保证这一点呢?

前面说到,所有聚合根的事件都是顺序的方式append到同一个文件,append事件到文件这个步骤本身没办法检查是否有并发冲突,文件只能帮我们持久化数据,不负责检查是否有并发冲突。那如何检查并发冲突呢?思路就是在内存设计一个Dictionary,Dictionary的key为聚合根ID,value保存当前聚合根产生的事件的最大版本号,也就是最后一个事件的版本号。

然后有两个办法可以实现并发冲突的检测:

  1. 所有的事件进入eventstore服务器后,先通过一个ConcurrentQueue进行排队。所有事件并发进入ConcurrentQueue,然后ConcurrentQueue的消费者为单线程。然后我们在单线程内一个个取出ConcurrentQueue中的事件,然后根据Dictionary里的内容一个个判断当前事件是否有版本冲突,如果没冲突,则先将事件写入文件,再更新Dictionary里当前聚合根的最大版本号;这个方式没问题,只是效率不是非常高,因为这样相当于对所有的聚合根实例的处理都线性化了。实际上,我们希望的是,只有对同一个聚合根实例的操作是线性化的,而对不同聚合根实例之间,完全可以并行处理;那怎么做呢?见第二种思路。
  2. 首先,所有的事件不必排队了,可以并行处理。但是对于每一个聚合根实例的事件的处理,需要通过原子锁的方式(CAS原理)做并发控制。关键思路是,通过一个字段存储每个聚合根的当前版本号信息,版本号信息中设计一个状态位用来控制同一时刻只能有一个线程在更改当前聚合根的版本信息。以此来实现对同一个聚合根的处理的线性化。然后,当前修改版本状态成功的线程,能够进一步做持久化事件的逻辑,但持久化事件之前还需要判断当前事件的版本是否已经是老的版本了(当前事件的版本一定等于当前聚合根的最大版本号+1),以此来确保同一个聚合根的事件序列一定是连续递增的。具体的实现思路见如下的demo代码。

DEMO代码示例、注解

关于实现一个基于文件持久化的EventStore的核心构思
/// <summary>一个结构体,记录当前聚合根的当前版本号,以及用于并发控制的一些状态信息
/// </summary>
class AggregateVersionInfo
{
    public const int Editing = 1;    //一个常量,表示当前聚合根的当前版本号正在被修改
    public const int UnEditing = 0;  //一个常量,表示当前聚合根的当前版本号未在被修改

    public int CurrentVersion = 0;   //记录当前聚合根的当前版本号,初始值为0,其实就是事件的个数
    public int Status = UnEditing;   //默认状态,未被修改
}
class Program
{
    static void Main(string[] args)
    {
        var aggregateCount = 4;                    //用于测试的聚合根的个数
        var eventCountPerAggregate = 10;           //单个聚合根产生的事件数
        var aggregateIdList = new List<string>();  //一个List,存放所有聚合根的ID
        var aggregateCurrentVersionDict = new ConcurrentDictionary<string, AggregateVersionInfo>();  //一个Dict,用于保存所有聚合根的当前版本信息
        var aggregateEventsDict = new Dictionary<string, IList<int>>();                      //一个Dict,用于模拟存储每个聚合根的所有事件

        //先生成所有聚合根ID
        for (var index = 1; index <= aggregateCount; index++)
        {
            aggregateIdList.Add("key-" + index);
        }
        //初始化每个聚合根的当前状态
        foreach (var aggregateId in aggregateIdList)
        {
            aggregateCurrentVersionDict[aggregateId] = new AggregateVersionInfo();
            aggregateEventsDict[aggregateId] = new List<int>();
        }

        //该方法用于实现事件的并发冲突检测和持久化逻辑。
        Action<string, int> persistEventAction = (aggregateId, currentEventVersion) =>
        {
            var aggregateVersionInfo = aggregateCurrentVersionDict[aggregateId];
            var originalStatus = Interlocked.CompareExchange(
                ref aggregateVersionInfo.Status,
                AggregateVersionInfo.Editing,
                AggregateVersionInfo.UnEditing);

            //这里两者不相等,说明aggregateVersionInfo.Status成功更新为Editing了
            if (originalStatus != aggregateVersionInfo.Status)
            {
                if (currentEventVersion == aggregateVersionInfo.CurrentVersion + 1)
                {
                    //这里,将事件加入到一个List,真实的eventstore会在这里持久化事件到文件;
                    aggregateEventsDict[aggregateId].Add(currentEventVersion);
                    //更新聚合根的最新版本
                    aggregateVersionInfo.CurrentVersion++;
                }
                else
                {
                    //进入这里,说明有别的线程已经添加了该版本,也就是遇到并发冲突了。
                }

                //处理完后,将聚合根的版本状态修改回UnEditing
                Interlocked.Exchange(ref aggregateVersionInfo.Status, AggregateVersionInfo.UnEditing);
            }
            else
            {
                //进入这里,说明有别的线程正在更改当前聚合根的版本信息,也可以认为是遇到并发冲突了。
            }
        };

        //该方法用于模拟并行产生事件并调用事件的持久化逻辑
        Action generateEventAction = () =>
        {
            foreach (var aggregateId in aggregateIdList) //循环处理每个聚合根
            {
                //对每个聚合根产生指定个数的事件,为了简化,仅使用事件版本号表示事件了
                for (var eventVersion = 1; eventVersion <= eventCountPerAggregate; eventVersion++)
                {
                    for (var i = 0; i < 100000; i++) //这里纯粹为了性能测试,对每个事件再循环10W次调用持久化逻辑
                    {
                        persistEventAction(aggregateId, eventVersion); //调用持久化方法持久化聚合根的当前事件
                    }
                }
            }    
        };

        var watch = Stopwatch.StartNew();
        //模拟同时4个线程同时产生事件并持久化,这里其实只要开2个够了,因为我的笔记本只有2个核
        Parallel.Invoke(generateEventAction, generateEventAction, generateEventAction, generateEventAction);
        watch.Stop();
        var time = watch.ElapsedMilliseconds;

        //最后输出结果,输出总运行时间,以及验证每个聚合根的当前版本以及聚合根的每个事件的版本是否是顺序逐个递增的。
        Console.WriteLine("total time:{0}ms", time);
        foreach (var aggregateId in aggregateIdList)
        {
            Console.WriteLine("aggregateId:{0}, currentVersion:{1}, events:{2}",
                aggregateId,
                aggregateCurrentVersionDict[aggregateId].CurrentVersion,
                string.Join(",", aggregateEventsDict[aggregateId].ToArray()));
        }

        Console.ReadLine();
    }
}
关于实现一个基于文件持久化的EventStore的核心构思

DEMO运行结果及分析

关于实现一个基于文件持久化的EventStore的核心构思

从上图可以看出,开启4个线程,并行操作4个聚合根,每个聚合根产生10个不同版本的事件(事件版本号连续递增),每个事件重复产生10W次,只花了大概1s时间。另外,最后每个聚合根的当前版本号以及所对应的事件也都是正确的。所以,可以看出,性能还不错。4个线程并行处理,每秒可以处理400W个事件(当然实际肯定没这么高,这里是因为大部分处理都被CompareExchange方法判断掉了。所以,只有没并发的情况,才是理想情况下的最快的性能点,因为每个事件都会做持久化和更新当前版本的逻辑,上面的代码主要是为了验证并发情况下是否会产生重复版本的事件这个功能。),且能保证不会持久化重复版本的事件。明天有空把持久化事件替换为真实的写文件流的方式,看看性能会有多少,理论上只要写文件流够快,那性能应该依旧很高。

遗留问题

上面还有一个问题我还没提及,那就是光用一个文件来存储所有的事件还不够的,我们还需要一个文件存储每个事件在文件中的位置和长度,否则我们没办法知道每个事件存储在文件的哪里。也就是在当事件写入到文件后,我们需要知道当前写入的起始位置,然后我们可以将这个起始位置信息再写入到另一个相当于索引作用的文件。这个问题下次有机会在详细分析吧,总体思路和淘宝开源的高性能分布式消息队列metaq的消息存储架构非常相似。淘宝的metaq之所以能高性能,很大一方面原因也是设计为顺序写文件,随机读文件的思路。如下图所示:

关于实现一个基于文件持久化的EventStore的核心构思

 

上图中的commitlog文件相当于我上面提到的用来存储事件的文本文件,commitlog在metaq消息队列中是用来存储消息的。index文件相当于用来存储事件在commitlog中的位置和长度。在metaq中,则是用来存储消息在commitlog中的位置和长度。所以,从存储结构的角度来看,metaq的消息存储和eventstore的事件存储的结构一致;但不一样的是,metaq在存储消息时,不需要做并发控制,所有消息只要append消息到commitlog即可,所有的index文件也只要append写入即可,关于metaq具体更详细的设计我还没深入研究,有兴趣的朋友也可以和我交流。而eventstore则必须对事件的版本号做并发控制,这是最大的区别。另外,实际上,事件的索引信息可以只需要维护在内存中即可,因为这些索引信息在eventstore启动时总是可以通过commitlog还原出来。当然我们维护一份Index文件也可以,只是会增加事件持久化时的复杂度,这里到底是否需要这个Index文件,我需要再研究下metaq后才能更进一步明确。

关于使用LevelDB的思考

在调研的过程中,无意中发现LevelDB的插入性能非常高。它是由Google的MapReduce和BigTable的作者设计的一个基于key/value结构的轻量级的非常高效的开源的NoSQL数据库。它能够支持10亿级别的数据量存储。LevelDB 是单进程的服务,性能非常之高,在一台4个Q6600的CPU机器上,每秒钟写数据超过40w,而随机读的性能每秒钟超过10w,足见性能之高。正因为他的高效,所以现在很多其他NoSQL都使用它来作为底层的数据持久化,比如淘宝的Tair支持用LevelDB来持久化缓存数据。所以有时间研究下LevelDB的设计与实现非常有必要。但是LevelDB只提供最简单的key/value的操作。对于顺序插入事件的需求,可以调用LevelDB的put操作。但是这里的put操作不支持并发冲突的检测,也就是如果连续put了两个key相同的value,则前一个value就会被后一个value所覆盖,这不是我们想要的。所以我们如果使用LevelDB,对于同一个聚合根不能有两个版本号相同的事件这个需求仍然需要我们自己来保证,可以通过上面DEMO中的思路来实现。也就是说,我们仅仅用LevelDB来代替日志。其实这样已经省去我们很多的工作量,因为我们自己写日志以及记录每个事件的存储位置和长度不是一件容易的事情,要求对算法和逻辑非常严密,否则只要一个bit错位了,可能读取出来的所有数据都错了。而LevelDB帮我们完成了最复杂和头疼的事情了。但不幸的是,LevelDB没有官方的windows版本。我能找到.net平台下的实现,但要在生产环境使用,还是要多做很多验证才行。另外,如果要用LevelDB来持久化事件,那我们的key可以是聚合根ID+事件版本号的字符串拼接。这点应该不难理解吧!

结束语

这篇文章洋洋洒洒,都是思路性的东西,希望大家看了不会枯燥,呵呵。欢迎大家提出自己的意见和建议!


上一篇:说说nodejs里实用的模块


下一篇:Mocha中文指南