CAP

1. 概述

  • 相对于直接集成消息队列,异步消息传递最强大的优势之一是可靠性,系统的一个部分中的故障不会传播,也不会导致整个系统崩溃。在 CAP 内部会将消息进行存储,以保证消息的可靠性,并配合重试等策略以达到各个服务之间的数据最终一致性
  • CAP是一个基于.net标准的库,是处理分布式事务的解决方案,还具有EventBus的功能,它轻量级,易于使用且高效
  • 在构建SOA或微服务系统的过程中,通常需要使用事件来集成每个服务。在此过程中,简单使用消息队列并不能保证可靠性。CAP采用与当前数据库集成的本地消息表程序,以解决分布式系统相互调用过程中可能发生的异常。它可以确保在任何情况下都不会丢失事件消息

2. 事件总线

  • 事件总线是一种机制,它允许不同的组件彼此通信。组件可以将事件发送到Eventbus,而无需知道是谁来接听或有多少其他人来接听。组件也可以侦听Eventbus上的事件,而无需知道谁发送了事件。这样,组件可以相互通信而无需相互依赖。同样,很容易替换一个组件。只要新组件了解正在发送和接收的事件,其他组件就永远不会知道

3. 架构

  • CAP有三个主要部分组成:生产者、消费者、传输器、持久化介质
    • 生产者:消息的提供者
    • 消费者:消息的消费者
    • 传输器
      • RabbitMQ
      • Kafka
      • Azure Service Bus
      • In-Memory Queue
      • Redis Stream
      • 传输器最佳实战
        CAP
    • 持久化介质:CAP需要使用具备持久化的存储机制来存储事件消息,来保证消息的可靠性
      • 持久化机制
        • 消息发送前:在消息进入到消息队列之前,CAP使用本地数据库表对消息进行持久化,这样可以保证当消息队列出现异常或者网络错误时候消息是没有丢失的
        • 消息发送后:消息进入到消息队列之后,CAP会启动消息队列的持久化功能
      • SQLServer
      • MySql
      • PostgreSql
      • MongoDB
      • In-Memory

4. 集成 Asp.Net Core Web API

  • 安装NuGet包
    dotnet add package DotNetCore.CAP
    dotnet add package DotNetCore.CAP.RabbitMQ
    dotnet add package DotNetCore.CAP.SqlServer
    dotnet add package DotNetCore.CAP.Dashboard
    
  • 配置CAP
    • 打开Startup.cs,在ConfigureServices方法中注册服务
      public void ConfigureServices(IServiceCollection services)
      {
          services.AddCap(options =>
          {
              //选择SQLServer持久化并配置链接字符串
              options.UseSqlServer(App.Configuratio["ConnectionStrings:ConnectionString"]);
              options.UseRabbitMQ(opt =>
              {
                  opt.HostName = App.Configuratio["RabbitMQSettings:HostName"];
                  opt.UserName = App.Configuratio["RabbitMQSettings:UserName"];
                  opt.Password = App.Configuratio["RabbitMQSettings:Password"];
                  opt.Port = int.Parse(App.Configuratio["RabbitMQSettings:Port"]);
                  opt.ExchangeName = App.Configuratio["RabbitMQSettings:ExchangeName"];
              });
              //消息重试失败
              options.FailedThresholdCallback = failed =>
              {
                  
              };
              options.TopicNamePrefix = App.Configuratio["RabbitMQSettings:TopicNamePrefix"];
              //控制台
              options.UseDashboard();
          });
      }
      
      • 运行项目导航到/cap
  • 发送消息
    public class PublisherController
    {
        #region 成员变量及其属性
    
        private readonly ICapPublisher _capPublisher;
    
        #endregion
    
        #region 构造函数
    
        public PublisherController()
        {
            _capPublisher = capPublisher;
        }
    
        #endregion
    
        #region 公共函数
        
        [HttpPost]
        public void Post(PublishModel publishModel)
        {
            _capPublisher.PublishAsync("PublisherController.Post", publishModel);
        }
    
        #endregion
    
        #region 私有函数
    
        #endregion
    }
    
  • 处理消息
    public class ConsumersController
    {
        #region 成员变量及其属性
    
        #endregion
    
        #region 构造函数
    
        #endregion
    
        #region 公共函数
    
        [NonAction]
        [CapSubscribe("PublisherController.Post")]
        public async Task AddGlobalEventLogForDb(PublishModel publishModel)
        {
            //处理消息
        }
    
        #endregion
    
        #region 私有函数
    
        #endregion
    }
    
上一篇:CMPT 459.1-19. Programming Assignment


下一篇:Hbase Region管理、region分配、region server上线、region server下线、Region分裂、自动分区和手动分区