net core cap结合redis+数据库实现最终一致性

CAP 同时支持使用 RabbitMQ,Kafka,Azure Service Bus 等进行底层之间的消息发送。

CAP 目前支持使用 Sql Server,MySql,PostgreSql,MongoDB 数据库的项目。

一般是cap+Kafka,这里使用cap+redis

安装DotNetCore.CAP nuGet包

net core cap结合redis+数据库实现最终一致性

 

 配置 appsettings.json 数据 。

{
  "Logging": {
    "LogLevel": {
      "Default": "Warning"
    }
  },
  "ConnectionStrings": {
    "Mysql_Conn": "Server=localhost;port=3306;Database=db1;UserId=root;Password=123456",
  },
  "RabbitMQ": {
    "HostName": "192.168.122.199",
    "UserName": "admin",
    "Password": "123456",
    "VirtualHost": "vhost_lihy",
    "Port": 5672,
    "ExchangeName": "cap.text.lihy.exchange"

  }, 
  "AllowedHosts": "*"
}

根据底层消息队列,你可以选择引入不同的包:

PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus

CAP 目前支持使用 SQL Server, PostgreSql, MySql, MongoDB 的项目,你可以选择引入不同的包:

PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB     //需要 MongoDB 4.0+ 集群

在 Startup.cs 文件中,添加如下配置:

public void ConfigureServices(IServiceCollection services)
        {
            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);


            services.AddDbContext<CapDbContext>(options =>
            options.UseMySql(Configuration.GetConnectionString("Mysql_Conn")));


            services.AddCap(x =>
            {
                //如果你使用的 EF 进行数据操作,你需要添加如下配置:
                x.UseEntityFramework<CapDbContext>();  //可选项,你不需要再次配置 x.UseSqlServer 了

                //如果你使用的ADO.NET,根据数据库选择进行配置:
                //x.UseSqlServer("数据库连接字符串");
                //x.UseMySql("server=localhost;port=3306;userid=root;password=123456;database=db1;SslMode=none");
                //x.UsePostgreSql("数据库连接字符串");

                //如果你使用的 MongoDB,你可以添加如下配置:
                //x.UseMongoDB("ConnectionStrings");  //注意,仅支持MongoDB 4.0+集群

                //CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作为MQ,根据使用选择配置:
                x.UseRabbitMQ(o =>
                {
                    o.HostName = Configuration.GetSection("RabbitMQ")["HostName"];
                    o.UserName = Configuration.GetSection("RabbitMQ")["UserName"];
                    o.Password = Configuration.GetSection("RabbitMQ")["Password"];
                    o.VirtualHost = Configuration.GetSection("RabbitMQ")["VirtualHost"];
                    o.Port = Convert.ToInt32(Configuration.GetSection("RabbitMQ")["Port"]);
                    //指定Topic exchange名称,不指定的话会用默认的
                    o.ExchangeName = Configuration.GetSection("RabbitMQ")["ExchangeName"];

                });

                //设置处理成功的数据在数据库中保存的时间(秒),为保证系统新能,数据会定期清理。
                x.SucceedMessageExpiredAfter = 24 * 3600;

                //设置失败重试次数
                x.FailedRetryCount = 5;

                //x.UseKafka("ConnectionStrings");
                //x.UseAzureServiceBus("ConnectionStrings");

                x.UseDashboard();
            });

        }

发布事件/消息

新建 PublishController 控制器

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using MySql.Data.MySqlClient;

// For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860

namespace NETCORE.CAP.Controllers
{
    [Route("api/[controller]")]
    public class PublishController : Controller
    {

        private readonly ICapPublisher _capBus;

        public PublishController(ICapPublisher capPublisher)
        {
            _capBus = capPublisher;
        }


        /// <summary>
        /// 不使用事务
        /// </summary>
        /// <returns></returns>
        [Route("~/without/transaction")]
        public IActionResult WithoutTransaction()
        {
            _capBus.Publish("xxx.services.show.time", DateTime.Now);

            return Ok();
        }


        ////Ado.Net 中使用事务,自动提交
        //[Route("~/adonet/transaction")]
        //public IActionResult AdonetWithTransaction()
        //{
        //    using (var connection = new MySqlConnection(ConnectionString))
        //    {
        //        using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
        //        {
        //            //业务代码

        //            _capBus.Publish("xxx.services.show.time", DateTime.Now);
        //        }
        //    }
        //    return Ok();
        //}

        ////EntityFramework 中使用事务,自动提交
        //[Route("~/ef/transaction")]
        //public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
        //{
        //    using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
        //    {
        //        //业务代码

        //        _capBus.Publish("xxx.services.show.time", DateTime.Now);
        //    }
        //    return Ok();
        //}
         
    }
}

订阅事件/消息

 新建 ReceivedController 控制器

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

// For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860

namespace NETCORE.CAP.Controllers
{
    [Route("api/[controller]")]
    public class ReceivedController : Controller
    {
        [NonAction]
        [CapSubscribe("xxx.services.show.time")]
        public void CheckReceivedMessage(DateTime time)
        {
            Console.WriteLine(time);
            //return Task.CompletedTask;
        }
    }
}

运行后,

数据库即生成两张表

net core cap结合redis+数据库实现最终一致性

调用接口 https://localhost:5001/without/transaction

在数据表中可查看相关状态。

Cap 仪表盘

默认地址 https://localhost:5001/cap

net core cap结合redis+数据库实现最终一致性

 

 

Dashboard介绍

capOptions.UseDashboard(dashoptions =>
                {
                   dashoptions.AppPath = "applicationpath";
                   dashoptions.PathMatch = "/cap";
                   dashoptions.Authorization = new[] { new CapDashboardFilter() };

                });

这里只说这几个参数

AppPath:应用程序路径  访问dashboard的时候会有一个返回应用的操作,这个即是应用的地址

PathMatch:不设置的情况下都是cap,可以指定自己的dashboard路由地址
Authorization:授权处理

授权处理具体实现

只需要实现接口IDashboardAuthorizationFilter即可

public class CapDashboardFilter : IDashboardAuthorizationFilter
    {

        public bool Authorize(DashboardContext context)
        {
            return true;
        }
    }

通过DashboardContext上下文处理请求,允许返回true,不允许返回false

 附代码:https://gitee.com/wuxincaicai/NETCORE.git

net core cap结合redis+数据库实现最终一致性

上一篇:RTP学习笔记


下一篇:MySQL-常用的几种修改密码方法