分布式事务JTA实现Atomikos与Spring集成实践

  Atomikos官网无法访问,不过Maven*库中具atomikos包。Atomikos集成Spring,Hibernate,Mybatis网上文章比较多,本文是通过JavaSE的方式借用Spring配置来测试Atomikos对JTA的实现。


 下面做一件事,就是两(+)个数据库,在一个事务里对其分别对数据库操作验证操作的原子性,即要么两个数据库的操作都成功,要么都失败。


 1.准备工作

  1.1 Maven pom.xml中添加依赖包

  atomikos:(目前最新版)

  

1
2
3
4
5
<dependency>
            <groupId>com.atomikos</groupId>
            <artifactId>transactions-jdbc</artifactId>
            <version>3.9.3</version>
        </dependency>


 jar依赖图:

 分布式事务JTA实现Atomikos与Spring集成实践

 

 Postgresql数据库驱动:

1
2
3
4
5
<dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.2-1004-jdbc4</version>
        </dependency>

 注:文中使用的是Postgresql V9.2

 

 javax.transaction.transaction-api.1.1:

 

1
2
3
4
5
6
7
8
<!-- Include in javaee-api -->
        <!-- 
        <dependency>
            <groupId>javax.transaction</groupId>
            <artifactId>transaction-api</artifactId>
            <version>1.1</version>
        </dependency>
         -->

 

 

1
2
3
4
5
6
7
<!-- JavaEE -->
        <!-- javaee-api包含了JavaEE规范中的api,如servlet-api,persistence-api, transaction-api等 -->
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
        </dependency>

 注:根据需要选择其一即可。


 Spring,Junit依赖这里省略。


 1.2 创建数据库以及表

   数据库分别是:javaee,tomdb

   分布式事务JTA实现Atomikos与Spring集成实践


 2.在项目中添加配置文件

  spring-jta.xml 和transaction.properties(模版见文中附件)文件,spring-jta.xml在src/main/resources/integration下,transaction.properties在src/main/resources/下。


 2.1在spring-jta.xml中配置两个XADataSource:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<!-- 使用分布式事务时设置Postgresql的max_prepared_transactions为大于0的值,该值默认是0 -->
    <!-- 数据库A -->
    <bean id="a" class="com.atomikos.jdbc.AtomikosDataSourceBean"
        init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="pg/a" />
        <property name="xaDataSourceClassName" value="org.postgresql.xa.PGXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="user">postgres</prop>
                <prop key="password">postgres</prop>
                <prop key="serverName">localhost</prop>
                <prop key="portNumber">5432</prop>
                <prop key="databaseName">tomdb</prop>
            </props>
        </property>
        <property name="poolSize" value="10" />
        <property name="reapTimeout" value="20000" />
    </bean>
    <bean id="b" class="com.atomikos.jdbc.AtomikosDataSourceBean"
        init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="pg/b" />
        <property name="xaDataSourceClassName" value="org.postgresql.xa.PGXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="user">postgres</prop>
                <prop key="password">postgres</prop>
                <prop key="serverName">localhost</prop>
                <prop key="portNumber">5432</prop>
                <prop key="databaseName">javaee</prop>
            </props>
        </property>
        <property name="poolSize" value="10" />
        <property name="reapTimeout" value="20000" />
    </bean>


 说明:

 Postgresql的max_prepared_transactions参数值默认是0,要开启XA需要设置该值至少和max_connections参数值一样大,该参数在PostgreSQL\9.3\data\postgresql.conf文件中。

 PGXADataSource的父类BaseDataSource没有url属性,可需要分别设置serverName,portNumber,databaseName等属性。不同的数据库驱动有不同的实现方法。


 2.2 配置事务管理对象和UserTransaction接口实现

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!-- atomikos事务管理 -->
    <bean id="atomikosUserTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"  init-method="init" destroy-method="close">
        <description>UserTransactionManager</description>
        <property name="forceShutdown" value="true" />
    </bean>
 
    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="300" />
    </bean>
 
    <bean id="transactionManager"
        class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="atomikosUserTransactionManager"></property>
    </bean>

 

 上面三个Bean可以独立使用来进行事务控制,具体看下面3。


3. 编写测试

 3.1 使用atomikosUserTransactionManager对象测试(TestAtomikos1.java)

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package secondriver.springsubway.example.jta;
 
import java.sql.Connection;
import java.sql.SQLException;
 
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
 
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
 
public class TestAtomikos1 {
 
    public static ApplicationContext ctx;
 
    @BeforeClass
    public static void beforeClass() {
        ctx = new ClassPathXmlApplicationContext(
                "classpath:integration/spring-jta.xml");
    }
 
    public static void afterClass() {
        ctx = null;
    }
 
    @Test
    public void test1() {
        exe("abc""abc");
    }
 
    @Test
    public void test2() {
        exe("123=""123");
    }
 
    public void exe(String av, String bv) {
 
        AtomikosDataSourceBean adsA = (AtomikosDataSourceBean) ctx.getBean("a");
        AtomikosDataSourceBean adsB = (AtomikosDataSourceBean) ctx.getBean("b");
        Connection connA;
        Connection connB;
        UserTransactionManager utm = (UserTransactionManager) ctx
                .getBean("atomikosUserTransactionManager");
        try {
            utm.begin();
            connA = adsA.getConnection();
            connB = adsB.getConnection();
            connA.prepareStatement(
                    "insert into jta_temp (value) values('" + av + "')")
                    .execute();
            connB.prepareStatement(
                    "insert into jta_temp (value) values('" + bv + "')")
                    .execute();
            utm.commit();
        catch (SQLException | NotSupportedException | SystemException
                | SecurityException | IllegalStateException | RollbackException
                | HeuristicMixedException | HeuristicRollbackException e) {
            e.printStackTrace();
            try {
                utm.rollback();
            catch (IllegalStateException | SecurityException
                    | SystemException e1) {
                e1.printStackTrace();
            }
        }
    }
}


3.2使用Spring的JtaUserTransactionManager对象测试(TestAtomikos2.java 修改TestAtomikos1.java中的exe方法即可)

 

1
2
3
4
5
6
7
8
9
@Test
    public void test1() {
        exe("abc""abc");
    }
 
    @Test
    public void test2() {
        exe("123=""123");
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void exe(String av, String bv) {
        TransactionFactory txm = (TransactionFactory) ctx
                .getBean("transactionManager");
 
        JdbcTemplate a = (JdbcTemplate) ctx.getBean("jdbcTemplateA");
        JdbcTemplate b = (JdbcTemplate) ctx.getBean("jdbcTemplateB");
        Transaction tx =null;
        try {
         tx = txm.createTransaction("tx-name-define"10000);
            a.update("insert into jta_temp (value) values('" + av + "')");
            b.update("insert into jta_temp (value) values('" + bv + "')");
            tx.commit();
        catch (NotSupportedException | SystemException | SecurityException
                | RollbackException | HeuristicMixedException
                | HeuristicRollbackException e) {
            e.printStackTrace();
            if(tx!=null){
                try {
                    tx.rollback();
                catch (IllegalStateException | SystemException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

 

3.3使用atomikosUserTransaction Bean对象进行测试(TestAtomikos3.java 修改TestAtomikos1.java中的exe方法即可)

1
2
3
4
5
6
7
8
9
@Test
    public void test1() {
        exe("abc""abc");
    }
 
    @Test
    public void test2() {
        exe("123""123=");
    }


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void exe(String av, String bv) {
 
        AtomikosDataSourceBean adsA = (AtomikosDataSourceBean) ctx.getBean("a");
        AtomikosDataSourceBean adsB = (AtomikosDataSourceBean) ctx.getBean("b");
        Connection connA;
        Connection connB;
        UserTransaction utx = (UserTransaction) ctx
                .getBean("atomikosUserTransaction");
        try {
            utx.begin();
            connA = adsA.getConnection();
            connB = adsB.getConnection();
            connA.prepareStatement(
                    "insert into jta_temp (value) values('" + av + "')")
                    .execute();
            connB.prepareStatement(
                    "insert into jta_temp (value) values('" + bv + "')")
                    .execute();
            utx.commit();
        catch (SQLException | NotSupportedException | SystemException
                | SecurityException | IllegalStateException | RollbackException
                | HeuristicMixedException | HeuristicRollbackException e) {
            e.printStackTrace();
            try {
                utx.rollback();
            catch (IllegalStateException | SecurityException
                    | SystemException e1) {
                e1.printStackTrace();
            }
        }
    }


 使用上述三种UserTransaction进行测试,其中test1方法是成功执行,test2方法是执行失败的(因为插入的值长度超过的字段长度限制)。通过分析之后,如果分布式事物控制正确,那么数据库中写入的值对于两张不同的表而言,是没有数字值被写入的。如图所示:

 分布式事务JTA实现Atomikos与Spring集成实践


 在测试过程中,经过对比确实达到了分布式事务控制的效果。

 整个操作的架构图如下(摘自Atomikos Blog)

 分布式事务JTA实现Atomikos与Spring集成实践


 关于JTA原理文章开始提到的那篇文章,写的很详细和清晰,可以细细阅读和理解。



本文转自 secondriver 51CTO博客,原文链接:http://blog.51cto.com/aiilive/1658102,如需转载请自行联系原作者

上一篇:从vs2010的UnitTestFramework类库提取私有方法反射调用的方法


下一篇:顺序队列C/C++实现