RocketMQ与MYSQL事务消息整合

1、基础理论知识篇“两阶段提交”如果你了解可以跳过这段,当然如果你想深入了解你可以购买相关书籍或去搜索相关资料阅读
  两阶段提交分为 正常提交和异常提交或异常回滚
RocketMQ与MYSQL事务消息整合

   上面是正常提交的示意图,协调者发起预提交请求,参与者回复成功之后协调者再次发起commit请求,统一提交事物。事物结束。

   如果这两阶段提交过程当中有任何一个请求出现异常就会回滚,如下流程:

RocketMQ与MYSQL事务消息整合

   异常请求包括预提交 返回预提交的应答,commit请求 等任何一个失败都会导致整个事物回滚。

  二阶段提交的问题
    “二阶段提交”还有一个很严重的问题就是如果commit过程当中失败了 就导致了全部事物失败,代价很大,简单粗暴的处理方式

     还有一个问题是如果 commit过程中网络出现问题 commit没有被整个事物的参与者之一或者多个收到,这个时候就会出现数据不一致现象。

  可能大家会提到 协调者是谁,参与者又是谁那?

           这里简单说下自己的理解

     如果在你的应用程序中你是通过 begin等相关操作语句开始的,比如 你使用了spring的@Transactional注解等,

     那协调者就是你的“应用程序”,参与者就是 mysql或其他支持事物的数据库系统。

     如果你就直接向mysql发送了一条sql语句mysql是自行提交的,那协调者和参与者都是mysql数据库自己

2、这里说下mysql对所谓的“重复数据”提供的相关sql或关键字。

   unique 唯一主键约束

          在sql事物中和应用程序中都可以捕获这个错误码或异常,可以作为幂等判断的一个依据。

   upset 操作,发现唯一主键冲突然后更新相关数据,mongodb有直接使用的sql方法语句

          示例:insert into tablename(column1) values(123) on duplicate key update column1 =column1 +123

    ignore 忽略操作对于多余的操作直接忽略

          insert ignore into tablename(column1)  values(123)

  基础篇说完很多内容如果想深入了解可以自己找资料处理。下面是华丽分割线

3、在我们原有的认知里有一个方案就差那么一点点就可以大面积使用的。

   我们之前可能想过怎样既能发送mq又能写数据库,下面这个方案会分接近我们的愿望。

   我们遵从如下步骤进行代码处理:

   1、开启数据库事物执行相关sql

   2、发送MQ消息

   3、提交数据库事物

   (注意:以上每一步都是在上一步执行成功之后在执行下一步的)

   根据步骤我画出了下面的流程图

RocketMQ与MYSQL事务消息整合

其实这个流程是有一个漏洞的,如果我把上面的流程图改造为下面的二阶段提交的示意图就会很明显的看出来

RocketMQ与MYSQL事务消息整合

    不知道大家有么有发现问题,是不是 各种提交和回滚操作都是针对的数据库,而不是MQ。commit数据库事物出现异常就会造成数据不一致现象。

    其实也不用在想有没有其他的流程方案能解决分布式双写问题,只要存在多写问题就存在数据不一致问题的现象,

    所以就出现了3pc Paxos 等协议来解决分布式事物/一致性的问题。
    下面我们开始介绍怎么使用mysql和RocketMQ来实现事物问题

     华丽分割线

4、RocketMQ事物消息的过程

   1、发送MQ的事物消息

   2、事物消息发送成功后会同步触发对应执行本地接口来执行针对mysql数据库的操作

   3、如果有未commit的消息,RocketMQ 的 broker会定时间隔时间来回查数据库事物是否已经提交完成

5、结合RocketMQ的事物消息与Mysql数据库事物的实现思想

RocketMQ与MYSQL事务消息整合

  如果上面的二阶段提交你已经理解了,你会发现我这里设计的流程(上面图的流程)有点不太一样的地方

    什么地方那?

    MQ事物消息回滚的时候是因为mysql数据库事物没有提交成功而导致的,也就是说如果mysql数据库事务成功了MQ的事务消息是一定要成功的

    否则就会出现事物不一致的现象。

    假如发送MQ的prepare消息成功了,执行mysql事物的操作也成功了,但是偏偏返回给MQ的commit消息丢失了,那这个时候数据库消息并不会回滚。

  所以就有了回查本地事物消息是否成功的操作,来对MQ的消息做个补偿动作实现数据一致性

    理解了二阶段提交以及RocketMQ的事物实现之后你就可以自己设计事物相关操作的执行顺序了

    (这里的流程设计以及包括我的代码实现是以我的理解做出的最佳实践)

6、RocketMQ与Mysql事物结合注意事项

   1、如果应用程序承担协调者的工作就尽量晚开启事物和尽量早的提交数据库事物,事物中的sql对数据竞争多的sql尽量靠后

        因为执行数据库事物会有各种锁操作,减少锁的生命周期,数据库是稀缺资源,大家能省则省

   2、数据库事物最好设置超时时间,超时之后自动解除,最好不超过1分钟

   3、MQ默认1分钟之后回查一次已发送message但未commit的消息,最多回查15次,之后会执行回滚操作

   4、应用程序一定要做好幂等处理(可以参考上面mysql相关语句实现幂等接口)

   5、网络不要太差,否则会造成大量的重试,重试就会影响消息的及时性

   6、适用场景

                单次请求数量小

                每次请求会有数据产生,而不是查询产生的数据(比如 insert操作叫生产数据,select操作不生产数据)

                下游可以接受一定的延迟(这里有两个因素,有应用程序本身和Broker,这里指broker)

                下游服务或系统以接收到的消息为依据做相应的操作

                 MQ消息作为主要信息传递的工具
     下面说下具体代码实现

     华丽分割线

7、实战代码解析

   首先附上源码地址 https://github.com/zygfengyuwuzu/springboot-rocketmq-example

   下面将针对关键代码进行讲解

   首先介绍一下代码目录

RocketMQ与MYSQL事务消息整合

     了解了上面的代码目录下面说下代码的执行流程

   首先看事物消息生产者的实例对象创建
package rocketmq_example.mqandmysqltraction.producer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**

  • 生产者和消费者测试的时候记得注掉一中的一个以免观察不出效果
  • */

@Component
public class TransactionProducer {
static Logger logger = LoggerFactory.getLogger(TransactionProducer.class);

public DefaultMQProducer producer = null;

@Autowired
TransactionListener transactionListenerImp;

@PostConstruct
private void init() throws MQClientException {
logger.info("MQ事物生产者初始化开始--------------------------------------------------");
TransactionMQProducer transactionProducer = new TransactionMQProducer("mytestgroup");
// Producer 组名, 多个 Producer 如果属于一 个应用,发送同样的消息,则应该将它们 归为同一组
//transactionProducer.setProducerGroup("mytestgroup");
// Name Server 地址列表
transactionProducer.setNamesrvAddr("10.10.6.71:9876;10.10.6.72:9876");
// 超时时间 这里一定要大于数据库事物执行的超时时间
transactionProducer.setSendMsgTimeout(90000);
//这个线程池作用就是 mqbroker端回调信息的本地处理线程池
ExecutorService executorService = new ThreadPoolExecutor(1, 5, 100, TimeUnit.SECONDS,

new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
 @Override
 public Thread newThread(Runnable r) {
  Thread thread = new Thread(r);
  thread.setName("client-transaction-msg-check-thread");
  return thread;
 }
});

transactionProducer.setExecutorService(executorService);
transactionProducer.setTransactionListener(transactionListenerImp);
producer = transactionProducer;
producer.start();
logger.info("MQ事物生产者初始化结束--------------------------------------------------");
}
public SendResult send(Message me) throws Exception {
return producer.send(me);
}
/**

  • 发送普通消息
  • @param Topic
  • @param Tags
  • @param body
  • @return
  • @throws Exception
    */

public SendResult send(String Topic, String Tags, String body) throws Exception {
Message me = new Message();
// 标示
me.setTopic(Topic);
// 标签
me.setTags(Tags);
// 内容
me.setBody(body.getBytes(RemotingHelper.DEFAULT_CHARSET));
return producer.send(me);
}
/**

  • 发送普通消息
  • @param Topic
  • @param Tags
  • @param key
  • @param body
  • @return
  • @throws Exception
    */

public SendResult send(String Topic, String Tags, String key, String body) throws Exception {
try {
Message me = new Message(Topic, Tags, key, 0, body.getBytes(RemotingHelper.DEFAULT_CHARSET), true);
return producer.send(me);
} catch (Exception e) {
logger.error("发送MQ信息异常Topic{},Tags{},key{},body{}", Topic, Tags, key, body);
throw e;
}
}
@PreDestroy
public void Destroy() {
producer.shutdown();
}
}
  上面的代码我们接收到请求传输过来的数据之后,首先做了MQ消息对象的创建,创建成功之后直接发送MQ事物消息

  事物消息发送成功之后会调用上面设置的接口实现类的TransactionListenerImpl.executeLocalTransaction()这个方法。

  接口实现的方法代码如下:

package rocketmq_example.mqandmysqltraction.producer;

import java.util.List;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import rocketmq_example.mqandmysqltraction.MyTableModel;
import rocketmq_example.mqandmysqltraction.MytableService;

/**

  • 把数据库事物嵌套在mq事物当中不能显示抛出异常
  • @author zyg
    *

*/
@Component
public class TransactionListenerImpl implements TransactionListener {

static Logger logger = LoggerFactory.getLogger(TransactionListenerImpl.class);

@Autowired
MytableService mytableService;

/**

  • 一定要设置执行sql时间,尽量不要超时
  • */

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
logger.info("开始执行本地数据库事物 transactionid:{}", msg.getTransactionId());
LocalTransactionState lts = LocalTransactionState.UNKNOW;
@SuppressWarnings("unchecked")
List mytablelist = (List) arg;
try {
long start=System.currentTimeMillis();
//数据库事物执行时间不要超过mq回查时间 默认15分钟
mytableService.execMytableinsert2(mytablelist, msg.getTransactionId());
logger.info("执行数据库事物耗时:{}",System.currentTimeMillis()-start);
lts = LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
logger.error("数据库事务异常", e);
lts = LocalTransactionState.ROLLBACK_MESSAGE;
}
logger.info("结束执行本地数据库事物 transactionid:{} 返回:{}", msg.getTransactionId(),lts);
return lts;
}

/**

  • 去数据库查询看看是否存在已经成功发送预提交数据而没有commit成功的mq信息
  • 每分钟1次默认15次
  • 这里可以做个计数 让MQ重试5次/5分钟就回滚减轻MQ回查的压力
  • */

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
if (mytableService.existMyTableModelByMsgid(msg.getTransactionId())) {
logger.info("查询到已提交事物 transactionid:{}",msg.getTransactionId());
return LocalTransactionState.COMMIT_MESSAGE;
} else {
logger.info("未查到已提交事物 transactionid:{}",msg.getTransactionId());
return LocalTransactionState.UNKNOW;
}

}

}

 上面代码有两个方法,这里说下两个方法的作用和执行时间

         executeLocalTransaction这个方法是发送完 事物消息 之后同步被调用到的方法,用来执行本地事物操作

         executeLocalTransaction方法有两个参数,第一个是发送成功之后的message消息,在这个方法中包含事物ID其实就是msgid

         第二个参数是object类型的是从dataapi传过来,

         我的代码中没做任何处理直接传递过来了然后直接转化传递给了service层进行事物处理

         这个executeLocalTransaction方法里面为什么要直接返回commit或rollback,

         目的是尽量快的告诉MQ我的数据库事务执行成功了,

         尽快将half消息转为正常消息,已备消费者消费到做业务处理。

         这里完全可以直接返回unknow,等待broker回查来实现commit操作的。但是这样做对回查消息broker造成一定的压力。

  上面代码的第二个方法是提供给broker回调执行的,进行检查本地事务是否成功执行的操作,发起方是broker

         这里面我们接收到broker的回查请求之后直接去数据库查询是否存在broker提供的事务ID的数据

         如果存在返回commit标识,如果不存在返回unknow标识以等待下一次再来回查

  到此我们的一个事务操作就算完成了

  另外大家可以直接查看service层的实现代码,就不一一解释了
package rocketmq_example.mqandmysqltraction;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class MytableService {
static Logger logger = LoggerFactory.getLogger(MytableService.class);

@Autowired
IMytableMapper mytable;

@Autowired
ObjectMapper objMapper;

/**

  • 这里可以显示提交事物 返回boolean 一条一条插入只是为了展现事物的特性 获取所有异常 处理你的业务逻辑等等
  • @param mytablemodels
  • @return
    */

@Transactional(rollbackFor = Exception.class, timeout = 60000)
public List execMytableinsert2(List mytablemodels, String msgid) {

// logger.info("开始执行数据库事物");
List result = new ArrayList();
for (MyTableModel myTableModel : mytablemodels) {
// 插入数据库
myTableModel.setMsgid(msgid);
mytable.insertmytable(myTableModel);
result.add(myTableModel.getId());
}
// logger.info("结束执行数据库事物");
return result;
}

public boolean existMyTableModelById(Integer id) {
MyTableModel myTableModel = mytable.selectMyTableModelById(id);
if (myTableModel != null && null != myTableModel.getId()) {
return true;
}
return false;
}

/**

  • 查询是否存在已经发送过的msgid消息
  • @param msgid
  • @return
    */

public boolean existMyTableModelByMsgid(String msgid) {
int count = mytable.selectMyTableModelByMsgid(msgid);
if (count > 0) {
return true;
}
return false;
}

public void insetmsg(MyTableModel mytablemodel) {
try {
mytable.insertmsgrecord(mytablemodel);

} catch (org.springframework.dao.DuplicateKeyException e) {
logger.error("主键冲突异常被捕获",e);
}
}
}
非常感谢你能看到这里!!!看到这里相信你已经对本篇博客的内容有所了解了!如果有什么问题或者想不通的地方欢迎评论区进行讨论。

如果有不正确的地方恳请指正
RocketMQ与MYSQL事务消息整合

上一篇:kvm虚拟化学习笔记(十八)之ESXi到KVM之v2v迁移


下一篇:深入学习Lock锁(5)——Condition接口应用与分析