分布式事务——最终一致性的保证

分布式事务可以使用seata实现,但是,对于高并发的场景,使用seata会感觉稍慢,尤其是对一致性要求不那么高的业务完全可以不需要使用seata,这时候,我们可以考虑最终一致性的方案。通过消息队列机制来保证最终一致性,即可。

思想:在MQ中新建两个队列,一个死信队列,一个普通队列,让同一个交换机绑定这两个队列,死信队列不处理任何消息,只是用于存放过期的消息,当指定的时间到了,自动路由到普通队列,在某个服务中单独监听这个队列的消息并处理该消息。

实现步骤:

1. 准备工作

1)引入MQ

<!-- 引入mq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2)mq相关配置文件

# 配置MQ基本信息
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

3)mq配置类(指定序列化机制、死信队列等的相关绑定关系)

package com.bjc.gulimall.ware.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

/**
 * @描述:rabbitMQ配置类
 * @创建时间: 2021/3/15
 */
@Configuration
public class MyRabbitConfig {

    /* 使用JSON序列化对象 */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @RabbitListener(queues = {"stock.release.stock.queue"})
    public void handle(Message msg){
        System.out.println(Thread.currentThread().getName() + "\t" + "" + msg);
    }

    /*
    * 配置绑定关系等信息
    * */
    // 1. 库存服务默认交换机(topic类型)
    @Bean
    public Exchange stockEventExchange(){
        // public TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);
        return topicExchange;
    }

    // 2. 普通队列
    @Bean
    public Queue stockReleaseStockQueue(){
        // public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        return new Queue("stock.release.stock.queue",true,false,false);
    }

    // 3. 创建延时队列
    @Bean
    public Queue stockDeadStockQueue(){
        // public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange","stock-event-exchange");     // 指定死信路由
        arguments.put("x-dead-letter-routing-key","stock.release");         // 指定死信路由键(消息死了,要通过哪个路由键交出去)
        arguments.put("x-message-ttl",120000);                               // 指定消息过期时间 单位毫秒
        return new Queue("stock.delay.queue",true,false,false,arguments);
    }

    // 4. 创建2个绑定关系
    //      4.1 绑定死信队列与交换机的关系
    @Bean
    public Binding deadBind(){
        // Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
        return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);
    }

    //      4.2 绑定普通队列与交换机的关系
    @Bean
    public Binding releaseBind(){
        // Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
        return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);
    }

}

4)开启rabbitMQ

启动类上添加如下注解

分布式事务——最终一致性的保证

2. 实现逻辑

这里只是简单的罗列下,记录一下使用方法,具体逻辑具体分析即可

分布式事务——最终一致性的保证

3. 测试

访问测试页面(略),查看MQ控制台,如图:

分布式事务——最终一致性的保证

消息成功的到达指定的死信队列,数据库中也有记录对应的记录,如图;

分布式事务——最终一致性的保证

分布式事务——最终一致性的保证

等会的回退操作,就是根据这两张表来的。

4. 解锁操作

解锁涉及到了消息的消费,以及远程接口的调用,如果此次调用失败,需要重新执行,这时候,消息队列的消息已经被消费了,所以,会导致,重新解锁的时候没有消息了,因此,mq的ACK确认机制需要配置成手动确认。

在application.properties中添加如下配置:

# 手动ack模式开启
spring.rabbitmq.listener.simple.acknowledge-mode=manual

然后,新建一个监听,用于监听MQ的指定队列,代码如下:

package com.bjc.gulimall.ware.listener;

import com.alibaba.fastjson.TypeReference;
import com.bjc.common.to.mq.OrderTo;
import com.bjc.common.to.mq.StockDetailTo;
import com.bjc.common.to.mq.StockLockTo;
import com.bjc.common.utils.R;
import com.bjc.gulimall.ware.dao.WareSkuDao;
import com.bjc.gulimall.ware.entity.WareOrderTaskDetailEntity;
import com.bjc.gulimall.ware.entity.WareOrderTaskEntity;
import com.bjc.gulimall.ware.feign.OrderFeignService;
import com.bjc.gulimall.ware.service.WareOrderTaskDetailService;
import com.bjc.gulimall.ware.service.WareOrderTaskService;
import com.bjc.gulimall.ware.service.WareSkuService;
import com.bjc.gulimall.ware.vo.OrderVo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @描述:库存释放RabbitMQ监听器
 * @创建时间: 2021/3/30
 */
@RabbitListener(queues = {"stock.release.stock.queue"}) // stock.release.stock.queue
@Component
public class StockReleaseListener {

    @Autowired
    private WareSkuService WareSkuService;

    /*
     * 添加解锁库存的功能
     * 库存解锁的场景:
     *   1)下单成功,但是订单过期未支付,被系统自动(用户主动)取消了
     *   2)下单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚,之前锁定的库存需要解锁(使用seata分布式事务太慢了,我们希望可以自动解锁)
     *
     *   注意:只要解锁库存的消息失败,一定要告诉mq服务器,此次解锁失败,消息不要删除,因此,需要设置消息的ACK为手动确认方式
     * */
    @RabbitHandler
    public void handleLockStockRelease(Message msg, StockLockTo stockLockTo, Channel channel) throws IOException {
        System.out.println("收到解锁库存的信息:");
        try{
            WareSkuService.unLockStock(stockLockTo);
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            channel.basicReject(msg.getMessageProperties().getDeliveryTag(),true);
        }
    }

    @RabbitHandler
    public void handlerOrderCloseRelease(Message msg, OrderTo orderTo, Channel channel) throws IOException {
        System.out.println("收到订单取消消息:" + orderTo);
        try{
            WareSkuService.unLockStock(orderTo);
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            channel.basicReject(msg.getMessageProperties().getDeliveryTag(),true);
        }
    }
}

注意:最终一致性的保证可以使用rabbitMQ的延时队列来完成,详情可以参考springBoot高级篇——消息中间件RabbitMQ的延时队列的用法

上一篇:时序预测相关视频笔记


下一篇:翻译:《实用的Python编程》04_01_Class