Spring Boot2中整合atomikos来实现不同类型数据库的分布式事务一致性

由于需要重构一个老的系统(Oracle),业务侧要求老系统和新系统(Mysql)并行运行半年,证明重构系统的稳定性才能上线,在这半年期间,新系统用来查询,全文检索,图数据库查询,老系依然办理业务,因此就存在在一个事务提交中,同时写Mysql和Oracle,比较了一下方案,最终选择了atomikos来做分布式事务。先说缺点:
1、性能比原来的单纯的德鲁伊连接池慢。
2、卡,A库没提交会导致B库也卡主,体验非常不好。
3、德鲁伊的连接池驱动jar的版本,需要和对应的数据库特定版本的驱动保持一致,否则有些方法在德鲁伊连接池都还没实现。
4、扩展性不好,如果再需要一个BI的库做分析,那么又得要CDC方案从A库同步业务数据到B库,这个数据同步的实时性达不到业务要求。

先上代码,把工程能跑起来先:
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>learn-jta-atomikos-SpringBoot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>learn-jta-atomikos</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- MyBatis -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>spring-boot-starter-logging</artifactId>
                    <groupId>org.springframework.boot</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 热部署 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- 数据库连接 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--<scope>runtime</scope>-->
            <version>8.0.11</version>    <!--分布式事物的驱动,对版本有要求的,不同的数据库,还不一样-->
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.20</version>
        </dependency>
        <!-- 分布式事务atomikos -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
        <!-- tx + aop -->
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.9.4</version>
        </dependency>
        <!-- 添加Log4j2 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <!-- 为log4j2添加异步支持 -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>
        <!-- 简化代码 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 用于监控与管理 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-boot-starter-logging</artifactId>
                    <groupId>org.springframework.boot</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- WEB -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 配合@ConfigurationProperties编译生成元数据文件(IDEA编辑器的属性提示) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- 测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.yml

spring:
  application:
    name: learn-jta-atomikos
  aop:
    proxy-target-class: true

  ## jta相关参数配置
  # 如果你在JTA环境中,并且仍然希望使用本地事务,你可以设置spring.jta.enabled属性为false以禁用JTA自动配置。
  jta:
    enabled:true
    # 必须配置唯一的资源名
  mysql:
    #db1(分布式的第一个库)
    test1:
      url: jdbc:mysql://localhost:3306/db1?useUnicode=true&characterEncoding=utf-8
      username: root
      password: xxxx
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
    #db2(分布式的第二个库)
    test2:
      url: jdbc:mysql://localhost:3306/db2?useUnicode=true&characterEncoding=utf-8
      username: root
      password: xxxxx
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
  ## Druid监控设置
  datasource:
    druid:
      #web-stat-filter.exclusions: *.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*
      stat-view-servlet.url-pattern: /druid/*
      stat-view-servlet.reset-enable: true
      stat-view-servlet.login-username: admin
      stat-view-servlet.login-password: admin
      aop-patterns: com.example.atomikos.service.*
# 开启下划线
mybatis:
  configuration:
    map-underscore-to-camel-case: true

Application.java

package com.atomikos;

import com.atomikos.config.pojo.DBConfig1;
import com.atomikos.config.pojo.DBConfig2;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;


/**
 * 将来这个类也是可以被Junit集成起来进行测试的
 */
@SpringBootApplication
@MapperScan("com.atomikos")            //其他项目中,这个是可以不用的,可是在则个分布式的新项目中,这个扫描Mmapper类是必须的,标记了mapper还是找不到,只好把这里打开
@EnableConfigurationProperties(value={DBConfig1.class,DBConfig2.class})      //值对象必须加,否则后续扫描不到这个类
public class Application {

    public static void main(String[] args) {

        SpringApplication.run(Application.class,args);
    }
}

MyBatisConfig1.java

package com.atomikos.config;

import java.sql.SQLException;
import javax.sql.DataSource;

import com.atomikos.config.pojo.DBConfig1;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.springframework.context.annotation.Primary;


@Configuration
// basePackages 最好分开配置 如果放在同一个文件夹可能会报错
@MapperScan(basePackages = "com.atomikos.dao.db1", sqlSessionTemplateRef = "testSqlSessionTemplate")
public class MyBatisConfig1 {

    @Primary                    //这个primary必须加,否则spring在两个sessionfactory的时候,不知道用哪个?
    // 配置数据源
    @Bean(name = "testDataSource")
    public DataSource testDataSource(DBConfig1 testConfig) throws SQLException {
        // 这里直接针对mysql的分布式驱动,进行硬编码了
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(testConfig.getUrl());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        mysqlXaDataSource.setPassword(testConfig.getPassword());
        mysqlXaDataSource.setUser(testConfig.getUsername());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        // 将本地事务注册到创 Atomikos全局事务
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("testDataSource");    //硬编码,这里也是可以考虑落到配置文件中的

        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    @Primary
    @Bean(name = "testSqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("testDataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Primary
    @Bean(name = "testSqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("testSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

MyBatisConfig2.java

package com.atomikos.config;

import java.sql.SQLException;
import javax.sql.DataSource;

import com.atomikos.config.pojo.DBConfig2;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.atomikos.jdbc.AtomikosDataSourceBean;


@Configuration
@MapperScan(basePackages = "com.atomikos.dao.db2", sqlSessionTemplateRef = "test2SqlSessionTemplate")
public class MyBatisConfig2 {

    // 配置数据源
    @Bean(name = "test2DataSource")
    public DataSource testDataSource(DBConfig2 testConfig) throws SQLException {
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(testConfig.getUrl());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        mysqlXaDataSource.setPassword(testConfig.getPassword());
        mysqlXaDataSource.setUser(testConfig.getUsername());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("test2DataSource");

        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    @Bean(name = "test2SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("test2DataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Bean(name = "test2SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("test2SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

TransactionConfig.java

package com.atomikos.config;

import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.interceptor.*;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * 配置声明式事务 切面拦截(本次演示中,没啥用)
 *
 * @author yehaibo
 */
@Configuration
public class TransactionConfig {

    private static final int TX_METHOD_TIMEOUT = 5;
    private static final String AOP_POINTCUT_EXPRESSION = "execution (* com.atomikos.service.*.*(..))";

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Bean
    public TransactionInterceptor txAdvice() {
        NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource();

        /* 只读事务,不做更新操作 */
        RuleBasedTransactionAttribute readOnlyTx = new RuleBasedTransactionAttribute();
        readOnlyTx.setReadOnly(true);
        readOnlyTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);

        /* 当前存在事务就使用当前事务,当前不存在事务就创建一个新的事务 */
        RuleBasedTransactionAttribute requiredTx = new RuleBasedTransactionAttribute();
        requiredTx.setRollbackRules(Collections.singletonList(new RollbackRuleAttribute(Exception.class)));
        requiredTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        requiredTx.setTimeout(TX_METHOD_TIMEOUT);
        Map<String, TransactionAttribute> txMap = new HashMap<>(10);

        txMap.put("add*", requiredTx);
        txMap.put("save*", requiredTx);
        txMap.put("insert*", requiredTx);
        txMap.put("update*", requiredTx);
        txMap.put("delete*", requiredTx);

        txMap.put("get*", readOnlyTx);
        txMap.put("query*", readOnlyTx);
        txMap.put("list*", readOnlyTx);
        txMap.put("find*", readOnlyTx);
        source.setNameMap(txMap);
        return new TransactionInterceptor(transactionManager, source);
    }

    /**
     * 切点
     *
     * @return
     */
    @Bean
    public Advisor txAdviceAdvisor() {
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        pointcut.setExpression(AOP_POINTCUT_EXPRESSION);
        return new DefaultPointcutAdvisor(pointcut, txAdvice());
    }

}

DBConfig1.java

package com.atomikos.config.pojo;


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
/**
 * 将application.properties配置文件中配置自动封装到实体类字段中
 * @author yehaibo
 */
@ConfigurationProperties(prefix = "spring.mysql.test1") // 注意这个前缀要和application.yml文件的前缀一样
public class DBConfig1 {

    private String url;
    // 比如这个url在properties中是这样子的mysql.datasource.test1.username = root
    private String username;

    public String getUrl() {        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getMinPoolSize() {
        return minPoolSize;
    }

    public void setMinPoolSize(int minPoolSize) {
        this.minPoolSize = minPoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getMaxLifetime() {
        return maxLifetime;
    }

    public void setMaxLifetime(int maxLifetime) {
        this.maxLifetime = maxLifetime;
    }

    public int getBorrowConnectionTimeout() {
        return borrowConnectionTimeout;
    }

    public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
        this.borrowConnectionTimeout = borrowConnectionTimeout;
    }

    public int getLoginTimeout() {
        return loginTimeout;
    }

    public void setLoginTimeout(int loginTimeout) {
        this.loginTimeout = loginTimeout;
    }

    public int getMaintenanceInterval() {
        return maintenanceInterval;
    }

    public void setMaintenanceInterval(int maintenanceInterval) {
        this.maintenanceInterval = maintenanceInterval;
    }

    public int getMaxIdleTime() {
        return maxIdleTime;
    }

    public void setMaxIdleTime(int maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
    }

    public String getTestQuery() {
        return testQuery;
    }

    public void setTestQuery(String testQuery) {
        this.testQuery = testQuery;
    }

    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;
}

DBConfig2.java

package com.atomikos.config.pojo;

import org.springframework.boot.context.properties.ConfigurationProperties;

import lombok.Data;

@Data
/**
 * 将application.properties配置文件中配置自动封装到实体类字段中
 * @author yehaibo
 */
@ConfigurationProperties(prefix = "spring.mysql.test2")// 注意这个前缀要和application.yml文件的前缀一样
public class DBConfig2 {

    private String url;
    // 比如这个url在properties中是这样子的mysql.datasource.test1.username = root
    private String username;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getMinPoolSize() {
        return minPoolSize;
    }

    public void setMinPoolSize(int minPoolSize) {
        this.minPoolSize = minPoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getMaxLifetime() {
        return maxLifetime;
    }

    public void setMaxLifetime(int maxLifetime) {
        this.maxLifetime = maxLifetime;
    }

    public int getBorrowConnectionTimeout() {
        return borrowConnectionTimeout;
    }

    public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
        this.borrowConnectionTimeout = borrowConnectionTimeout;
    }

    public int getLoginTimeout() {
        return loginTimeout;
    }

    public void setLoginTimeout(int loginTimeout) {
        this.loginTimeout = loginTimeout;
    }

    public int getMaintenanceInterval() {
        return maintenanceInterval;
    }

    public void setMaintenanceInterval(int maintenanceInterval) {
        this.maintenanceInterval = maintenanceInterval;
    }

    public int getMaxIdleTime() {
        return maxIdleTime;
    }

    public void setMaxIdleTime(int maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
    }

    public String getTestQuery() {
        return testQuery;
    }

    public void setTestQuery(String testQuery) {
        this.testQuery = testQuery;
    }

    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;
}

TractionController.java

package com.atomikos.controller;

import com.atomikos.entity.BookDO;
import com.atomikos.entity.BookVo;
import com.atomikos.service.BookService;
import com.atomikos.service.impl.BookServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * 页面对接的controll类
 * @author yehaibo
 */
@RestController
@RequestMapping("/books")
public class TractionController {

    @Autowired
    private BookService bookService;

    @GetMapping
    public List<BookDO> list(
            @RequestParam(defaultValue = "1") Integer page,
            @RequestParam(defaultValue = "10") Integer size) {
        return bookService.list(page, size);
    }

    @GetMapping("/{id}")
    public BookDO get(@PathVariable Long id) {
        return bookService.get(id);
    }

    @PostMapping
    public BookDO save(@RequestBody BookVo book) {
        return bookService.save(book, book.getUser());
    }

    @PutMapping
    public BookDO update(@RequestBody BookVo book) {
        return ((BookServiceImpl) bookService).update(book, book.getUser());
    }

    @DeleteMapping("/{id}")
    public int delete(@PathVariable Long id) {
        return ((BookServiceImpl) bookService).delete(id);
    }

}

UserMapper.java

package com.atomikos.dao.db1;

import com.atomikos.entity.UserDO;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
 * mybatic有两种写法,这里是mapper的写法,不是dao的写法
 */
@Mapper
@Repository
public interface UserMapper {

    /**
     * 根据主键查询一条记录
     *
     * @param id
     * @return
     */
    @Select("select id, username, password from user where id = #{id}")
    UserDO get(Long id);

    /**
     * 分页列表查询
     *
     * @param page
     * @param size
     * @return
     */
    @Select("select id, username, password from user limit #{page}, #{size}")
    List<UserDO> list(Integer page, Integer size);

    /**
     * 保存
     *
     * @param userDO
     * @return 自增主键
     */
    @Insert("insert into user(username, password) values(#{username}, #{password})")
    @Options(useGeneratedKeys = true, keyColumn = "id")
    int save(UserDO userDO);

    /**
     * 修改一条记录
     *
     * @param user
     * @return
     */
    @Update("update user set username = #{username}, password = #{password} where id = #{id}")
    int update(UserDO user);

    /**
     * 删除一条记录
     *
     * @param id 主键
     * @return
     */
    @Delete("delete from user where id = #{id}")
    int delete(Long id);
}

BookMapper.java

package com.atomikos.dao.db2;

import com.atomikos.entity.BookDO;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
 * @author yehaibo
 * @date 2019/9/11
 */
@Mapper
@Repository
public interface BookMapper {

    /**
     * 分页查询
     *
     * @param page 页码
     * @param size 每页记录数
     * @return
     */
    @Select("select id, name, article_id as articleId, user_id as userId from book limit ${page}, ${size}")
    List<BookDO> list(@Param("page") Integer page, @Param("size") Integer size);

    /**
     * 根据主键查询单条记录
     *
     * @param id
     * @return
     */
    @Select("select id, name, article_id as articleId, user_id as userId from book where id = #{id}")
    BookDO get(Long id);

    /**
     * 添加一条记录
     *
     * @param book
     * @return 自增主键
     */
    @Insert("insert into book(name, article_id, user_id) values(#{name}, #{articleId}, #{userId})")
    @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
    int save(BookDO book);

    /**
     * 修改一条记录
     *
     * @param book
     * @return
     */
    @Update("update book set name = #{name}, article_id = #{articleId}, user_id = #{userId} where id = #{id}")
    int update(BookDO book);

    /**
     * 删除一条记录
     *
     * @param id 主键
     * @return
     */
    @Delete("delete from book where id = #{id}")
    int delete(Long id);
}

ArticleDO.java

package com.atomikos.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * 文章
 *
 * @author yehaibo
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ArticleDO implements Serializable {

    private static final long serialVersionUID = 3971756585655871603L;

    private Long id;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    private String title;

    private String content;

    private String url;

}

BookDO.java

package com.atomikos.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * 书
 *
 * @author yehaibo
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class BookDO implements Serializable {

    private static final long serialVersionUID = 3231762613546697469L;

    private Long id;
//
//    public BookDO(Long BookId, String Name, Long ArticleId, Long UserId) {
//        this.id = BookId;
//        this.name = Name;
//        this.articleId = ArticleId;
//        this.userId = UserId;
//    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getArticleId() {
        return articleId;
    }

    public void setArticleId(Long articleId) {
        this.articleId = articleId;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    private String name;

    private Long articleId;

    private Long userId;

}

BookVo.java

package com.atomikos.entity;

import lombok.Data;

/**
 * POJO的值对象
 */
@Data
public class BookVo extends BookDO {

    public UserDO getUser() {
        return user;
    }

    public void setUser(UserDO user) {
        this.user = user;
    }

    private UserDO user;
}

UserDO.java

package com.atomikos.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * 用户
 *
 * @author fengxuechao
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserDO implements Serializable {

    private static final long serialVersionUID = 469663920369239035L;

    private Long id;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    private String username;

    private String password;
}

BookService.java

package com.atomikos.service;


import com.atomikos.entity.BookDO;
import com.atomikos.entity.UserDO;

import java.util.List;

/**
 * 主要目的是测试分布式事务
 *
 * @author yehaibo
 */
public interface BookService {

    /**
     * 保存
     *
     * @param book
     * @param user
     * @return
     */
    BookDO save(BookDO book, UserDO user);

    /**
     * 单条查询
     *
     * @param id
     * @return
     */
    BookDO get(Long id);

    /**
     * 分页查询
     *
     * @param page
     * @param size
     * @return
     */
    List<BookDO> list(Integer page, Integer size);

    BookDO update(BookDO book, UserDO user);

}

BookServiceImpl.java

package com.atomikos.service.impl;


import com.atomikos.dao.db1.UserMapper;
import com.atomikos.dao.db2.BookMapper;
import com.atomikos.entity.BookDO;
import com.atomikos.entity.UserDO;
import com.atomikos.service.BookService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

/**
 * @author yehaibo
 */
@Service("yehaibo")
public class BookServiceImpl implements BookService {

    @Autowired    //主要是唯一(例如类型唯一匹配上了,也是可以注入的)
    private BookMapper bookMapper;

    @Autowired
    private UserMapper userMapper;

    /**
     * 保存书本和文章, 使用声明式事务(tx+aop形式)
     *
     * @param book {@link BookDO}
     * @param user {@link UserDO}
     * @return
     */
    @Override
    public BookDO save(BookDO book, UserDO user) {
        int userSave = userMapper.save(user);
        if (userSave == 0) {
            return null;
        }
        book.setUserId(user.getId());
        int bookSave = bookMapper.save(book);
        if (bookSave == 0) {
            return null;
        }
//        throw new RuntimeException("测试分布式事务(tx+aop形式)");
        return book;
    }

    /**
     * 单条查询
     *
     * @param id
     * @return
     */
    @Override
    public BookDO get(Long id) {
        BookDO book = bookMapper.get(id);
        UserDO user = userMapper.get(book.getUserId());
        //?????
        //BookDO(book.getId(), book.getName(), book.getArticleId(), user.getId())
        return book;
    }

    /**
     * 分页查询
     *
     * @param page
     * @param size
     * @return
     */
    @Override
    public List<BookDO> list(Integer page, Integer size) {
        page = (page < 1 ? 0 : page - 1) * size;
        return bookMapper.list(page, size);
    }

    /**
     * 修改书本和文章, 使用声明式事务(注解形式)
     *
     * @param book
     * @param user
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public BookDO update(BookDO book, UserDO user) {
        int bookUpdate = bookMapper.update(book);
        if (bookUpdate != 1) {
            return null;
        }
        int userUpdate = userMapper.update(user);
        if (userUpdate != 1) {
            return null;
        }
        throw new RuntimeException("测试分布式事务(注解形式)");
//        return book;
    }

    /**
     * 删除书本和文章
     *
     * @param id
     * @return
     */
    public int delete(Long id) {
        BookDO book = bookMapper.get(id);
        System.err.println(book);
        if (book == null) {
            throw new RuntimeException("没有可以删除的书本");
        }
        Long userId = book.getUserId();
        int userDelete = userMapper.delete(userId);
        if (userDelete != 1) {
            return 0;
        }
        int bookDelete = bookMapper.delete(id);
        if (bookDelete != 1) {
            return 0;
        }
        throw new RuntimeException("测试没有添加分布式事务管理)");
//        return 1;
    }

}

BookServiceImplTest.java

package com;

import com.atomikos.entity.BookDO;
import com.atomikos.entity.UserDO;
import com.atomikos.service.BookService;
import com.atomikos.service.impl.BookServiceImpl;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * 从service层发起的测试分布式事务:切面拦截形式, 注解式
 * Junit会自己启动springboot的框架进行测试的
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = com.atomikos.Application.class)            //这个必须要加的,否则Junit无法启动spring框架进行测试
public class BookServiceImplTest {

    @Autowired
    @Qualifier("yehaibo")
    BookService bookService;

    /**
     * 测试分布式事务(切面拦截形式)
     */
    @Test
    public void save() {
        BookDO book = new BookDO();
        book.setName("测试封面名称 - 001");
        book.setArticleId(69L);

        UserDO user = new UserDO();
        user.setUsername("用户名 - 001");
        user.setPassword("密码 - 001");
        BookDO bookDO = bookService.save(book, user);
        System.out.println(bookDO);
    }

    /**
     * 测试分布式事务(注解式)
     */
    @Test
    public void update() {
        BookDO book = new BookDO();
        book.setId(10L);
        book.setName("测试封面名称 - 002");
        book.setArticleId(69L);

        UserDO user = new UserDO();
        user.setId(18L);
        user.setUsername("月用户名 - 002");
        user.setPassword("密码 - 002");

        //((BookServiceImpl)bookService).update(book, user);  //已经明确指定了,不需要这样转换了
        bookService.update(book, user);
    }

    /**
     * 没有事务管理
     */
    @Test
    public void delete() {
        int delete = ((BookServiceImpl) bookService).delete(12L);
        Assert.assertEquals(1, delete);
    }
}

TractionControllerTest.java

package com;

import com.atomikos.dao.db1.UserMapper;
import com.atomikos.entity.BookVo;
import com.atomikos.entity.UserDO;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;

import static org.hamcrest.Matchers.is;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

/**
 * 从controller层发起的mock测试分布式事务
 * 有了这种方式,就不需要postman了
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = com.atomikos.Application.class)            //这个必须要加的,否则Junit无法启动spring框架进行测试
public class TractionControllerTest {

    private MockMvc mockMvc;

    private ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private WebApplicationContext context;

    @Before
    public void setUp() {
        this.mockMvc = MockMvcBuilders.webAppContextSetup(this.context).build();
    }

    /**
     * 申明式
     *
     * @throws Exception
     */
    @Test
    public void save() throws Exception {
        UserDO user = new UserDO();
        user.setUsername("用户名 - 002");
        user.setPassword("密码 - 002");

        BookVo book = new BookVo();
        book.setName("书本名称 - 002");
        book.setArticleId(69L);
        book.setUser(user);
        String json = objectMapper.writeValueAsString(book);
        this.mockMvc.perform(
                post("/books")
                        .contentType(MediaType.APPLICATION_JSON_UTF8)
                        .content(json))
                .andExpect(status().isOk())
                .andExpect(jsonPath("$.name", is("测试封面名称 - 002")))
                .andExpect(jsonPath("$.articleId", is(69)))
                .andDo(print());
    }

    /**
     * 注解式
     *
     * @throws Exception
     */
    @Test
    public void update() throws Exception {
        UserDO user = userMapper.get(3L);
        assert user != null;
        user.setUsername("用户名- 003");
        user.setPassword("密码 - 003");

        BookVo book = new BookVo();
        book.setId(9L);
        book.setName("测试封面名称 - 003");
        book.setArticleId(69L);
        book.setUser(user);

        String json = objectMapper.writeValueAsString(book);
        this.mockMvc.perform(
                put("/books")
                        .contentType(MediaType.APPLICATION_JSON_UTF8)
                        .content(json))
                .andExpect(status().isOk())
                .andExpect(jsonPath("$.name", is("测试封面名称 - 003")))
                .andExpect(jsonPath("$.articleId", is(87)))
                .andDo(print());
    }

    /**
     * 没有事务管理
     *
     * @throws Exception
     */
    @Test
    public void delete() throws Exception {
        this.mockMvc.perform(
                MockMvcRequestBuilders.delete("/books/4"))
                .andExpect(status().isOk())
                .andDo(print());
    }
}
上一篇:hadoop日常运维白皮书


下一篇:Spark拉取Kafka的流数据,转插入HBase中