springcloud+eureka整合阿里seata-xa模式

XA协议是X/Open组织管理的一种分布式协议规范,它采用2阶段提交来管理分布式事务,目前主流的数据库都支持xa协议。

XA模式是目前seata支持的第4种模式。

简介

seata中xa模式的运行机制如下图,这张图片来自官网:

springcloud+eureka整合阿里seata-xa模式

从图中看出,XA模式的2阶段提交跟TCC模式的两阶段提交类似,都是由TM开启全局事务,RM向TC注册分支事务并且报告分支事务状态,TC根据全局事务的状态来提交或回滚分支事务。

而在代码实现上,XA模式使用的是数据源代理来实现的。跟TCC模式不同的是,seata中的TCC模式需要开发人员自己实现分支事务的提交和回滚逻辑,而XA模式是不用的,只要有TCC的prepare方法即可。

代码实现 

本文的demo依然采用《springboot研究十二:springcloud+eureka整合seata-tcc模式》中的案例,整个系统有订单、账户和库存3个服务,账户服务是一个TM同时也是一个RM。系统架构图再贴一次:

springcloud+eureka整合阿里seata-xa模式

整个系统的sql语句如下:

#########################seata_order库
use database seata_order;
CREATE TABLE `orders` (
  `id` mediumint(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `product_id` int(11) DEFAULT NULL,
  `COUNT` int(11) DEFAULT NULL COMMENT '数量',
  `pay_amount` decimal(10,2) DEFAULT NULL,
  `status` varchar(100) DEFAULT NULL,
  `add_time` datetime DEFAULT CURRENT_TIMESTAMP,
  `last_update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8

#########################seata_pay库
use database seata_pay;
DROP TABLE account;
CREATE TABLE `account` (
  `id` BIGINT(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
  `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
  `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
  `balance` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度',
  `last_update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO `seata_pay`.`account` (`id`, `user_id`, `total`, `used`, `balance`) VALUES ('1', '1', '1000', '0', '100');

#########################seata_storage库
use database seata_storage;
CREATE TABLE `storage` (
  `id` BIGINT(11) NOT NULL AUTO_INCREMENT,
  `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
  `total` INT(11) DEFAULT NULL COMMENT '总库存',
  `used` INT(11) DEFAULT NULL COMMENT '已用库存',
  `residue` INT(11) DEFAULT NULL COMMENT '剩余库存',
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO `seata_storage`.`storage` (`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100');

可以看到,跟AT模式不一样的是,没有了undo_log表,因为XA模式是不用undo_log的。

聚合服务也就是TM在订单服务,代码如下:

@GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)
public void create(Order order) {
    LOGGER.info("------->交易开始");
    //本地方法
    orderDao.create(order);

    //远程方法 扣减库存
    storageApi.decrease(order.getProductId(),order.getCount());

    //远程方法 扣减账户余额

    LOGGER.info("------->扣减账户开始order中");
    accountApi.decrease(order.getUserId(),order.getPayAmount());
    LOGGER.info("------->扣减账户结束order中");

    LOGGER.info("------->交易结束");
}

这个全局事务会使用feign来调用账户服务和库存服务进行扣减金额和扣减库存。

在数据源代理的使用上,xa模式跟at模式是完全不同的,我们以订单服务为例来看下,账户和库存服务代码跟这个一样:

@Configuration
public class DataSourceConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        // DataSourceProxy for AT mode
        // return new DataSourceProxy(druidDataSource);

        // DataSourceProxyXA for XA mode
        return new DataSourceProxyXA(druidDataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dataSourceProxy)throws Exception{
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:/mapper/*.xml"));
        sqlSessionFactoryBean.setTypeAliasesPackage("io.seata.sample.entity");
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }
}

这里我们看到seata为xa提供了连接代理类DataSourceProxyXA,有一点必须注意,DataSourceProxyXA构造函数的入参必须是DruidDataSource,我试了HikariCPDatasource会报错。

测试

 测试跟之前一样,请求报文如下:

{
  "userId":1,
  "productId":1,
  "count":1,
  "money":1,
  "payAmount":50
}

请求url如下:

http://localhost:8180/order/create

测试失败的场景,AccountServiceImpl类里面注释掉的代码放开就可以模拟超时事务回滚。

public void decrease(Long userId, BigDecimal payAmount) {
    LOGGER.info("------->扣减账户开始account中");
    //模拟超时异常,全局事务回滚
    /*try {
        Thread.sleep(30*1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }*/
    accountDao.decrease(userId,payAmount);
    LOGGER.info("------->扣减账户结束account中");
}

总结 

seata中xa模式的机制跟TCC相类似,都是2阶段提交,而代码实现跟AT模式很像,都是通过数据源代理来实现的。跟TCC模式相比,开发人员可以不用关注分支事务提交和回滚的代码编写,seata框架已经帮我们做了。

源码地址:

https://github.com/jinjunzhu/springcloud-eureka-seata-xa.git


上一篇:分布式事务及seata基本使用


下一篇:分布式事务