什么是事务性消息?
它可以看作是两阶段提交消息实现,以确保分布式系统中的最终一致性。事务消息确保本地事务的执行和消息的发送可以原子化地执行。
使用限制
(1)事务性消息没有调度和批处理支持。
(2)为了避免单个消息被检查过多而导致半队列消息累积,我们将单个消息的检查次数默认限制为15次,但是如果一条消息被检查过,用户可以通过更改代理配置中的“transactionCheckMax”参数来更改此限制“transactionCheckMax”次,默认情况下,broker将丢弃此消息并同时打印错误日志。用户可以通过重写“AbstractTransactionCheckListener”类来更改此行为。
(3)事务消息将在代理配置中的参数“transactionTimeout”确定的一段时间后进行检查。用户也可以在发送事务性消息时通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来更改此限制,此参数优先于“transactionMsgTimeout”参数。
(4)事务性消息可能被多次检查或使用。
(5)提交给用户目标主题的不可信消息可能失败。目前,这取决于日志记录。RocketMQ本身的高可用机制保证了高可用性。如果要确保事务消息不会丢失,并且保证事务完整性,建议使用同步双写。机制。
(6)事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ服务器按其生产者id查询客户机。
应用程序
1、交易状态
事务性消息有三种状态:
(1)TransactionStatus.CommitTransaction:commit transaction,表示允许消费者使用此消息。
(2)TransactionStatus.rollback transaction:回滚事务,表示消息将被删除,不允许使用。
(3)TransactionStatus.Unknown:中间状态,表示需要MQ进行回查以确定状态。
原理图
假设有A,B两服务,A服务为优惠券服务,B服务为结算服务。
1.A服务发给B服务一个半消息(B服务消费不到)
2.当B服务返回接收成功后 A服务开展优惠券逻辑
3.优惠券数据落到数据库 此时有3个结果
1.A服务成功 发送B服务commit 修改B服务的消息状态 B服务可以执行结算
2.A服务失败发送rollback 修改B服务状态为失败 删除消息
3.网络原因 需要进行事务回查
4.B服务执行逻辑(如果B服务失败 需要手动判断 rocketMq的官方文档提出不提出解决方案)
代码处理
(1)创建事务生产者
使用TransactionMQProducer类创建producer客户机,并指定唯一的producerGroup,然后可以设置自定义线程池来处理检查请求。在执行本地事务后,需要根据执行结果回复MQ,回复状态在上一节描述。
1. @SpringBootConfiguration 2. public class MQProducerConfiguration { 3. 4. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class); 5. /** 6. * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 7. */ 8. @Value("${rocketmq.producer.groupName}") 9. private String groupName; 10. @Value("${rocketmq.producer.namesrvAddr}") 11. private String namesrvAddr; 12. /** 13. * 消息最大大小,默认4M 14. */ 15. @Value("${rocketmq.producer.maxMessageSize}") 16. private Integer maxMessageSize; 17. /** 18. * 消息发送超时时间,默认3秒 19. */ 20. @Value("${rocketmq.producer.sendMsgTimeout}") 21. private Integer sendMsgTimeout; 22. /** 23. * 消息发送失败重试次数,默认2次 24. */ 25. @Value("${rocketmq.producer.retryTimesWhenSendFailed}") 26. private Integer retryTimesWhenSendFailed; 27. 28. @Bean(name = "customRocketMQProducer") 29. public DefaultMQProducer getRocketMQProducer() throws Exception { 30. if (StringUtils.isEmpty(this.groupName)) { 31. throw new Exception("groupName is blank"); 32. } 33. if (StringUtils.isEmpty(this.namesrvAddr)) { 34. throw new Exception("nameServerAddr is blank"); 35. } 36. 37. //事务消息需要 38. TransactionListener transactionListener = new TransactionListenerImpl(); 39. TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); 40. ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { 41. @Override 42. public Thread newThread(Runnable r) { 43. Thread thread = new Thread(r); 44. thread.setName("client-transaction-msg-check-thread"); 45. return thread; 46. } 47. }); 48. //事务消息需要 49. producer.setExecutorService(executorService); 50. producer.setTransactionListener(transactionListener); 51. 52. producer.setNamesrvAddr(this.namesrvAddr); 53. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName 54. //producer.setInstanceName(instanceName); 55. if (this.maxMessageSize != null) { 56. producer.setMaxMessageSize(this.maxMessageSize); 57. } 58. if (this.sendMsgTimeout != null) { 59. producer.setSendMsgTimeout(this.sendMsgTimeout); 60. } 61. //如果发送消息失败,设置重试次数,默认为2次 62. if (this.retryTimesWhenSendFailed != null) { 63. producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); 64. } 65. try { 66. producer.start(); 67. LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]" 68. , this.groupName, this.namesrvAddr)); 69. } catch (MQClientException e) { 70. LOGGER.error(String.format("producer is error {}" 71. , e.getMessage(), e)); 72. throw new Exception(e); 73. } 74. return producer; 75. } 76. }
(2)实现TransactionListener接口
“executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节中提到的三种事务状态之一。
“checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三种事务状态之一。
1. package cn.baocl.rocketmq.controllor; 2. 3. import org.apache.rocketmq.client.producer.LocalTransactionState; 4. import org.apache.rocketmq.client.producer.TransactionListener; 5. import org.apache.rocketmq.common.message.Message; 6. import org.apache.rocketmq.common.message.MessageExt; 7. 8. import java.util.concurrent.ConcurrentHashMap; 9. import java.util.concurrent.atomic.AtomicInteger; 10. 11. public class TransactionListenerImpl implements TransactionListener { 12. 13. private AtomicInteger transactionIndex = new AtomicInteger(0); 14. 15. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); 16. 17. //执行本地事务(一般都是写一张日志表,放入本次交易的TransactionId,判断本次交易是否成功) 18. @Override 19. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { 20. //执行业务 21. //do thing 22. if(true){ 23. //业务执行成功 24. //插入日志表 25. //返回成功 26. return LocalTransactionState.COMMIT_MESSAGE; 27. }else{ 28. return LocalTransactionState.ROLLBACK_MESSAGE; 29. } 30. } 31. 32. //回查事务 当发送给broken 本地事务的状态状态丢失了 broken会再次根据id验证本地事务(查询日志表是否有日志) 33. @Override 34. public LocalTransactionState checkLocalTransaction(MessageExt msg) { 35. String transactionId = msg.getTransactionId(); 36. //根据transactionId查询日志表条数 如果条数大于0 就表示已经执行过以上逻辑了 37. Integer countStatus = Integer.valueOf(transactionId); 38. if (countStatus > 0) { 39. return LocalTransactionState.COMMIT_MESSAGE; 40. } else { 41. return LocalTransactionState.ROLLBACK_MESSAGE; 42. } 43. } 44. }
(3)调用
1. package cn.baocl.rocketmq.controllor; 2. 3. import org.apache.rocketmq.client.exception.MQBrokerException; 4. import org.apache.rocketmq.client.exception.MQClientException; 5. import org.apache.rocketmq.client.producer.DefaultMQProducer; 6. import org.apache.rocketmq.client.producer.SendResult; 7. import org.apache.rocketmq.common.message.Message; 8. import org.apache.rocketmq.remoting.common.RemotingHelper; 9. import org.apache.rocketmq.remoting.exception.RemotingException; 10. import org.slf4j.Logger; 11. import org.slf4j.LoggerFactory; 12. import org.springframework.web.bind.annotation.RequestMapping; 13. import org.springframework.web.bind.annotation.RestController; 14. 15. import javax.annotation.Resource; 16. import java.io.UnsupportedEncodingException; 17. 18. 19. @RestController 20. @RequestMapping("/test") 21. public class TestControllor { 22. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class); 23. 24. /** 25. * 使用RocketMq的生产者 26. */ 27. @Resource(name = "customRocketMQProducer") 28. private DefaultMQProducer defaultMQProducer; 29. 30. @RequestMapping("/send") 31. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 32. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; 33. 34. for (int i = 0; i < 10; i++) { 35. try { 36. Message msg = 37. new Message("DemoTopic", tags[i % tags.length], "KEY" + i, 38. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); 39. //sendMessageInTransaction :1).检查transactionListener是否存在 40. // 2).调用父类执行事务消息发送 41. SendResult sendResult = defaultMQProducer.sendMessageInTransaction(msg, null); 42. System.out.println("发送消息为"); 43. System.out.printf("%s%n", sendResult); 44. Thread.sleep(10); 45. } catch (MQClientException | UnsupportedEncodingException e) { 46. e.printStackTrace(); 47. } 48. } 49. 50. for (int i = 0; i < 100000; i++) { 51. Thread.sleep(1000); 52. } 53. } 54. }
阿里忽略了消费者报错了的情况,可以根据业务自定义
参考:https://my.oschina.net/u/3768341/blog/1616193
https://segmentfault.com/a/1190000019755235?utm_source=tag-newest
https://blog.csdn.net/hosaos/article/details/90050276
https://www.jianshu.com/p/cc5c10221aa1