最近在工作中使用Spring的事务踩过一些坑,抽象成一些场景,记录一下。
首先创建测试表,用于后续的演示。同时约定process_flag枚举值,0表示未处理,1表示已处理。
1 CREATE TABLE test_record ( 2 id VARCHAR (16) PRIMARY KEY, 3 process_flag INT (2) 4 );
案例1. 在事务开启后,中途修改事务的隔离级别,不生效。
插入如下数据。
1 INSERT INTO test_record (id, process_flag) 2 VALUES 3 ('1', 0), 4 ('2', 0);
为了不让MyBatis的一级缓存影响结果,将其关闭。
数据库事务隔离级别是RR.
创建几个服务类。
1 @Service 2 @Slf4j 3 public class TotalService { 4 5 @Autowired 6 Service1 service1; 7 8 @Autowired 9 Service2 service2; 10 11 @Transactional 12 public int process() { 13 int totalAffectedRows = 0; 14 while (true) { 15 List<TestRecord> unprocessed = service1.getUnprocessed(); 16 17 if (CollectionUtils.isEmpty(unprocessed)) { 18 break; 19 } 20 log.info("Unprocessed size: {}", unprocessed.size()); 21 22 totalAffectedRows += service2.updateProcessFlag(unprocessed.stream().map(TestRecord::getId).collect(Collectors.toList())); 23 } 24 25 return totalAffectedRows; 26 } 27 }
1 @Service 2 public class Service1 { 3 4 @Autowired 5 private TestRecordMapper testRecordMapper; 6 7 public List<TestRecord> getUnprocessed() { 8 return testRecordMapper.getUnprocessed(10); 9 } 10 }
1 @Service 2 public class Service2 { 3 4 @Autowired 5 private TestRecordMapper testRecordMapper; 6 7 @Transactional(propagation = Propagation.REQUIRES_NEW) 8 public int updateProcessFlag(List<String> ids) { 9 return testRecordMapper.updateProcessFlag(ids, 1); 10 } 11 }
创建Mapper.
1 public interface TestRecordMapper { 2 3 List<TestRecord> getUnprocessed(@Param("limit") int limit); 4 5 int updateProcessFlag(@Param("ids") List<String> ids, @Param("processFlag") int processFlag); 6 }
逻辑很简单,就是有一个大事务在最外层A,里面分批从数据库捞取未处理的数据,处理(本例子没有)之后,新开一个事务B更新状态(不能和外层事务在一起,防止事务超大)。
在数据库级别为RR的情况下,这个demo将会死循环,原因是可重复读,A事务无法感知B事务修改了状态,在它看来数据库里的数据都是未处理的。
那么,如果修改捞取数据的方法,让它变成RC,不就可以实时读到数据库的最新状态了吗?就像这样。
1 @Service 2 public class Service1 { 3 4 @Autowired 5 private TestRecordMapper testRecordMapper; 6 7 @Transactional(isolation = Isolation.READ_COMMITTED) 8 public List<TestRecord> getUnprocessed() { 9 return testRecordMapper.getUnprocessed(10); 10 } 11 }
答案是……不行。
原因是在事务开启之后,是不能中途修改事务隔离级别的。必须在事务开启的时候就指定,因此,需要在外层TotalService指定。
1 @Service 2 @Slf4j 3 public class TotalService { 4 5 @Autowired 6 Service1 service1; 7 8 @Autowired 9 Service2 service2; 10 11 @Transactional(isolation = Isolation.READ_COMMITTED) 12 public int process() { 13 int totalAffectedRows = 0; 14 while (true) { 15 List<TestRecord> unprocessed = service1.getUnprocessed(); 16 17 if (CollectionUtils.isEmpty(unprocessed)) { 18 break; 19 } 20 log.info("Unprocessed size: {}", unprocessed.size()); 21 22 totalAffectedRows += service2.updateProcessFlag(unprocessed.stream().map(TestRecord::getId).collect(Collectors.toList())); 23 } 24 25 return totalAffectedRows; 26 } 27 }
有人就会说了,很明显就应该加在这里啊,为什么会想到加在Service1的方法上呢?这是因为工作中的场景可比这个复杂多了呀,哪能一眼看穿……
为什么中途不能修改呢?我在*上找到了一个答案 https://*.com/questions/4940648/how-to-start-a-transaction-in-jdbc?noredirect=1&lq=1
还有一个问题,如果是直接和MySql交互,不是使用JDBC呢?我看网上说,修改会话的事务隔离级别,会立即生效。但我在客户端试了一下,虽然级别是改了,但仍然看不到别的事务的修改……
算了,不纠结了,反正知道JDBC不支持就可以了。
案例2.org.springframework.transaction.support.TransactionSynchronizationManager类的两个方法,isSynchronizationActive和isActualTransactionActive方法的差别
对上面例子稍作修改。在TotalService的外层,加一个调用,以RC开启一个事务。
1 @Service 2 public class OutterService { 3 4 @Autowired 5 private TotalService totalService; 6 7 @Transactional(isolation = Isolation.READ_COMMITTED) 8 public int process() { 9 return totalService.process(); 10 } 11 }
对于TotalService的实现也稍作修改,将其声明为RC级别。Service1继承TotalService的事务,不单独声明。
1 @Service 2 @Slf4j 3 public class TotalService { 4 5 @Autowired 6 Service1 service1; 7 8 @Autowired 9 Service2 service2; 10 11 @Transactional(propagation = Propagation.NOT_SUPPORTED) 12 public int process() { 13 int totalAffectedRows = 0; 14 while (true) { 15 List<TestRecord> unprocessed = service1.getUnprocessed(); 16 17 if (CollectionUtils.isEmpty(unprocessed)) { 18 break; 19 } 20 log.info("Unprocessed size: {}", unprocessed.size()); 21 22 totalAffectedRows += service2.updateProcessFlag(unprocessed.stream().map(TestRecord::getId).collect(Collectors.toList())); 23 } 24 25 return totalAffectedRows; 26 } 27 }
同时,一定要把MyBatis一级缓存的开关打开,才能复现问题。
运行之后,同样是死循环,不断地从数据库读取出相同的数据。
相信大家也能猜出来,这是MyBatis一级缓存的问题,走到了缓存里,根本没有去查数据库。
一级缓存是SqlSession级别,那也就是说,MyBatis在每次查询时,都使用了同一个SqlSession。
这就很奇怪了,明明已经声明是NOT_SUPPORTED,不支持事务,为什么还会这样?在我的认知里,MyBatis一般是事务才会使用同一个SqlSession,否则每次都是新创建的。
实则不然,在org.mybatis.spring.SqlSessionUtils#registerSessionHolder 方法中,其判断是否要将会话绑定到线程资源,使用的方法是org.springframework.transaction.support.TransactionSynchronizationManager#isSynchronizationActive,该方法并不意味着当前是在事务中,它只是告诉你是否开启了同步资源。
如果要知道当前是否在事务,需要使用另一个方法org.springframework.transaction.support.TransactionSynchronizationManager#isActualTransactionActive。
这两个方法的差别,我同样是在*上找到的答案,https://*.com/questions/18771296/spring-transactions-transactionsynchronizationmanager-isactualtransactionactive
可以看到,在NOT_SUPPORTED的传播中,确实是挂起了之前的事务,当前无事务,但同步资源仍然是支持的。
同步资源的判断是,只要不为null就表示支持。而NOT_SUPPORTED虽然是不支持事务,实际上仍可以看作一个空事务,其初始化的时候,会给这个变量赋值,虽然是一个空集合。
总的来说,把NOT_SUPPORTED看作一个空事务,它会把之前的事务挂起。与数据库交互时,已经是auto commit = true; 即不在事务中。 但其仍然是支持同步资源的。
MyBatis基于这个判断,而不是基于当前是否在事务中,对SqlSession进行了资源的同步,在同一个空事务中,使用的是同一个SqlSession. 此时如果一级缓存没有关闭,则可能受其影响,不会访问数据库,直接就返回了缓存结果。
也可以说,Spring已经保证了其事务传播的正确性,但MyBatis多此一举了……
最简单的解决方法,就是把一级缓存关闭,这样就算使用了同一个会话,也不会被影响。同时,最外层的OutterService,并不需要设置为RC。原因是在NOT_SUPPORTED的这个空事务中,数据库连接已经是自动提交了(见上图),说明不在事务里面,它总是能读到最新的。
所以以后在分析事务的传播机制过程中,一定要把与数据库的连接行为(auto commit)和资源是否同步这两个概念区分开。而针对MyBatis缓存的场景,一定要将其关闭。其他的,只要考虑数据库连接行为即可!因为框架包装到最后,执行的仍然是JDBC的实现。
关于Spring中事务传播的行为,可以参考这两篇博客:
https://blog.csdn.net/kangsa998/article/details/104932913
https://www.cnblogs.com/micrari/p/7612962.html
另外,最好关闭MyBatis的缓存,因为其行为不受开发者控制,容易出现各种奇怪的结果,将其作为一个普通ORM框架即可。可以参考:https://tech.meituan.com/2018/01/19/mybatis-cache.html
最后,这是最终可运行的示例代码。
链接: https://pan.baidu.com/s/1gBmvlbsyIIUn7lS71vkpvw?pwd=uf7g 提取码: uf7g