sharding-sphere中的xa事务

Apache ShardingSphere 默认的 XA 事务管理器为 Atomikos。

j2ee对分布式事务定义了标准的规范,分别是JTA和JTS,JTA(Java Transaction API)是根据XA规范制定的java版本的接口规范,Atomikos就是jta的一种实现。JTA中约定了几种主要的程序角色:客户端、应用服务器、事务管理器、资源管理器。
JTA和JTS有什么关系呢?事务管理器要和资源管理器要进行事务上下文传播的交互,其中应用服务器和事务管理器之间也有传播事务上下文的交互,JTS就是定义各个程序角色之间如何传递事务上下文的规范。

JTA从框架的角度来约定实现者需要实现的接口,JTS约定了具体程序角色应该怎样去进行交互。

XA是X/Open CAE Specification (Distributed Transaction Processing)模型中定义的TM(Transaction Manager)与RM(Resource Manager)之间进行通信的接口。

在XA规范中,数据库充当RM角色,应用需要充当TM的角色,即生成全局的txId,调用XAResource接口,把多个本地事务协调为全局统一的分布式事务。

sharding-sphere中的xa事务

二阶段提交是XA的标准实现,它将分布式事务的提交拆分为2个阶段:prepare和commit/rollback。

开启XA全局事务后,所有子事务会按照本地默认的隔离级别锁定资源,并记录undo和redo日志,然后由TM发起prepare投票,询问所有的子事务是否可以进行提交。

当所有子事务反馈的结果为“yes”时,TM再发起commit;若其中任何一个子事务反馈的结果为“no”,TM则发起rollback。

如果在prepare阶段的反馈结果为yes,而commit的过程中出现宕机等异常时,则在节点服务重启后,可根据XA recover再次进行commit补偿,以保证数据的一致性。

XA recover的作用是列出所有处于PREPARE阶段的XA事务。

2PC模型中,在prepare阶段需要等待所有参与子事务的反馈,因此可能造成数据库资源锁定时间过长,不适合并发高以及子事务生命周长较长的业务场景。

Sharding-Sphere支持基于XA的强一致性事务解决方案,可以通过SPI注入不同的第三方组件作为事务管理器实现XA协议,如Atomikos和Narayana。

在sharding-sphere中默认使用了atomikos,编程时可以直接使用,例如注解方式为:

    @ShardingTransactionType(TransactionType.XA)
    @Transactional(rollbackFor = Exception.class)
    public void testTransactional() {
        User user1 = new User(1, "faith", 12);
        this.userDao.addOne(user1);
        User user2 = new User(2, "belief", 12);
        this.userDao.addOne(user2);
        this.userDao.addOne(user2); // 这里会报错,因为在分布式事务中,因此user1、user2都会插入失败
    }

如果spring项目中单独使用atomikos,需要做数据源以及事务管理器等配置,例如:

    <!-- 数据源 -->
    <bean id="abstractDatasource" abstract="true"
            class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
        <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
        <property name="poolSize" value="${datasource.poolSize}" />
        <property name="minPoolSize" value="${datasource.minPoolSize}" />
        <property name="maxPoolSize" value="${datasource.maxPoolSize}" />
        <!--获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 -->
        <property name="borrowConnectionTimeout" value="60" />
        <!--最大获取数据时间,如果不设置这个值,Atomikos使用默认的5分钟,那么在处理大批量数据读取的时候,一旦超过5分钟,就会抛出类似 Resultset 
            is close 的错误. -->
        <property name="reapTimeout" value="20" />
        <!--最大闲置时间,超过最小连接池连接的连接将将关闭 -->
        <property name="maxIdleTime" value="${datasource.maxIdleTime}" />
        <!--连接回收时间 -->
        <property name="maintenanceInterval" value="60" />
        <!--java数据库连接池,最大可等待获取datasouce的时间 -->
        <property name="loginTimeout" value="60" />
        <property name="logWriter" value="60" />
        <!-- <property name="maxLifetime" value="1800000"/> -->
        <property name="testQuery">
            <value>select 1</value>
        </property>
    </bean>

    <!-- atomikos事务管理器 -->
    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
        init-method="init" destroy-method="close">
        <description>UserTransactionManager</description>
        <property name="forceShutdown">
            <value>true</value>
        </property>
    </bean>

    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="300" />
    </bean>

    <!-- spring 事务管理器,包装了atomikos事务 -->
    <bean id="jtaTransactionManager"
        class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager">
            <ref bean="atomikosTransactionManager" />
        </property>
        <property name="userTransaction">
            <ref bean="atomikosUserTransaction" />
        </property>
    </bean>

    <!-- spring 事务模板,和spring的jta事务类似功能 -->
    <bean id="transactionTemplate"
        class="org.springframework.transaction.support.TransactionTemplate">
        <property name="transactionManager">
            <ref bean="jtaTransactionManager" />
        </property>
    </bean>
上一篇:jQuery_CSS 属性下|学习笔记


下一篇:ribbon