Seata实现2PC事务控制

目标

通过学习本案例学习Seata实现2PC事务控制的方法及Seata的工作原理。

案例说明

本示例通过Seata中间件实现分布式事务,模拟两个账户的转账交易过程。
两个账户在三个不同的银行(张三在bank1、李四在bank2),bank1和bank2是两个个微服务。交易过程是,张三给李四转账指定金额。
上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。
Seata实现2PC事务控制

程序组成部分说明

数据库:MySQL-5.7.25

包括bank1和bank2两个数据库。

JDK:64位 jdk1.8.0_201

微服务框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE

seata客户端(RM、TM):spring-cloud-alibaba-seata-2.1.0.RELEASE

seata服务端(TC):seata-server-0.7.1

微服务及数据库的关系 :

dtx/dtx-seata-demo/seata-demo-bank1 银行1,操作张三账户, 连接数据库bank1

dtx/dtx-seata-demo/seata-demo-bank2 银行2,操作李四账户,连接数据库bank2

服务注册中心:dtx/discover-server
本示例程序技术架构如下:
Seata实现2PC事务控制
交互流程如下:

1、请求bank1进行转账,传入转账金额。

2、bank1减少转账金额,调用bank2,传入转账金额。

创建数据库

导入数据库脚本:sql\bank1.sql、sql\bank2.sql
包括如下数据库:

bank1库,包含张三账户

CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
  `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
  `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',
  `account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);

bank2库,包含李四账户

CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

CREATE TABLE `account_info`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
  `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
  `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',
  `account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的账户', '2', NULL, 0);

分别在bank1、bank2库中创建undo_log表,此表为seata框架使用:

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

启动TC(事务协调器)

(1)下载seata服务器

下载地址:https://github.com/seata/seata/releases/download/v0.7.1/seata-server-0.7.1.zip

也可以直接解压:资料\seata-server-0.7.1.zip

(2)解压并启动

[seata服务端解压路径]/bin/seata-server.bat -p 8888 -m file

注:其中8888为服务端口号;file为启动模式,这里指seata服务将采用文件的方式存储信息。

启动服务注册中心

discover-server是服务注册中心,测试工程将自己注册至discover-server,discover-server基于Eureka实现。
discover-server代码地址:https://github.com/pbteach/pbdtx/tree/master/discover-server

创建案例工程

dtx-seata-demo是seata的测试工程,根据业务需求需要创建两个dtx-seata-demo工程。

(1)导入dtx-seata-demo

代码:https://github.com/pbteach/pbdtx/tree/master/dtx-seata-demo

两个测试工程如下:

dtx/dtx-seata-demo/dtx-seata-demo-bank1  ,操作张三账户,连接数据库bank1

dtx/dtx-seata-demo/dtx-seata-demo-bank2  ,操作李四账户,连接数据库bank2

(2)父工程maven依赖说明

在dtx父工程中指定了SpringBoot和SpringCloud版本

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>2.1.3.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>Greenwich.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

在dtx-seata-demo父工程中指定了spring-cloud-alibaba-dependencies的版本。

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.1.0.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

(3)配置seata

在src/main/resource中,新增registry.conf、file.conf文件,内容可拷贝seata-server-0.7.1中的配置文件子。

在registry.conf中registry.type使用file:

Seata实现2PC事务控制
在file.conf中更改service.vgroup_mapping.[springcloud服务名]-fescar-service-group = “default”,并修改service.default.grouplist =[seata服务端地址]
Seata实现2PC事务控制
关于vgroup_mapping的配置:

vgroup_mapping.事务分组服务名=Seata Server集群名称(默认名称为default)

default.grouplist = Seata Server集群地址

在 org.springframework.cloud:spring-cloud-starter-alibaba-seata的org.springframework.cloud.alibaba.seata.GlobalTransactionAutoConfiguration类中,默认会使用 ${spring.application.name}-fescar-service-group作为事务分组服务名注册到 Seata Server上,如果和file.conf中的配置不一致,会提示 no available server to connect错误

也可以通过配置 spring.cloud.alibaba.seata.tx-service-group修改后缀,但是必须和file.conf中的配置保持一致。

dtx-seata-demo-bank1

dtx-seata-demo-bank1实现如下功能:

1、张三账户减少金额,开启全局事务。

2、远程调用bank2向李四转账。

(1)DAO

@Mapper
@Component
public interface AccountInfoDao {

    //更新账户金额
    @Update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

}

(2)FeignClient

远程调用bank2的客户端

@FeignClient(value = "seata-demo-bank2",fallback = Bank2ClientFallback.class)
public interface Bank2Client {

    @GetMapping("/bank2/transfer")
    String transfer(@RequestParam("amount") Double amount);
}

@Component
public class Bank2ClientFallback implements Bank2Client{
    @Override
    public String transfer(Double amount) {

        return "fallback";
    }
}

(3)Service

@Service
public class AccountInfoServiceImpl implements AccountInfoService {

    private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);

    @Autowired
    AccountInfoDao accountInfoDao;

    @Autowired
    Bank2Client bank2Client;

	//张三转账
    @Override
    @GlobalTransactional
    @Transactional
    public void updateAccountBalance(String accountNo, Double amount) {
        logger.info("******** Bank1 Service Begin ... xid: {}" , RootContext.getXID());
        //张三扣减金额
        accountInfoDao.updateAccountBalance(accountNo,amount*-1);
        //向李四转账
        String remoteRst = bank2Client.transfer(amount);
        //远程调用失败
        if(remoteRst.equals("fallback")){
            throw new RuntimeException("bank1 下游服务异常");
        }
        //人为制造错误
        if(amount==3){
            throw new RuntimeException("bank1 make exception  3");
        }
    }
}

将@GlobalTransactional注解标注在全局事务发起的Service实现方法上,开启全局事务:

GlobalTransactionalInterceptor会拦截@GlobalTransactional注解的方法,生成全局事务ID(XID),XID会在整个分布式事务中传递。

在远程调用时,spring-cloud-alibaba-seata会拦截Feign调用将XID传递到下游服务。

(6)Controller

@RestController
public class Bank1Controller {

    @Autowired
    AccountInfoService accountInfoService;

    //转账
    @GetMapping("/transfer")
    public String transfer(Double amount){
        accountInfoService.updateAccountBalance("1",amount);
        return "bank1"+amount;
    }
}

dtx-seata-demo-bank2

dtx-seata-demo-bank2实现如下功能:

1、李四账户增加金额。

dtx-seata-demo-bank2在本账号事务中作为分支事务不使用@GlobalTransactional。

(1)DAO

@Mapper
@Component
public interface AccountInfoDao {
    //向李四转账
    @Update("UPDATE account_info SET account_balance = account_balance + #{amount} WHERE account_no = #{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

}

(2)Service

@Service
public class AccountInfoServiceImpl implements AccountInfoService {

    private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);

    @Autowired
    AccountInfoDao accountInfoDao;

    @Override
    @Transactional
    public void updateAccountBalance(String accountNo, Double amount) {
        logger.info("******** Bank2 Service Begin ... xid: {}" , RootContext.getXID());
        //李四增加金额
        accountInfoDao.updateAccountBalance(accountNo,amount);
        //制造异常
        if(amount==2){
            throw new RuntimeException("bank1 make exception  2");
        }
    }
}

(3)Controller

@RestController
public class Bank2Controller {

    @Autowired
    AccountInfoService accountInfoService;

    @GetMapping("/transfer")
    public String transfer(Double amount){
        accountInfoService.updateAccountBalance("2",amount);
        return "bank2"+amount;
    }
}

测试

  • 张三向李四转账成功。
  • 李四事务失败,张三事务回滚成功。
  • 张三事务失败,李四事务回滚成功。
  • 分支事务超时测试,事务回滚。
上一篇:Seata简介(附带与其他分布式事务解决方案对比)


下一篇:分布式事务之解决方案(XA和2PC)