CAP 2.3版本发布,支持 MongoDB

前言

经过2个月的调整及测试,CAP 2.3 版本终于发布了,这个版本最大的特性就是对于 MongoDB 的支持,感谢博客园团队的keke同学对于 MongoDB 支持所提供的 PR,相信随着博客园的使用,CAP 会越来越多的帮助到更多的人。

CAP 是一个用来解决微服务或者分布式系统中分布式事务问题的一个开源项目解决方案(https://github.com/dotnetcore/CAP),目前已经将近2岁了,想对 CAP 更多了解的同学可以看下我的这篇文章

背景故事

在 2.3 版本中,我们对 Api 做了一些调整,为什么做这些调整呢?我就来说一下这中间的过程

相信在用 CAP 的同学们都知道在2.2以及以前的版本中存在一个 Bug 就是在使用事务的情况下消息持久化到数据库后如果还没有提交事务,那么这个时候 CAP 就会开始向消息队列中发送消息,但是有一个问题就是如果接下来事务提交失败,这个时候其实消息已经被发送出去了,就会导致消费端接收到了消息,对应到 GitHub 的这个 issue

这个 Bug 要说严重也严重,要说不严重也不严重,但是我们总要解决这个问题。怎么解决呢?有些同学可能会说把发送消息改到事务提交完成后不就行了,但是 CAP 是无法获取到业务端的事务执行结果的,因为在.NET中没有类似于Spring Transaction这种机制可以很容易的做一些扩展,所以如果想改到事务提交后,那么就必须提供一个 API 让用户手动来调用进行发送。这样看来可以很容易的解决这个问题,但是我觉得这样对于使用者来说就要多一行代码,需要增加学习成本以及要多理解框架内部做的一些事情,还有可能会忘记调用或者用错。

为了让 CAP 的使用者少写这一行代码,我思考了好几个月,说一下过程吧。

对于数据库底层驱动的代码做过了解的同学可能知道,数据库驱动在底层封装的特别死,特别是对于事务这块的处理,类都是 sealed 几乎没有办法进行扩展,我做了一些尝试都失败了,最后都想 fork 一个数据库驱动来修改发布自己的 Nuget 包了,但是这个方案最终被否定了,因为我自己都不愿意用自己编译的数据库驱动,最终这条路行不通。

另外一个方案就是对于 Diagnostics 有了解的同学可能想到了,可以利用这个特性来追踪事务提交的结果,然后在其中做一些处理就行了。但是有个什么问题呢?目前只有 SQL Server 的驱动才支持 Diagnostics,其他的 MySql,PostgreSql 均不支持,怎么办呢?不可能不去管使用 MySql,PostgreSql 的那些用户,毕竟我们自己也是使用的 MySql。

我和 Lemon 同学曾分别向 MySql 和 PostgreSql C#数据库项目提交了对 Diagnostics 特性支持的 PR(MySql PR, PostgreSql PR),但是由于微软对于 DiagnosticsSource API 设计的问题,导致社区对于这种 API 的方式比较反感,另外就是指导文档中的一些原则,微软在 SQLClient 的驱动中都没有遵守,所以这两个 PR 一直没有进行合并,有兴趣的也可以看下这里的讨论,这样等下去也不知道要等到什么时候。

还有一个原因是因为我们需要对接新的MongoDB,MongoDB对于事务的处理在API上有所不同,为了提供一致的用户接口,所以需要作出一些改变。

以上,我们需要对 API 做出调整,我们不能一直停滞不前。下面我们来看一下2.3版本做出的改变吧。

CAP 2.3 版本中的改变

1、移除了CAP 中间件注册

现在,你不需要再使用 app.UseCap() 手动添加中间件,我们将自动注册。

在 2.3 以前的版本中,需要在 Startup 中 Configure 中注册 CAP,现在我们已经自动的在启动的时候进行了注册你不再需要手动注册。

public void Configure(IApplicationBuilder app)
{
app.UseCap(); //移除了,不需要再手动注册
}

2、修改了消息表主键类型

为了适配最新的MongoDB以及某些场景下的数据表迁移,我们将消息表的主键Id由自增长的int类型改为了由雪花算法生成的long类型,这在一定程度上可以提高消息处理的性能以及逻辑的复杂性。

由 2.2 版本升级上来的同学,我们提供了数据库迁移脚本,你可以查看这里来获取数据库迁移脚本,然后在数据库执行即可。

https://gist.github.com/yuleyule66/0e5ec7a5046dc58fcf89d51e4820c5cd

3、修改了 Publish Api

我们添加了一个新的 ICapTransaction 接口,用来控制事务的处理,同时这也是为了处理跟踪不到事务处理接口的情况,同时我们封装了一系列扩展方法,方便开发者使用,下面我们看一下新的Api

  • MongoDB:
using (var session = _mongoClient.StartTransaction(_capBus, autoCommit: false))
{
var collection = _mongoClient.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); _capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now); session.CommitTransaction();
}

这里的 connection.StartTransaction 是一个扩展方法,这个扩展方法返回 IClientSessionHandle 接口,它代表的是MongoDB的原生事务对象,我们在做自己业务代码的时候拿到这个对象即可使用。

命名 StartTransaction 的原因是我们希望遵循MongoDB的命名规范,便于使用者理解

  • SQLServer:
using (var connection = new SqlConnection(""))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false))
{
//your business code sample
connection.Execute("insert into test(name) values('test')", transaction); _capBus.Publish("sample.rabbitmq.sqlserver", DateTime.Now); transaction.Commit();
}
}

这里的 connection.BeginTransaction 是一个扩展方法,这个扩展方法返回 IDbTransaction 接口,它代表的是数据库的原生事务对象,我们在做自己业务代码的时候使用这个对象传到Dapper或者Ado.net中即可。

  • MySql 和 PostgreSql:
using (var connection = new MySqlConnection(""))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false))
{
//your business code sample
connection.Execute("insert into test(name) values('test')",transaction: (IDbTransaction)transaction.DbTransaction); _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); transaction.Commit();
}
}

这里的 connection.BeginTransaction 是一个扩展方法,这个扩展方法返回 ICapTransaction 接口,接口包装的有 DbTransaction 属性,它代表的是数据库的原生事务对象,我们在做自己业务代码的时候使用这个对象传到Dapper或者Ado.net中即可。

4、增加了事务自动提交

有些情况下,为了精简代码,我们不想去手动调用 transaction.Commit() 方法希望CAP去帮助你提交事务,那么也是可以做到的,你只需要在 connection.BeginTransaction 的时候传递参数 autoCommittrue 即可。

需要注意的是,当使用事务自动提交功能时,你需要在你的业务逻辑执行完成之后再发送消息,也就是说 _capBus.Publish 这行代码需要放到最后执行。实例代码:

using (var connection = new MySqlConnection(""))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
{
//your business code sample _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
}
}

注意这里的 autoCommit: true,并且取消了transaction.Commit()

5、增加对 MongoDB 的支持

在微服务应用中,有时候我们的某些服务可能为了性能或者是其他原因考虑,使用的不是传统的关系型数据库,而且一些非关系型数据库,比如这其中MongoDB作为代表,使用的人也最多,然后就有需求希望在存储数据的时候也想保证数据的高一致性。

MongoDB 在 4.0 及以上版本中支持了ACID事务,这个特性使我们有理由对MongoDB提供支持,同时MongoDB的支持也是博客园的Keke同学提供的PR,再次感谢。

有些同学可能想尝试一下,那么下面就来简单的说一下,MongoDB 对 ACID事务的支持是需要集群才能使用,所以我们需要首先搭建一个集群,搭建集群的文章我已经写好了,大家可以参考这篇博客来搭建。

集群搭建完成之后,在 Startup.cs 文件中的 ConfigureServices(IServiceCollection services) 中配置下即可。

public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IMongoClient>(new MongoClient("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0")); services.AddCap(x =>
{
x.UseMongoDB("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0");
x.UseRabbitMQ("localhost");
x.UseDashboard();
});
}

使用方法:

注意:MongoDB 不能在事务中创建数据库和集合,所以如果你集群创建好之后是空的,则你需要单独先创建数据库和集合,可以模拟一条记录插入就会自动创建了。

var mycollection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
mycollection.InsertOne(new BsonDocument { { "test", "test" } });

然后

[Route("~/without/transaction")]
public IActionResult WithoutTransaction()
{
_capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now); return Ok();
} [Route("~/transaction/not/autocommit")]
public IActionResult PublishNotAutoCommit()
{
//NOTE: before your test, your need to create database and collection at first
using (var session = _client.StartTransaction(_capBus, autoCommit: true))
{
var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection"); collection.InsertOne(session, new BsonDocument { { "hello", "world" } }); _capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now);
}
return Ok();
}

总结

最近一两个月明显感觉到使用 CAP 的人越来越多了,博客园也出现了一些CAP的博客文章,我们很开心能够帮助到大家

。大家在使用的过程中遇到问题希望也能够积极的反馈,帮助CAP变得越来越好。

上一篇:使用javascript中读取Xml文件做成的一个二级联动菜单


下一篇:rust cargo 一些方便的三方cargo 子命令扩展