spring事务与消息队列

在开发过程中,遇到一个bug,产生bug的原因是spring事务提交晚于消息队列的生产消息,导致消息队列消费消息时获取到的数据不正确。这篇文章介绍问题的产生和一步步的解决过程。

一.问题的产生:

场景还原:接口中的一个方法,首先修改订单状态,然后向消息队列中生产消息,消息队列的消费者获取到消息检测订单状态,发现订单状态未更改。

代码:

@Service(orderApi)
public class OrderApiImpl implements OrderApi {
@Resource MqService mqService;
@OrderDao orderDao; public void push(String orderId) {
// 更新订单状态,之前的状态是1
updateStatus(orderId, 3);
// 产生消息
mqService.produce(orderId);
}
public viod updateStatus(String orderId, Integer status) {
orderDao.updateStatus(orderId, status);
}
}

问题产生原因:orderApi中的所有方法都有事务,事务类型PROPAGATION_REQUIRED,所以push方法对数据的操作会在push代码全部执行之后提交,而在事务提交之前消息队列的消息已经产生所以消息队列中消费到的订单从数据库查询出的状态可能还为1。为了让bug现象更明显,可以在push方法最后添加:

try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

这样就会发现消费消息时,订单状态一定是未修改的。

二.问题的解决:

解决方案:在更新数据时,新建一个事物,保证更新代码执行完成后,更新数据库的事务已被提交。(确保消息产生前数据库操作已提交)

按照上述方案,我首先想到的是直接修改updateStatus方法的事务类型;我将此方法的事务类型改为PROPAGATION_REQUIRES_NEW(新建事务,如果当前存在事务,把当前事务挂起)。

但是这么做有两点不合适:

  1.强制修改了updateStaus的事务类型,可能影响其他流程。

  2.未起到作用,updateStaus方法中没有新建事务。

关于第二点的解释:spring添加事务是通过BeanNameAutoProxyCreator实现的动态代理,只是给bean对象添加了事务,现在在类内部调用方法,是不会触发新事物的创建的。

所以在经过以上尝试后,我创建了一个新的类:

@Service("orderExtApi")
public class OrderExtApiImpl {
@Resource OrderApi orderApi; public void updateStatusNewPropagation(String orderId) {
orderApi.updateStatus(orderId);
}
}

并为updateStatusNewPropagation方法添加事务PROPAGATION_REQUIRES_NEW

这个类就只是为了给orderApi中的updateStaus方法新起一个事务。

ok,到此为止bug已经解决了。

但是代码中还是存在问题:对数据库的操作已经提交,如果生产消息出现异常对业务逻辑来说还是错误的。所以需要检测消息的产生是否完成。

最终orderApi中的代码如下:

@Service(orderApi)
public class OrderApiImpl implements OrderApi {
@Resource MqService mqService;
@Resource OrderDao orderDao;
@Resource OrderExtApiImpl orderExtApi; public void push(String orderId) {
// 更新订单状态,之前的状态是1
orderExtApi.updateStatusNewPropagation(orderId, 3);
// 产生消息--produce会检测是否出现异常 当返回1时表示生产消息成功
Response response = mqService.produce(orderId);
if (response.getCode() != 1) {
log.info("消息队列生产消息异常:" + response.getErrorMsg())
// 生产消息异常,重置状态 等待下次重新执行
orderExtApi.updateStatusNewPropagation(orderId, 1);
} }
public viod updateStatus(String orderId, Integer status) {
orderDao.updateStatus(orderId, status);
}
}

  

上一篇:嗯,这个BLOG其实是个更新服务器


下一篇:CenOS6.3 ssh 公钥认证报错:Permission denied (publickey,gssapi-keyex,gssapi-with-mic)