.NET Core分布式事件总线、分布式事务解决方案:CAP

简介

CAP 是一个遵循 .NET Standard 标准库的C#库,用来处理分布式事务以及提供EventBus的功能,它具有轻量级,高性能,易使用等特点。
分布式事务是在分布式系统中不可避免的一个硬性需求,CAP 没有采用两阶段提交(2PC)这种事务机制,而是采用的 本地消息表+MQ 这种经典的实现方式,这种方式又叫做 异步确保。
CAP 实现了 EventBus 中的发布/订阅,它具有 EventBus 的所有功能。也就是说你可以像使用 EventBus 一样来使用 CAP,另外 CAP 的 EventBus 是具有高可用性的, CAP 借助于本地消息表来对 EventBus 中的消息进行了持久化,这样可以保证 EventBus 发出的消息是可靠的,当消息队列出现宕机或者连接失败的情况时,消息也不会丢失。

源码:
https://github.com/dotnetcore/CAP

目前, CAP 同时支持使用 RabbitMQ,Kafka,Azure Service Bus 等进行底层之间的消息发送,你不需要具备这些消息队列的使用经验,仍然可以轻松的集成到项目中。
CAP 目前支持使用 Sql Server,MySql,PostgreSql,MongoDB 数据库的项目。
CAP 同时支持使用 EntityFrameworkCore 和 ADO.NET 的项目,你可以根据需要选择不同的配置方式。
下面是CAP在系统中的一个不完全示意图:
.NET Core分布式事件总线、分布式事务解决方案:CAP

图中实线部分代表用户代码,虚线部分代表CAP内部实现。

CAP集成到项目

添加Nuget包

数据库使用Sqlserver,消息队列使用RabbitMQ

<PackageReference Include="DotNetCore.CAP" Version="3.1.2" />
<PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="3.1.2" />
<PackageReference Include="DotNetCore.CAP.SqlServer" Version="3.1.2" />

Startup添加配置

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllersWithViews();

            services.AddCap(option =>
            {
                //如果你使用的 EF 进行数据操作,你需要添加如下配置:
                //option.UseEntityFramework<AppDbContext>();  //可选项,你不需要再次配置 option.UseSqlServer 了

                //如果你使用的ADO.NET,根据数据库选择进行配置:
                option.UseSqlServer("Server=SC-202003151209\\SL1;Database=CAPDB;User=sa;Password=123456;");
                //option.UseMySql("连接字符串")

                //CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作为MQ,根据使用选择配置:
                option.UseRabbitMQ(rabbitOption => {
                    rabbitOption.HostName = "xxxx";
                    rabbitOption.Port = 5672;
                    rabbitOption.Password = "xxxx";
                    rabbitOption.UserName = "xxxx";
                    rabbitOption.VirtualHost = "xxxx";
                });
            });
        }

发布事件



        /// <summary>
        /// 不使用事务
        /// </summary>
        /// <param name="capBus"></param>
        /// <returns></returns>
        public IActionResult Index1([FromServices] ICapPublisher capBus)
        {
            capBus.Publish("Meshop.PayService.Refund", new RefundMessage { OrderID = 1, RefundPrice = 100M });
            return View();
        }
        /// <summary>
        /// 使用事务,自动提交
        /// </summary>
        /// <param name="capBus"></param>
        /// <returns></returns>
        public IActionResult Index2([FromServices] ICapPublisher capBus)
        {
            using (var conn = new SqlConnection("Server=SC-202003151209\\SL1;Database=CAPDB;User=sa;Password=123456;"))
            {
                using (var tran = conn.BeginTransaction(capBus, true))
                {
                    var orderMaster = new OrderMaster { OrderState = 1 };
                    long id = conn.Insert(orderMaster, tran);
                    capBus.Publish("Meshop.PayService.Refund", new RefundMessage { OrderID = 1, RefundPrice = 100M });
                }
            }
            return View();
        }

订阅事件

如果是在Controller中,直接添加[CapSubscribe("")]来订阅相关消息。


        [CapSubscribe("Meshop.PayService.Refund")]
        public Task Refund(RefundMessage message)
        {
            return Task.CompletedTask;
        }

如果你的方法没有位于Controller 中,那么你订阅的类需要继承 ICapSubscribe,然后添加[CapSubscribe("")]标记:

namespace xxx.Service
{
    public interface ISubscriberService
    {
    	public void CheckReceivedMessage(DateTime time);
    }
    
    
    public class SubscriberService: ISubscriberService, ICapSubscribe
    {
    	[CapSubscribe("xxxx.services.show.time")]
    	public void CheckReceivedMessage(DateTime time)
    	{
    		
    	}
    }
}

然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService

public void ConfigureServices(IServiceCollection services)
{
    services.AddTransient<ISubscriberService,SubscriberService>();
}

参考:
https://www.cnblogs.com/cmliu/p/11767343.html
https://www.cnblogs.com/savorboard/p/cap.html

.NET Core分布式事件总线、分布式事务解决方案:CAP

上一篇:### 前端工程化(2):postCss 和 babel的使用


下一篇:WEB测试用例(二)