XA协议是X/Open组织管理的一种分布式协议规范,它采用2阶段提交来管理分布式事务,目前主流的数据库都支持xa协议。
XA模式是目前seata支持的第4种模式。
简介
seata中xa模式的运行机制如下图,这张图片来自官网:
从图中看出,XA模式的2阶段提交跟TCC模式的两阶段提交类似,都是由TM开启全局事务,RM向TC注册分支事务并且报告分支事务状态,TC根据全局事务的状态来提交或回滚分支事务。
而在代码实现上,XA模式使用的是数据源代理来实现的。跟TCC模式不同的是,seata中的TCC模式需要开发人员自己实现分支事务的提交和回滚逻辑,而XA模式是不用的,只要有TCC的prepare方法即可。
代码实现
本文的demo依然采用《springboot研究十二:springcloud+eureka整合seata-tcc模式》中的案例,整个系统有订单、账户和库存3个服务,账户服务是一个TM同时也是一个RM。系统架构图再贴一次:
整个系统的sql语句如下:
#########################seata_order库 use database seata_order; CREATE TABLE `orders` ( `id` mediumint(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) DEFAULT NULL, `product_id` int(11) DEFAULT NULL, `COUNT` int(11) DEFAULT NULL COMMENT '数量', `pay_amount` decimal(10,2) DEFAULT NULL, `status` varchar(100) DEFAULT NULL, `add_time` datetime DEFAULT CURRENT_TIMESTAMP, `last_update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 #########################seata_pay库 use database seata_pay; DROP TABLE account; CREATE TABLE `account` ( `id` BIGINT(11) NOT NULL AUTO_INCREMENT COMMENT 'id', `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id', `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度', `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额', `balance` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度', `last_update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; INSERT INTO `seata_pay`.`account` (`id`, `user_id`, `total`, `used`, `balance`) VALUES ('1', '1', '1000', '0', '100'); #########################seata_storage库 use database seata_storage; CREATE TABLE `storage` ( `id` BIGINT(11) NOT NULL AUTO_INCREMENT, `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id', `total` INT(11) DEFAULT NULL COMMENT '总库存', `used` INT(11) DEFAULT NULL COMMENT '已用库存', `residue` INT(11) DEFAULT NULL COMMENT '剩余库存', PRIMARY KEY (`id`) ) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; INSERT INTO `seata_storage`.`storage` (`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100');
可以看到,跟AT模式不一样的是,没有了undo_log表,因为XA模式是不用undo_log的。
聚合服务也就是TM在订单服务,代码如下:
@GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class) public void create(Order order) { LOGGER.info("------->交易开始"); //本地方法 orderDao.create(order); //远程方法 扣减库存 storageApi.decrease(order.getProductId(),order.getCount()); //远程方法 扣减账户余额 LOGGER.info("------->扣减账户开始order中"); accountApi.decrease(order.getUserId(),order.getPayAmount()); LOGGER.info("------->扣减账户结束order中"); LOGGER.info("------->交易结束"); }
这个全局事务会使用feign来调用账户服务和库存服务进行扣减金额和扣减库存。
在数据源代理的使用上,xa模式跟at模式是完全不同的,我们以订单服务为例来看下,账户和库存服务代码跟这个一样:
@Configuration public class DataSourceConfiguration { @Bean @ConfigurationProperties(prefix = "spring.datasource") public DruidDataSource druidDataSource() { return new DruidDataSource(); } @Bean("dataSourceProxy") public DataSource dataSource(DruidDataSource druidDataSource) { // DataSourceProxy for AT mode // return new DataSourceProxy(druidDataSource); // DataSourceProxyXA for XA mode return new DataSourceProxyXA(druidDataSource); } @Bean public SqlSessionFactory sqlSessionFactory(DataSource dataSourceProxy)throws Exception{ SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSourceProxy); sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver() .getResources("classpath*:/mapper/*.xml")); sqlSessionFactoryBean.setTypeAliasesPackage("io.seata.sample.entity"); sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory()); return sqlSessionFactoryBean.getObject(); } }
这里我们看到seata为xa提供了连接代理类DataSourceProxyXA,有一点必须注意,DataSourceProxyXA构造函数的入参必须是DruidDataSource,我试了HikariCPDatasource会报错。
测试
测试跟之前一样,请求报文如下:
{ "userId":1, "productId":1, "count":1, "money":1, "payAmount":50 }
请求url如下:
http://localhost:8180/order/create
测试失败的场景,AccountServiceImpl类里面注释掉的代码放开就可以模拟超时事务回滚。
public void decrease(Long userId, BigDecimal payAmount) { LOGGER.info("------->扣减账户开始account中"); //模拟超时异常,全局事务回滚 /*try { Thread.sleep(30*1000); } catch (InterruptedException e) { e.printStackTrace(); }*/ accountDao.decrease(userId,payAmount); LOGGER.info("------->扣减账户结束account中"); }
总结
seata中xa模式的机制跟TCC相类似,都是2阶段提交,而代码实现跟AT模式很像,都是通过数据源代理来实现的。跟TCC模式相比,开发人员可以不用关注分支事务提交和回滚的代码编写,seata框架已经帮我们做了。
源码地址:
https://github.com/jinjunzhu/springcloud-eureka-seata-xa.git