Integrating FreeSql with CAP in Practice

CAP

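CAP is a C# library based on .NET Standard. It is a solution for handling distributed transactions that also works as an EventBus, and it is lightweight, easy to use, and high-performance.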

ADO.NET transactions

DotNetCore.CAP.MySql references the packages below. When the transaction is committed, it calls the Flush method to push the messages.

<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.7" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.7" />
<PackageReference Include="MySqlConnector" Version="1.0.1" />
    public class MySqlCapTransaction : CapTransactionBase
    {
        public MySqlCapTransaction(
            IDispatcher dispatcher) : base(dispatcher)
        {
        }

        public override void Commit()
        {
            Debug.Assert(DbTransaction != null);

            switch (DbTransaction)
            {
                case IDbTransaction dbTransaction:
                    dbTransaction.Commit();
                    break;
                case IDbContextTransaction dbContextTransaction:
                    dbContextTransaction.Commit();
                    break;
            }
            Flush();
        }
    }

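Here we can see that committing the transaction calls Flush, a method on the parent class CapTransactionBase. Flush is protected, so it is not exposed as part of the public interface.

        protected virtual void Flush()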
        {
            while (!_bufferList.IsEmpty)
            {
                _bufferList.TryDequeue(out var message);

                _dispatcher.EnqueueToPublish(message);
            }
        }
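
The buffer-then-flush behaviour described above can be sketched in isolation. This is a minimal illustration, not CAP's implementation: BufferedTransactionSketch, Dispatched, and Committed are hypothetical names, and a plain list stands in for the dispatcher.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for a CAP transaction: Publish only buffers,
// and nothing reaches the "dispatcher" until Commit flushes the buffer.
public class BufferedTransactionSketch
{
    private readonly ConcurrentQueue<string> _bufferList = new ConcurrentQueue<string>();

    // Stand-in for the dispatcher: messages that have actually been handed off.
    public List<string> Dispatched { get; } = new List<string>();

    public bool Committed { get; private set; }

    // Called while the database transaction is still open.
    public void Publish(string message) => _bufferList.Enqueue(message);

    public void Commit()
    {
        Committed = true; // the real class commits the database transaction here
        Flush();
    }

    // Drain the buffer; TryDequeue returns false once the queue is empty.
    protected virtual void Flush()
    {
        while (_bufferList.TryDequeue(out var message))
        {
            Dispatched.Add(message);
        }
    }
}
```

In this sketch a message published before Commit stays in the buffer, which is why skipping Flush after a commit would leave messages undelivered.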

Let's look at how the integration demo calls it.

    [Route("~/adonet/transaction")]
    public IActionResult AdonetWithTransaction()
    {
        using (var connection = new MySqlConnection(AppDbContext.ConnectionString))
        {
            using (var transaction = connection.BeginTransaction(_capBus, true))
            {
                //your business code
                connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);

                //for (int i = 0; i < 5; i++)
                //{
                _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
                //}
            }
        }

        return Ok();
    }

The code extends IDbConnection with a BeginTransaction method, passing in the injected _capBus instance together with the autoCommit flag.

private readonly ICapPublisher _capBus;

public PublishController(ICapPublisher capPublisher)
{
    _capBus = capPublisher;
}

/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
    ICapPublisher publisher, bool autoCommit = false)
{
    if (dbConnection.State == ConnectionState.Closed)
    {
        dbConnection.Open();
    }

    var dbTransaction = dbConnection.BeginTransaction();
    publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();
    return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
}

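autoCommit defaults to false. When it is set to true the transaction is committed automatically, which is not recommended when integrating another ORM: as soon as we call Publish, it calls Commit() in MySqlCapTransaction and runs Flush, so the message is sent right away.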
IDbContextTransaction

This line of code is very important.

    publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();

From CapPublisher we can see that the transaction state is shared through AsyncLocal.

internal class CapPublisher : ICapPublisher
{
     public AsyncLocal<ICapTransaction> Transaction { get; }
}
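
To make the AsyncLocal point concrete, here is a small sketch, assuming nothing from CAP itself: AsyncLocalSketch and its members are hypothetical names, and a string stands in for the transaction object. It shows that a value set in one async flow is visible after an await in that flow, while a concurrent flow sees only its own value.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLocalSketch
{
    // Plays the role of ICapPublisher.Transaction; a string stands in for the transaction.
    private static readonly AsyncLocal<string> Current = new AsyncLocal<string>();

    // Reads the ambient value after an await, the way Publish would look up the transaction.
    private static async Task<string> ReadAfterAwaitAsync()
    {
        await Task.Yield();
        return Current.Value;
    }

    // Each call sets its own value and reads it back from a nested async method.
    public static async Task<string> RunRequestAsync(string name)
    {
        Current.Value = name;
        return await ReadAfterAwaitAsync();
    }

    // Two concurrent flows each see only the value they set themselves.
    public static async Task<string[]> RunTwoAsync()
    {
        return await Task.WhenAll(RunRequestAsync("tx-A"), RunRequestAsync("tx-B"));
    }
}
```

This is why a singleton-style publisher can still carry a per-request transaction: the value lives in the async execution flow, not in the publisher instance.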

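The actual type of publisher.Transaction.Value is ICapTransaction.

CapTransactionExtensions.cs has another extension method, Begin. Calling it effectively assigns the result of new MySqlConnection(AppDbContext.ConnectionString).BeginTransaction() to the ICapPublisher injected into the current controller.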

        public static ICapTransaction Begin(this ICapTransaction transaction,
            IDbTransaction dbTransaction, bool autoCommit = false)
        {
            transaction.DbTransaction = dbTransaction;
            transaction.AutoCommit = autoCommit;

            return transaction;
        }

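For ADO.NET, as long as we pass this transaction along, sending the message and the database operation are guaranteed to happen in one transaction.

EF Core transactions

Likewise, let's look at the extension method and how it is used.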

    public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
        ICapPublisher publisher, bool autoCommit = false)
    {
        var trans = database.BeginTransaction();
        publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();
        var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
        return new CapEFDbTransaction(capTrans);
    }

dbContext.Database is of type DatabaseFacade, so BeginTransaction can be called on it directly.

[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
    using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false))
    {
        dbContext.Persons.Add(new Person() { Name = "ef.transaction" });

        for (int i = 0; i < 1; i++)
        {
            _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
        }

        dbContext.SaveChanges();

        trans.Commit();
    }
    return Ok();
}

Likewise, there is another Begin extension method that simply assigns values on the ICapTransaction.

public static ICapTransaction Begin(this ICapTransaction transaction,
    IDbContextTransaction dbTransaction, bool autoCommit = false)
{
    transaction.DbTransaction = dbTransaction;
    transaction.AutoCommit = autoCommit;

    return transaction;
}

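In this demo autoCommit is false, because dbContext has its own SaveChanges(), and committing automatically when a message is published would not be appropriate. SaveChanges() does quite a lot of work. I am not sure of the details, but it presumably has to run before the transaction is committed. I have not looked into it further.

But we can look at the CapTransactionBase source: DbTransaction is of type object.

In EF Core the transaction type is IDbContextTransaction.

In ADO.NET it is actually IDbTransaction.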

 public object DbTransaction { get; set; }

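That is why the code at the very beginning checks which type DbTransaction is and then calls Commit() on the underlying transaction. If you want to integrate another ORM while dropping the EF Core dependency, handling like the following is the key part, for example in CommitAsync, Commit, and Rollback().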

    public override void Commit()
    {
        Debug.Assert(DbTransaction != null);

        switch (DbTransaction)
        {
            case IDbTransaction dbTransaction:
                dbTransaction.Commit();
                break;
            case IDbContextTransaction dbContextTransaction:
                dbContextTransaction.Commit();
                break;
        }
        Flush();
    }

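There is also MySqlDataStorage.cs.

It checks the type of dbTransaction and then obtains the current transaction. If you bring in another ORM, remember to modify this spot.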

    var dbTrans = dbTransaction as IDbTransaction;
    if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans)
    {
        dbTrans = dbContextTrans.GetDbTransaction();
    }
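
As a rough idea of what "modifying this spot" could look like, the sketch below resolves an IDbTransaction from an object-typed field using a type switch. It is only an assumption about one possible shape: IOrmTransactionSketch is a hypothetical wrapper interface, not a real FreeSql, EF Core, or CAP type, and TransactionResolverSketch is an illustrative name.

```csharp
using System;
using System.Data;

// Hypothetical wrapper exposed by some other ORM; not a real FreeSql or CAP type.
public interface IOrmTransactionSketch
{
    IDbTransaction Inner { get; }
}

public static class TransactionResolverSketch
{
    // Returns the underlying ADO.NET transaction, or null when the type is not recognised.
    public static IDbTransaction Resolve(object dbTransaction)
    {
        switch (dbTransaction)
        {
            case IDbTransaction direct:
                return direct;          // plain ADO.NET transaction
            case IOrmTransactionSketch wrapped:
                return wrapped.Inner;   // unwrap the ORM-specific transaction
            default:
                return null;            // unknown type or null input
        }
    }
}
```

Each additional ORM would add one case here, replacing the EF Core branch if that dependency is being removed.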

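Reference project (no plans to maintain it)

Integrating FreeSql with CAP (the simplest way)

Thoughts on this issue

We still reference each project's official packages.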

Install-Package DotNetCore.CAP.Dashboard
Install-Package DotNetCore.CAP.MySql
Install-Package DotNetCore.CAP.RabbitMQ
Install-Package FreeSql
Install-Package FreeSql.DbContext
Install-Package FreeSql.Provider.MySqlConnector

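How to integrate CAP and its configuration options are not covered in detail here. The official site has Chinese documentation: http://cap.dotnetcore.xyz/

Rewrite the BeginTransaction extension method as an extension on IUnitOfWork.

When the transaction is committed through Commit(IUnitOfWork), it internally uses reflection to call the protected Flush method on the ICapTransaction.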

    using System.Reflection;
    using DotNetCore.CAP;
    using FreeSql;

    public static class CapUnitOfWorkExtensions
    {
        // Flush is protected on the CAP transaction, so it is invoked through reflection.
        public static void Flush(this ICapTransaction capTransaction)
        {
            capTransaction?.GetType().GetMethod("Flush", BindingFlags.Instance | BindingFlags.NonPublic)?.Invoke(capTransaction, null);
        }

        // Bind the unit of work's transaction to the publisher's ambient CAP transaction.
        public static ICapTransaction BeginTransaction(this IUnitOfWork unitOfWork, ICapPublisher publisher, bool autoCommit = false)
        {
            publisher.Transaction.Value = (ICapTransaction)publisher.ServiceProvider.GetService(typeof(ICapTransaction));
            return publisher.Transaction.Value.Begin(unitOfWork.GetOrBeginTransaction(), autoCommit);
        }

        // Commit the database transaction first, then push the buffered messages.
        public static void Commit(this ICapTransaction capTransaction, IUnitOfWork unitOfWork)
        {
            unitOfWork.Commit();
            capTransaction.Flush();
        }
    }
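
The reflection call relies on GetMethod with BindingFlags.NonPublic finding a protected method inherited from a base class. The sketch below checks that behaviour on its own, using hypothetical types (TransactionBaseSketch, ConcreteTransactionSketch, ReflectionFlushSketch) rather than CAP's classes.

```csharp
using System;
using System.Reflection;

// Hypothetical base class with a protected Flush, mirroring the shape of the CAP base class.
public abstract class TransactionBaseSketch
{
    public int FlushCount { get; private set; }

    protected virtual void Flush() => FlushCount++;
}

// The derived type does not redeclare Flush; it only inherits it.
public sealed class ConcreteTransactionSketch : TransactionBaseSketch { }

public static class ReflectionFlushSketch
{
    // Returns true when a non-public instance method named Flush was found and invoked.
    public static bool TryFlush(object transaction)
    {
        MethodInfo method = transaction?.GetType().GetMethod(
            "Flush", BindingFlags.Instance | BindingFlags.NonPublic);
        if (method == null)
        {
            return false;
        }

        method.Invoke(transaction, null);
        return true;
    }
}
```

Because the lookup is by name, a rename of Flush in a future library version would make the call a silent no-op in the original extension method, so returning a bool as done here is one way to detect that.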

Registering FreeSql

public void ConfigureServices(IServiceCollection services)
 {
    IConfigurationSection configurationSection = Configuration.GetSection("ConnectionStrings:MySql");
    IFreeSql fsql = new FreeSqlBuilder()
           .UseConnectionString(DataType.MySql, configurationSection.Value)
           .UseNameConvert(NameConvertType.PascalCaseToUnderscoreWithLower)
           .UseAutoSyncStructure(true)
           .UseNoneCommandParameter(true)
           .UseMonitorCommand(cmd =>
           {
               Trace.WriteLine(cmd.CommandText + ";");
           }
           )
           .Build();


    services.AddSingleton(fsql);
    services.AddFreeRepository();
    services.AddScoped<UnitOfWorkManager>();
}

Example

    [HttpGet("~/freesql/unitofwork/{id}")]
    public DateTime UnitOfWorkManagerTransaction(int id, [FromServices] IBaseRepository<Book> repo)
    {
        DateTime now = DateTime.Now;
        using (IUnitOfWork uow = _unitOfWorkManager.Begin())
        {
            ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);
            repo.Insert(new Book()
            {
                Author = "luoyunchong",
                Summary = "2",
                Title = "122"
            });

            _capBus.Publish("freesql.time", now);
            trans.Commit(uow);
        }
        return now;
    }
    
    [NonAction]
    [CapSubscribe("freesql.time")]
    public void GetTime(DateTime time)
    {
        Console.WriteLine($"time:{time}");
    }

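Note that trans does not need a using block, because FreeSql releases the resources internally. You can also wrap it in using, but in that case please update to the latest FreeSql version.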

ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);

To commit the transaction, call the extension method as well. Otherwise the transaction will not work correctly.

trans.Commit(uow);

Source code location

Integrating FreeSql with CAP in Practice
