一、概述
分布式事务消息是RocketMQ的特有功能。在很多场景不求事务的强一致性,只需达到事务的最终一致性。此时,事务消息可以很好的满足需求。
通过将本地事务与消息的发送放在一个本地事务中,保证本地事务执行成功时,消息一定被成功投递到消息服务器中,最终利用消息中间件的高可靠性,保证消息会被下游业务所消费。
基本概念
分布式事务
对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。
事务消息
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务的解决方案,一种分步式事务处理模式。
半事务消息
暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。
本地事务状态
Producer回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令。
package org.apache.rocketmq.client.producer
/* 描述本地事务执行状态 */
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
}
消息回查
消息回查,即重新查询本地事务的执行状态。一般就是重新到DB中查看预处理操作是否执行成功。
注意,消息回查不是重新执行回调操作。回调操作是进行 预处理操作,而消息回查则是查看预处理操作的执行结果。
引发消息回查最常见的原因有两个:
1)回调操作返回UNKNOW
2)TC没有接收到TM的最终全局事务确认指令(TM与TC通过网络交互,只要出现网络抖动就有出现超时问题的可能性)
RocketMQ中的消息回查设置
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:
transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒。
transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。
注意
- 事务消息不支持延时消息
- 对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的情况)
二、示例
场景
这里的一个需求场景是:工行用户A向建行用户B转账1万元。
我们可以使用同步消息来处理该需求场景:
- 工行系统发送一个给B赠款1万元的同步消息M给Broker
- 消息被Broker成功接收后,向工行系统发送成功ACK
- 工行系统收到成功ACK后从用户A中扣款1万元
- 建行系统从Broker中获取到消息M
- 建行系统消费消息M,即向用户中增加1万元
其中存在问题:若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了1万元。出现了数据不一致问题。
解决思路 :让1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即 使用事务消息
。这里要使用分布式事务
解决方案。
这里要使用到事务消息
对业务进行处理:
本例中的消息回查:
结构
引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
消息头 RocketMQHeaders 常量使用
RocketMQHeaders类 为我们提供了常用的系统常量,我们可以用其替代一些Header的Key,比如:RocketMQHeaders.KEYS = “KEYS”,我们直接使用 RocketMQHeaders.KEYS 即可。
常用Header参数:
参数 | 说明 |
---|---|
KEYS | KEYS |
TRANSACTION_ID | 事务ID |
MESSAGE_ID | 消息ID |
QUEUE_ID | 消息Queue队列ID |
TAGS | 消息Tag标签 |
TOPIC | 消息Topic主题 |
代码
生产者 : 模拟用户发起转账请求
/*
* 【生产者】模拟用户发起转账请求
*/
@Slf4j
@Service
public class TransactionProducerService {
// TOPIC名称
private static final String TOPIC = "transTopic";
// TAG信息
private static final String TAG = "transTag";
@Autowired
private RocketMQTemplate rocketMQTemplate;
public TransactionSendResult sendHalfMsg(String msg){
// 生成事务ID
String transactionId = UUID.randomUUID().toString().replace("-","");
log.info("【发送半消息】transactionId={}", transactionId);
String transKeys = "transKey";
// 发送事务消息
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
TOPIC + ":" + TAG,
MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader(RocketMQHeaders.KEYS,transKeys) // 相比于使用"KEYS",使用封装常量更不易出错
.build(),
msg
);
log.info("【发送半消息】sendResult={}",msg);
return sendResult;
}
}
监听器(本地事务) : 模拟工行进行扣款活动
/*
* 【事务监听器(本地事务)】模拟工行进行扣款活动
*/
@Slf4j
@RocketMQTransactionListener()
public class ICBCTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders messageHeaders = msg.getHeaders();
String transactionId = (String) messageHeaders.get(RocketMQHeaders.TRANSACTION_ID);
log.info("预提交消息成功:{}",msg);
log.info("【执行本地事务】消息体参数:transactionId={}", transactionId);
try {
StringBuilder money = new StringBuilder();
byte[] bytes = ((byte[])msg.getPayload());
for (int i = 0; i < bytes.length; i++) {
money.append(bytes[i] - '0');
}
log.info("【执行本地事务成功】工行账户扣除" + money +"元!");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID, String.class);
log.info("执行消息回查:{}",msg);
log.info("【回查本地事务】transactionId={}",transactionId);
// 执行相关业务
// if(...) {
// return RocketMQLocalTransactionState.ROLLBACK;
// else {
return RocketMQLocalTransactionState.COMMIT;
// }
// return RocketMQLocalTransactionState.UNKNOW;
}
}
消费者 : 模拟建行进行存款活动
/*
* 【消费者】模拟建行进行存款活动
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "transTopic", selectorExpression = "transTag", consumerGroup = "cg")
public class CCBTransactionConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 幂等性判断
// 1、使用唯一字段进行判断,如订单号
// 2、新建一张带有唯一性字段的表,辅助判断
// 执行具体业务
// if(...) { //执行失败
// log.error("【执行失败】转账失败!");
// } else //执行成功
log.info("【执行成功】转账成功!建行账户增加" + message + "元!");
// }
}
}
结果
Swagger进行测试:
消费消息,处理业务(实际上是上下游的不同服务,这里将生产者、消费者置于同一项目下运行):
RocketMQ-Console展示消息:
事务消息处理完成,业务成功执行!
// }
}
}
## 结果
Swagger进行测试:
[外链图片转存中...(img-qQ6NqBSv-1638109562163)]
处理消息业务(实际上是上下游的不同服务,这里将生产者、消费者置于同一项目下运行):
[外链图片转存中...(img-qCVKdkEm-1638109562164)]
RocketMQ-Console展示消息:
[外链图片转存中...(img-J51LAB2k-1638109562167)]
事务消息处理完成,业务成功执行!