@Transactional事务中发送MQ消息,事务还未完成但是消息已经发送
这种情况会导致一些问题:
1.事务还未提交,但是消息已经发送,这个时候消息中的一些信息提供给别人调用,但是别人调用并没有在数据库找到该记录(因为事务还未提交)
2.事务还未提交,但是成功的消息已经通知了别人,这时候事务因为某些原因出错回滚,但是别人却已经收到了成功的通知
解决办法:
事务提交后再发送消息
@Component
public class AmqpTemplateHelper {
@Autowired
private AmqpTemplate amqpTemplate;
public <T> void send(String queue, T message) {
// 是否开启事务判断
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
amqpTemplate.convertAndSend(queue, message);
}
});
} else {
amqpTemplate.convertAndSend(queue, message);
}
}
}
调用
@Autowired
private AmqpTemplateHelper amqpTemplateHelper;
@Transactional
public void save() {
Person person = new Person();
String name = "evan";
person.setName(name);
baseMapper.insert(person);
amqpTemplateHelper.send("QUEUE_NAME", name);
}
分析:
// 注册事务同步处理
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
// 事务提交完毕时,触发:yourFunction
yourFunction();
}
原理:
提交一个事务同步处理,在事务commit之后执行,具体存放在threadLocal(线程本地变量)中,事务commit时会去threadLocal里边取。源码afterCommit是空的,没有任何操作,可见是spring专门预留给大家使用的。
源码:
TransactionSynchronizationAdapter是一个接口适配器,这样不用实现接口的全部方法,按需Override即可。类图如下图所示:
TransactionSynchronizationAdapter实现了2个接口:
TransactionSynchronization事务同步接口,
Ordered执行优先级
我们这里就是实现了TransactionSynchronization接口的afterCommit()方法,最终在事务commit提交后执行。
关于spring事务执行过程图: