微服务-基于CAP,EventBus的分布式事务(7)

一.场景分析 在角色权限配置的开发过程中,角色作为用户中心,权限作为单独的微服务管理。 创建角色时,需要在权限微服务中进行角色权限关联表的维护。 (1)最初是打算通过Grpc来进行微服务之间的数据交互,但实际场景中发现如果Grpc的微服务挂掉,会影响整个系统的运行,违背了CAP的高可用原则。   CAP原则 CAP原则又称CAP定理,指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。 一致性(C):在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本) 可用性(A):保证每个请求不管成功或者失败都有响应。 分区容忍性(P):系统中任意信息的丢失或失败不会影响系统的继续运作。   (2)即使重启了Grpc的微服务,之前需要传递给Grpc处理的数据已经丢失,于是要引入RabbitMQ消息队列来解决这个问题。 (3)最后到了优化代码的部分,通过依赖注入泛型后的Grpc,还是有很明显的业务耦合,若Grpc方法名更换改变,会影响整个系统运行,于是引入发布订阅模式和事件总线(EventBus)   关于发布和订阅 发布方(Publisher):被观察者,状态改变时,负责通知所有订阅者 订阅方(Subscribe):观察者,订阅事件并对接收的事件进行处理 发布订阅模式有两种实现方式: (1)简单的实现方式:由Publisher维护一个订阅者列表,当状态改变时循环遍历列表通知订阅者。 (2)委托的实现方式:由Publisher定义事件委托,Subscriber实现委托   事件总线 事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的, 事件总线的处理流程 微服务-基于CAP,EventBus的分布式事务(7)

 

 

微服务-基于CAP,EventBus的分布式事务(7) (4)最终通过比较和研究,引入DonetCore.CAP来解决分布式系统的最终一致性和代码的解耦   二.项目实战 CAP文档:https://cap.dotnetcore.xyz/user-guide/zh/getting-started/quick-start/ 项目基于.Net Core3.1,消息队列RabbitMQ,数据库Postgres   (1)创建项目CAPSolution 微服务-基于CAP,EventBus的分布式事务(7) 微服务-基于CAP,EventBus的分布式事务(7) CAPTest作为用户中心微服务 CAPTest2作为角色权限微服务 CAPTest发布创建角色的事件,给CAPTest2订阅,维护角色权限关联表   流程图大致如下: 微服务-基于CAP,EventBus的分布式事务(7)

 

 

微服务-基于CAP,EventBus的分布式事务(7) (2)创建实体表,迁移数据库 CAPTest中创建角色表Role
[Table("Role")]
public class Role
{
    [Key]
    [Column("id")]
    [StringLength(50)]
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string RoleId { get; set; }
    public string RoleName { get; set; }
}
CAPTest2中创建Role_Permission
[Table("Role_Permission")]
public class Role_Permission
{
    [Key]
    [Column("id")]
    [StringLength(50)]
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string RoleId { get; set; }
    public string PermissioId { get; set; }
}
迁移数据库 在CAPTest和CAPTest2中添加以下Nuget包: 微服务-基于CAP,EventBus的分布式事务(7)微服务-基于CAP,EventBus的分布式事务(7) 修改appsettings.json
"ConnectionStrings": { "ConnectionString": "User ID=postgres;Password=c2matica;Host=192.168.1.230;Port=5432;Database=Craftica;Pooling=true;" }
修改Startup.cs
1 #region DB
2 var sqlConnectionString = Configuration.GetConnectionString("ConnectionString");
3 services.AddDbContext<CAPRoleDbContext>(options =>
4 {
5     options.UseLazyLoadingProxies().UseNpgsql(sqlConnectionString);
6 });
7 #endregion
执行迁移命令 微服务-基于CAP,EventBus的分布式事务(7)微服务-基于CAP,EventBus的分布式事务(7) (2)业务代码 引入CAP的Nuget相关包 微服务-基于CAP,EventBus的分布式事务(7)微服务-基于CAP,EventBus的分布式事务(7) 配置CAP的相关属性
 1 services.AddCap(x =>
 2 {
 3     x.UseEntityFramework<CAPRoleDbContext>();
 4     //使用postgresql进行事务处理,防止推送MQ失败,会在指定数据库中自动生成以"cap."开头的表
 5     x.UsePostgreSql(Configuration.GetConnectionString("ConnectionString"));
 6     //x.UseInMemoryMessageQueue();
 7     x.UseRabbitMQ(rb =>
 8     {
 9         rb.HostName = "localhost";
10         rb.UserName = "guest";
11         rb.Password = "guest";
12         rb.Port = 5672;
13         rb.ExchangeName = "cap.text.exchange";
14     });
15     // 添加cap后台监控页面(人工处理);页面地址为“/cap”;如:http://www.site.com/cap
16     //x.UseDashboard();
17     // 配置定时器重试策略
18     //x.FailedRetryInterval = 2; //重试间隔时间(秒),使用默认的就可以,可不用配置
19     x.FailedRetryCount = 5; //重试次数
20 });
在CAPTest中创建RoleController,创建方法AddRole,依赖注入ICapPublisher
 1 private readonly CAPRoleDbContext _cAPRoleDbContext;
 2 protected readonly ICapPublisher _capBus;
 3 public RoleController(CAPRoleDbContext cAPRoleDbContext, ICapPublisher  capBus)
 4 {
 5     _cAPRoleDbContext = cAPRoleDbContext;
 6     _capBus = capBus;
 7 }
 8 [HttpPost("addrole")]
 9 public async Task<IActionResult> AddRole(string roleid)
10 { 
11     _cAPRoleDbContext.Role.Add(new Role()
12     {
13         RoleId = roleid,
14         RoleName = "Name"
15     });
16     var data= await _cAPRoleDbContext.SaveChangesAsync()>0;
17 
18     if (data)
19     {
20         await _capBus.PublishAsync("role.service.addrole", new RolePermissionDto()
21         {
22             RoleId = roleid
23         });
24     }
25     return Ok();
26 }
在CAPTest2中创建PermissionController, 创建方法AddRolePermission,依赖注入ICapPublisher
 1 public class PermissionController : Controller
 2 {
 3     private readonly  CAPPermissionDBContext _cAPPermissionDbContext;
 4     protected readonly ICapPublisher _capBus;
 5     public PermissionController(CAPPermissionDBContext cAPPermissionDbContext, ICapPublisher capBus)
 6     {
 7         _cAPPermissionDbContext = cAPPermissionDbContext;
 8         _capBus = capBus;
 9     }
10 
11     [NonAction]
12     [CapSubscribe("role.service.addrole")]
13     public async Task<IActionResult> AddRolePermission(RolePermissionDto model)
14     {
15         _cAPPermissionDbContext.Role_Permission.Add(new Role_Permission()
16         {
17             PermissioId = "1",
18             RoleId = model.RoleId
19         });
20         await _cAPPermissionDbContext.SaveChangesAsync();
21         return Ok();
22     }
23 }
订阅的方法,要在控制器头部加上标签: [NonAction] [CapSubscribe("role.service.addrole")]   (3)程序运行 首先启动RabbitMQ服务 其次运行微服务,看到CAP started!时表示运行成功 微服务-基于CAP,EventBus的分布式事务(7)  微服务-基于CAP,EventBus的分布式事务(7) 调用CAPTest中AddRole方法,在微服务CAPTest2中会运行对应的方法AddRolePermission。   完成后,可以在表Role,表Role_Permission中看到新增的记录,如下图所示 Role 微服务-基于CAP,EventBus的分布式事务(7)微服务-基于CAP,EventBus的分布式事务(7) Role_Permission 微服务-基于CAP,EventBus的分布式事务(7)

 

微服务-基于CAP,EventBus的分布式事务(7) 在各自数据库中,会对CAP的发布和订阅记录进行存储 数据库CAP_Permission中 cap的received会有记录 微服务-基于CAP,EventBus的分布式事务(7)  微服务-基于CAP,EventBus的分布式事务(7) 数据库CAP_Role中 cap的published会有记录 微服务-基于CAP,EventBus的分布式事务(7)  微服务-基于CAP,EventBus的分布式事务(7)

注:cap.published表和cap.received表是由CAP自动生成的,它内部是使用本地消息表+MQ来实现异步确保

  若只启动发布(CAPTest),看到消息队列下中Exchange为“cap.text.exchange”的一条待消费的记录 微服务-基于CAP,EventBus的分布式事务(7)  微服务-基于CAP,EventBus的分布式事务(7)

 

 

以上仅用于学习和总结!   参考文档: https://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html https://www.cnblogs.com/xhznl/p/13154851.html https://baike.baidu.com/item/CAP%E5%8E%9F%E5%88%99   附: 链接:https://pan.baidu.com/s/1GpmgF0Px5yPTsfkSAwhiYw 提取码:kw89

上一篇:flutter的事件总线event_bus怎么使用


下一篇:Google的Guava包下的EventBus源码解析