分布式事务可以使用seata实现,但是,对于高并发的场景,使用seata会感觉稍慢,尤其是对一致性要求不那么高的业务完全可以不需要使用seata,这时候,我们可以考虑最终一致性的方案。通过消息队列机制来保证最终一致性,即可。
思想:在MQ中新建两个队列,一个死信队列,一个普通队列,让同一个交换机绑定这两个队列,死信队列不处理任何消息,只是用于存放过期的消息,当指定的时间到了,自动路由到普通队列,在某个服务中单独监听这个队列的消息并处理该消息。
实现步骤:
1. 准备工作
1)引入MQ
<!-- 引入mq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2)mq相关配置文件
# 配置MQ基本信息
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
3)mq配置类(指定序列化机制、死信队列等的相关绑定关系)
package com.bjc.gulimall.ware.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
/**
* @描述:rabbitMQ配置类
* @创建时间: 2021/3/15
*/
@Configuration
public class MyRabbitConfig {
/* 使用JSON序列化对象 */
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@RabbitListener(queues = {"stock.release.stock.queue"})
public void handle(Message msg){
System.out.println(Thread.currentThread().getName() + "\t" + "" + msg);
}
/*
* 配置绑定关系等信息
* */
// 1. 库存服务默认交换机(topic类型)
@Bean
public Exchange stockEventExchange(){
// public TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);
return topicExchange;
}
// 2. 普通队列
@Bean
public Queue stockReleaseStockQueue(){
// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
return new Queue("stock.release.stock.queue",true,false,false);
}
// 3. 创建延时队列
@Bean
public Queue stockDeadStockQueue(){
// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","stock-event-exchange"); // 指定死信路由
arguments.put("x-dead-letter-routing-key","stock.release"); // 指定死信路由键(消息死了,要通过哪个路由键交出去)
arguments.put("x-message-ttl",120000); // 指定消息过期时间 单位毫秒
return new Queue("stock.delay.queue",true,false,false,arguments);
}
// 4. 创建2个绑定关系
// 4.1 绑定死信队列与交换机的关系
@Bean
public Binding deadBind(){
// Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);
}
// 4.2 绑定普通队列与交换机的关系
@Bean
public Binding releaseBind(){
// Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);
}
}
4)开启rabbitMQ
启动类上添加如下注解
2. 实现逻辑
这里只是简单的罗列下,记录一下使用方法,具体逻辑具体分析即可
3. 测试
访问测试页面(略),查看MQ控制台,如图:
消息成功的到达指定的死信队列,数据库中也有记录对应的记录,如图;
等会的回退操作,就是根据这两张表来的。
4. 解锁操作
解锁涉及到了消息的消费,以及远程接口的调用,如果此次调用失败,需要重新执行,这时候,消息队列的消息已经被消费了,所以,会导致,重新解锁的时候没有消息了,因此,mq的ACK确认机制需要配置成手动确认。
在application.properties中添加如下配置:
# 手动ack模式开启
spring.rabbitmq.listener.simple.acknowledge-mode=manual
然后,新建一个监听,用于监听MQ的指定队列,代码如下:
package com.bjc.gulimall.ware.listener;
import com.alibaba.fastjson.TypeReference;
import com.bjc.common.to.mq.OrderTo;
import com.bjc.common.to.mq.StockDetailTo;
import com.bjc.common.to.mq.StockLockTo;
import com.bjc.common.utils.R;
import com.bjc.gulimall.ware.dao.WareSkuDao;
import com.bjc.gulimall.ware.entity.WareOrderTaskDetailEntity;
import com.bjc.gulimall.ware.entity.WareOrderTaskEntity;
import com.bjc.gulimall.ware.feign.OrderFeignService;
import com.bjc.gulimall.ware.service.WareOrderTaskDetailService;
import com.bjc.gulimall.ware.service.WareOrderTaskService;
import com.bjc.gulimall.ware.service.WareSkuService;
import com.bjc.gulimall.ware.vo.OrderVo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @描述:库存释放RabbitMQ监听器
* @创建时间: 2021/3/30
*/
@RabbitListener(queues = {"stock.release.stock.queue"}) // stock.release.stock.queue
@Component
public class StockReleaseListener {
@Autowired
private WareSkuService WareSkuService;
/*
* 添加解锁库存的功能
* 库存解锁的场景:
* 1)下单成功,但是订单过期未支付,被系统自动(用户主动)取消了
* 2)下单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚,之前锁定的库存需要解锁(使用seata分布式事务太慢了,我们希望可以自动解锁)
*
* 注意:只要解锁库存的消息失败,一定要告诉mq服务器,此次解锁失败,消息不要删除,因此,需要设置消息的ACK为手动确认方式
* */
@RabbitHandler
public void handleLockStockRelease(Message msg, StockLockTo stockLockTo, Channel channel) throws IOException {
System.out.println("收到解锁库存的信息:");
try{
WareSkuService.unLockStock(stockLockTo);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
channel.basicReject(msg.getMessageProperties().getDeliveryTag(),true);
}
}
@RabbitHandler
public void handlerOrderCloseRelease(Message msg, OrderTo orderTo, Channel channel) throws IOException {
System.out.println("收到订单取消消息:" + orderTo);
try{
WareSkuService.unLockStock(orderTo);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
channel.basicReject(msg.getMessageProperties().getDeliveryTag(),true);
}
}
}
注意:最终一致性的保证可以使用rabbitMQ的延时队列来完成,详情可以参考springBoot高级篇——消息中间件RabbitMQ的延时队列的用法