@Transactional事务中发送MQ消息,事务还未完成但是消息已经发送

@Transactional事务中发送MQ消息,事务还未完成但是消息已经发送
这种情况会导致一些问题:
1.事务还未提交,但是消息已经发送,这个时候消息中的一些信息提供给别人调用,但是别人调用并没有在数据库找到该记录(因为事务还未提交)
2.事务还未提交,但是成功的消息已经通知了别人,这时候事务因为某些原因出错回滚,但是别人却已经收到了成功的通知
解决办法:
事务提交后再发送消息


@Component
public class AmqpTemplateHelper {
 
    @Autowired
    private AmqpTemplate amqpTemplate;
 
    public <T> void send(String queue, T message) {
        // 是否开启事务判断
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    amqpTemplate.convertAndSend(queue, message);
                }
            });
        } else {
            amqpTemplate.convertAndSend(queue, message);
        }
    }
}

调用


	@Autowired
    private AmqpTemplateHelper amqpTemplateHelper;
 
    @Transactional
    public void save() {
        Person person = new Person();
        String name = "evan";
        person.setName(name);
        baseMapper.insert(person);
        amqpTemplateHelper.send("QUEUE_NAME", name);
    }

分析:

// 注册事务同步处理
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
    // 事务提交完毕时,触发:yourFunction
    yourFunction();
}

原理:
提交一个事务同步处理,在事务commit之后执行,具体存放在threadLocal(线程本地变量)中,事务commit时会去threadLocal里边取。源码afterCommit是空的,没有任何操作,可见是spring专门预留给大家使用的。

源码:

TransactionSynchronizationAdapter是一个接口适配器,这样不用实现接口的全部方法,按需Override即可。类图如下图所示:
@Transactional事务中发送MQ消息,事务还未完成但是消息已经发送

TransactionSynchronizationAdapter实现了2个接口:

TransactionSynchronization事务同步接口,
Ordered执行优先级
我们这里就是实现了TransactionSynchronization接口的afterCommit()方法,最终在事务commit提交后执行。

关于spring事务执行过程图:
@Transactional事务中发送MQ消息,事务还未完成但是消息已经发送

上一篇:每日一博 - 常见的Spring事务失效&事物不回滚案例集锦


下一篇:Spring系列之事物是如何管理的