多数据源这是小小本周的第二篇,本篇将会着重讲解关于SpringBoot + MyBatis 多数据源的事物的问题。
此处模拟创建订单和扣减库存。先创建订单表和库存表
CREATE TABLE `t_storage` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT '0',
PRIMARY KEY (`id`),
UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
CREATE TABLE `t_order` (
`id` bigint(16) NOT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT '0',
`amount` double(14,2) DEFAULT '0.00',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
数据库连接
这里使用yal文件把数据库进行配置
spring:
datasource:
ds1:
jdbc_url: jdbc:mysql://127.0.0.1:3306/db1
username: root
password: root
ds2:
jdbc_url: jdbc:mysql://127.0.0.1:3306/db2
username: root
password: root
基本的目录结构
切换数据源的抽象类
这里切换数据源的抽象类为 AbstractRoutingDataSource 这里看一下相关的源码
public abstract class AbstractRoutingDataSource{
//数据源的集合
@Nullable
private Map<Object, Object> targetDataSources;
//默认的数据源
@Nullable
private Object defaultTargetDataSource;
//返回当前的路由键,根据该值返回不同的数据源
@Nullable
protected abstract Object determineCurrentLookupKey();
//确定一个数据源
protected DataSource determineTargetDataSource() {
//抽象方法 返回一个路由键
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.targetDataSources.get(lookupKey);
return dataSource;
}
}
对于该源码来说,核心为用Map保存多个数据源信息,根据key获取不同的数据源。
修改数据源的核心
那么这里修改数据源的核心就在于重写determineCurrentLookupKey方法,让其返回一个数据源名称
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
DataSourceType.DataBaseType dataBaseType = DataSourceType.getDataBaseType();
return dataBaseType;
}
}
此时的目录结构为
再添加一个工具类
这里再添加一个工具类,用来保存当前线程的数据源类型
public class DataSourceType {
public enum DataBaseType {
ds1, ds2
}
// 使用ThreadLocal保证线程安全
private static final ThreadLocal<DataBaseType> TYPE = new ThreadLocal<DataBaseType>();
// 往当前线程里设置数据源类型
public static void setDataBaseType(DataBaseType dataBaseType) {
if (dataBaseType == null) {
throw new NullPointerException();
}
TYPE.set(dataBaseType);
}
// 获取数据源类型
public static DataBaseType getDataBaseType() {
DataBaseType dataBaseType = TYPE.get() == null ? DataBaseType.ds1 : TYPE.get();
return dataBaseType;
}
}
此时目录结构为
实现数据源的注入
@Configuration
public class DataSourceConfig {
/**
* 创建多个数据源 ds1 和 ds2
* 此处的Primary,是设置一个Bean的优先级
* @return
*/
@Primary
@Bean(name = "ds1")
@ConfigurationProperties(prefix = "spring.datasource.ds1")
public DataSource getDateSource1() {
return DataSourceBuilder.create().build();
}
@Bean(name = "ds2")
@ConfigurationProperties(prefix = "spring.datasource.ds2")
public DataSource getDateSource2() {
return DataSourceBuilder.create().build();
}
/**
* 将多个数据源注入到DynamicDataSource
* @param dataSource1
* @param dataSource2
* @return
*/
@Bean(name = "dynamicDataSource")
public DynamicDataSource DataSource(@Qualifier("ds1") DataSource dataSource1,
@Qualifier("ds2") DataSource dataSource2) {
Map<Object, Object> targetDataSource = new HashMap<>();
targetDataSource.put(DataSourceType.DataBaseType.ds1, dataSource1);
targetDataSource.put(DataSourceType.DataBaseType.ds2, dataSource2);
DynamicDataSource dataSource = new DynamicDataSource();
dataSource.setTargetDataSources(targetDataSource);
dataSource.setDefaultTargetDataSource(dataSource1);
return dataSource;
}
/**
* 将动态数据源注入到SqlSessionFactory
* @param dynamicDataSource
* @return
* @throws Exception
*/
@Bean(name = "SqlSessionFactory")
public SqlSessionFactory getSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dynamicDataSource)
throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dynamicDataSource);
bean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources("classpath*:mapping/*.xml"));
bean.setTypeAliasesPackage("cn.youyouxunyin.multipledb2.entity");
return bean.getObject();
}
}
- 创建多个数据源DataSource,ds1 和 ds2;
- 将ds1 和 ds2 数据源放入动态数据源DynamicDataSource;
- 将DynamicDataSource注入到SqlSessionFactory。
设置路由键
创建相关的mapper接口
public interface OrderMapper {
void createOrder(Order order);
}
public interface StorageMapper {
void decreaseStorage(Order order);
}
设置切面
这里设置相关的切面
@Component
@Aspect
public class DataSourceAop {
@Before("execution(* cn.youyouxunyin.multipledb2.mapper.OrderMapper.*(..))")
public void setDataSource1() {
DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds1);
}
@Before("execution(* cn.youyouxunyin.multipledb2.mapper.StorageMapper.*(..))")
public void setDataSource2() {
DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds2);
}
}
在执行订单的操作时,切到数据源ds1,执行库存操作时,切到数据源ds2。
测试
public class OrderServiceImpl implements OrderService {
@Override
public void createOrder(Order order) {
storageMapper.decreaseStorage(order);
logger.info("库存已扣减,商品代码:{},购买数量:{}。创建订单中...",order.getCommodityCode(),order.getCount());
orderMapper.createOrder(order);
}
}
总目录结构
经过测试两个数据表都已经发生了变化
事物模式这里事物模式,不能切换数据源,可以在,service上添加Transactional上实现。
为什么
这里为什么不能切换数据源 首先查看相关的注解
public class TransactionInterceptor{
public Object invoke(MethodInvocation invocation) throws Throwable {
//获取目标类
Class<?> targetClass = AopUtils.getTargetClass(invocation.getThis());
//事务调用
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
创建事物
这里首先执行的是创建事物
protected Object doGetTransaction() {
//DataSource的事务对象
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
//设置事务自动保存
txObject.setSavepointAllowed(isNestedTransactionAllowed());
//给事务对象设置ConnectionHolder
ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
重点是给事务对象设置了ConnectionHolder属性,不过此时还是为空。
开启事物
这里主要是通过ThreadLocal将资源和当前的事务对象绑定,然后设置一些事务状态。
protected void doBegin(Object txObject, TransactionDefinition definition) {
Connection con = null;
//从数据源中获取一个连接
Connection newCon = obtainDataSource().getConnection();
//重新设置事务对象中的connectionHolder,此时已经引用了一个连接
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
//将这个connectionHolder标记为与事务同步
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
con.setAutoCommit(false);
//激活事务活动状态
txObject.getConnectionHolder().setTransactionActive(true);
//将connection holder绑定到当前线程,通过threadlocal
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
//事务管理器,激活事务同步状态
TransactionSynchronizationManager.initSynchronization();
}
执行Mapper接口
这里执行mapper接口
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
//从ThreadLocal中获取SqlSessionHolder,第一次获取不到为空
SqlSessionHolder holder = TransactionSynchronizationManager.getResource(sessionFactory);
//如果SqlSessionHolder为空,那也肯定获取不到SqlSession;
//如果SqlSessionHolder不为空,直接通过它来拿到SqlSession
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}
//创建一个新的SqlSession
session = sessionFactory.openSession(executorType);
//如果当前线程的事务处于激活状态,就将SqlSessionHolder绑定到ThreadLocal
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}
执行SQL之前,会先获取到SqlSession对象。
拿到SqlSession之后,就开始调用Mybatis的执行器,准备执行SQL语句。在执行SQL之前呢,当然需要先拿到Connection连接。
public Connection getConnection() throws SQLException {
//通过数据源获取连接
//比如我们配置了多数据源,此时还会正常切换
if (this.connection == null) {
openConnection();
}
return this.connection;
}
至此可以看openConnection方法,作用是获取一个连接,如果我们配置了多数据源,此时可以切换,如果添加事物,此时不能切换,因为多了if判断,此时返回的还是上一次保存的链接。
事物模式,切换数据源两道小题 1: Spring 如何保证事物:把业务放在同一个数据库连接中,一起提交,一起回滚。2: 这枚做到在一个连接中,在线程中,通过数据库资源和当前事物相绑定实现。
创建SqlSessionFactory
在文件中,添加,如下的两个方法
@Bean(name = "sqlSessionFactory1")
public SqlSessionFactory sqlSessionFactory1(@Qualifier("ds1") DataSource dataSource){
return createSqlSessionFactory(dataSource);
}
@Bean(name = "sqlSessionFactory2")
public SqlSessionFactory sqlSessionFactory2(@Qualifier("ds2") DataSource dataSource){
return createSqlSessionFactory(dataSource);
}
这里再添加 CustomSqlSessionTemplate 用来代替原来的SqlSessionTemplate,把SqlSessionFactory注入 在。同一个文件中添加该方法
@Bean(name = "sqlSessionTemplate")
public CustomSqlSessionTemplate sqlSessionTemplate(){
Map<Object,SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
sqlSessionFactoryMap.put("ds1",factory1);
sqlSessionFactoryMap.put("ds2",factory2);
CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(factory1);
customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
customSqlSessionTemplate.setDefaultTargetSqlSessionFactory(factory1);
return customSqlSessionTemplate;
}
补充上自定义的CustomSqlSessionTemplate
public class CustomSqlSessionTemplate extends SqlSessionTemplate {
@Override
public SqlSessionFactory getSqlSessionFactory() {
//当前数据源的名称
String currentDsName = DataSourceType.getDataBaseType().name();
SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(currentDsName);
if (targetSqlSessionFactory != null) {
return targetSqlSessionFactory;
} else if (defaultTargetSqlSessionFactory != null) {
return defaultTargetSqlSessionFactory;
}
return this.sqlSessionFactory;
}
}
核心在于根据是否为空,获取到sqlsessionfactory
if (targetSqlSessionFactory != null) {
return targetSqlSessionFactory;
} else if (defaultTargetSqlSessionFactory != null) {
return defaultTargetSqlSessionFactory;
}
测试
修改完配置之后,我们把Service方法加上事务的注解,此时数据也是可以正常更新的。
@TransactionalXA协议分布式事务
@Override
public void createOrder(Order order) {
storageMapper.decreaseStorage(order);
orderMapper.createOrder(order);
}
这里借助 Atomikos 框架
引入maven
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
更改getDataSource
把DataSource对象改成AtomikosDataSourceBean。
public DataSource getDataSource(Environment env, String prefix, String dataSourceName){测试
Properties prop = build(env,prefix);
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaDataSourceClassName(MysqlXADataSource.class.getName());
ds.setUniqueResourceName(dataSourceName);
ds.setXaProperties(prop);
return ds;
}
这样配完之后,获取Connection连接的时候,拿到的其实是MysqlXAConnection对象。在提交或者回滚的时候,走的就是MySQL的XA协议了。
public void commit(Xid xid, boolean onePhase) throws XAException {
//封装 XA COMMIT 请求
StringBuilder commandBuf = new StringBuilder(300);
commandBuf.append("XA COMMIT ");
appendXid(commandBuf, xid);
try {
//交给MySQL执行XA事务操作
dispatchCommand(commandBuf.toString());
} finally {
this.underlyingConnection.setInGlobalTx(false);
}
}