目录
1、事务场景
场景:工行用户A向建行用户B转账1万元;
步骤:
- 1.工行系统发送一个给B增款1万元的同步消息M给Broker
- 2.消息被Broker成功接收后,向工行系统发送成功ACK
- 3.工行系统收到成功ACK后从用户A中扣款1万元
- 4.建行系统从Broker中获取到消息M
- 5.建行系统消费消息M,即向用户B中增加1万元
这其中是有问题的:若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费;此时建行系统中用户B会增加了1万元,出现了数据不一致问题。
2、解决思路
解决思路: 让上述步骤的第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息,简单来说就是先预提交一个消息,当本地事务没有异常的情况下,再次发送一个确认消息;该思路即使用事务消息,这里要使用分布式事务解决方案;
使用事务消息来处理该需求场景过程:
-
事务管理器TM向事务协调器TC发起指令,开启全局事务
-
工行系统发一个给B增款1万元的事务消息M给TC
-
TC会向Broker发送半事务消息prepareHalf,将消息M预提交到Broker,此时的建行系统是看不到Broker中的消息M的
-
Broker会将预提交执行结果Report给TC。
-
如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会调用工行系统的回调操作,去完成工行用户A的预扣款1万元的操作
-
工行系统会向TC发送预扣款执行结果,即本地事务的执行状态
-
TC收到预扣款执行结果后,会将结果上报给TM
-
TM会根据上报结果向TC发出不同的确认指令
- 若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令
- 若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令
- 若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态回查操作。回查操作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback
-
TC在接收到指令后会向Broker与工行系统发出确认指令
- TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch Commit指令。此时Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款操作才真正被确认;
- TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时Broker中的消息M将被撤销;工行用户A中的扣款操作将被回滚;
3、事务消息概念
RocketMQ的事务消息,主要是通过消息的异步处理,可以保证本地事务和消息发送同时成功执行或失败,从而保证数据的最终一致性,具体流程如下:
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。
- RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。
- RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。
事务过程:
通过sendMessageInTransaction
方法将消息发送到broker
,并回调事务监听器的方法,此时消息为半消息状态,需要进行二次确认才能发送到队列并由消费者进行消费。
如果MQ收到的事务状态一直是UNKNOWN,那么将不断的向MQ发送方发起回调检查本地事务状态
,直到收Commit/Rollback状态的消息或者人工干预删除UNKNOWN状态的消息;
注意本地SQL事务一定要在MQ监听器的回调方法中执行
4、事务消息实现
4.1 代码说明:
核心类为TxConsumer
、TxProducer
、TxProducerListener
实现流程。TxProducer
调用sendMessageInTransaction
方法后进入到TxProducerListener
中
注意本地SQL事务一定要在MQ监听器的回调方法中执行
需要注意的是: 一个RocketMQTemplate只能注册一个事务监听器,如果存在多个事务监听器监听不同的Producer
,需要通过注解@ExtRocketMQTemplateConfiguration
定义不同的RocketMQTemplate, ,比如:
// 定义RocketMQTemplate
@ExtRocketMQTemplateConfiguration
public class XXXRocketMQTemplate extends RocketMQTemplate {
}
// 消费者发送事务消息
// 引入自定义的RocketMQTemplate
@Resource(name = "xxxRocketMQTemplate")
RocketMQTemplate rocketMQTemplate;
public void send() {
Message<String> message = MessageBuilder.withPayload("text").build();
rocketMQTemplate.sendMessageInTransaction("topic", message, null);
}
要使用事务消息,需要自定义消息监听器,并且与RocketMQTemplate
进行绑定:
// 监听器,默认的为rocketMQTemplate
// 如果使用ExtRocketMQTemplateConfiguration是自定义了RocketMQTemplate,则需要绑定自定义的bean名称
@RocketMQTransactionListener(rocketMQTemplateBeanName="txRocketMQTemplate")
public class TxProducerListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
.....
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
.....
}
}
4.2 完整代码
TxConsumer:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "tx_topic", consumerGroup = "tx_group")
@Slf4j
public class TxConsumer implements RocketMQListener<String> {
/**
*
* @param message
*/
@Override
public void onMessage(String message) {
log.info("消息事务-接受到消息:" + message);
}
}
TxRocketMQTemplate:
// 一个RocketMQTemplate只能注册一个事务监听器,如果存在多个事务监听器监听不同的`Producer`
// 需要通过注解`@ExtRocketMQTemplateConfiguration`定义不同的RocketMQTemplate
@ExtRocketMQTemplateConfiguration
public class TxRocketMQTemplate extends RocketMQTemplate {
}
TxProducerListener:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@RocketMQTransactionListener(rocketMQTemplateBeanName = "txRocketMQTemplate")
public class TxProducerListener implements RocketMQLocalTransactionListener {
/**
* 记录各个事务Id的状态:1-正在执行,2-执行成功,3-执行失败
*/
private ConcurrentHashMap<String, Integer> transMap = new ConcurrentHashMap<>();
/**
* 执行本地事务
*
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
String transId = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID).toString();
log.info("消息事务id为:" + transId);
// 状态为正在执行
transMap.put(transId, 1);
try {
// 本地SQL事务一定要在MQ监听器的回调方法中执行
log.info("正在执行本地事务");
// 模拟耗时操作估计出发mq回查操作:当RocketMQ长时间(1分钟)没有收到本地事务的返回结果
// TimeUnit.SECONDS.sleep(80);
// 模拟业代码执行,比如模拟插入user数据到数据库中,并且失败的情况
// System.out.println(1 / 0);
log.info("事务执行完成.");
} catch (Exception e) {
// 状态为执行失败
transMap.put(transId, 3);
log.error("事务执行异常.");
// 出现异常
// 如果不需要重试 则设置为:ROLLBACK
// 如果需要检查事务重试,1分钟后发起检查 则设置为:UNKNOWN
return RocketMQLocalTransactionState.UNKNOWN;
}
// 状态为执行成功
transMap.put(transId, 2);
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 事务超时,回查方法
* 检查本地事务,如果RocketMQ长时间(1分钟左右)没有收到本地事务的返回结果,则会定时主动执行改方法,查询本地事务执行情况。
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
//根据transaction的id回查该事务的状态,并返回给消息队列
//未知状态:查询事务状态,但始终无结果,或者由于网络原因发送不成功,对mq来说都是未知状态
//正确提交返回LocalTransactionState.COMMIT_MESSAGE
//事务执行失败返回LocalTransactionState.ROLLBACK_MESSAGE
String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
Integer status = transMap.get(transId);
// 执行状态 1-正在执行,2-执行成功,3-执行失败
log.info("回查的事务id为:" + transId + ",当前的状态为:" + status);
//正在执行
if (status == 1) {
log.info("回查结果为:正在执行状态");
return RocketMQLocalTransactionState.UNKNOWN;
} else if (status == 2) {
//执行成功,返回commit
log.info("回查结果为:成功状态");
transMap.remove(transId);
return RocketMQLocalTransactionState.COMMIT;
} else if (status == 3) {
//执行失败,返回rollback
log.info("回查结果为:失败状态");
return RocketMQLocalTransactionState.ROLLBACK;
// 通过伪代码表示 检查本地事务执行情况
// User user = selectByUserId(userId);
// if (user!=null) {
// //插入成功(本地事务完成)
// transMap.remove(transId);
// return RocketMQLocalTransactionState.COMMIT;
// } else {
// // 插入失败
// // 如果不需要再重试 则设置为:ROLLBACK
// // 如果还需要检查事务重试 则设置为:UNKNOWN
// return RocketMQLocalTransactionState.UNKNOWN;
// }
}
// 其他未知情况,统一返回不重试,删除消息
transMap.remove(transId);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
TxProducer:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.UUID;
@Service
@Slf4j
public class TxProducer {
/**
* 一个RocketMQTemplate只能注册一个事务监听器,
* 如果存在多个事务监听器监听不同的`Producer`,
* 需要通过注解`@ExtRocketMQTemplateConfiguration`定义不同的RocketMQTemplate
*/
@Resource(name = "txRocketMQTemplate")
RocketMQTemplate rocketMQTemplate;
public void tx() {
String text = "消息事务发送" + System.currentTimeMillis();
log.info(text);
UUID transactionId = UUID.randomUUID();
log.info("事务ID:" + transactionId);
Message<String> message = MessageBuilder.withPayload(text)
// 设置事务Id
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build();
// 调研sendMessageInTransaction后进行到监听器中
rocketMQTemplate.sendMessageInTransaction("tx_topic", message, null);
log.info("已发送...");
}
}