彻底解决分布式环境下Redisson消息队列监听重复执行问题

问题现象:测试环境单台部署,没有问题,生产环境多台部署订单都是2条重复数据。

问题描述:我们把每个服务都部署了2台,订单产生后,有redisson的mq发布,如果MQListener监听到就会执行后面的业务逻辑。现实的问题是2台MQListener都会监听到,会重复处理我们的逻辑,插入数据库或修改数据库或写入ES等都会执行2遍。

本文的DEMO中使用的是redisson的mq来测试的,同时RabbitMQ,ActiveMQ,RocketMQ也会有同样的问题,处理逻辑大家可以参照,应该都是大同小异。

解决方法:redisson公平锁【谁先抢到谁先锁,其余需要等待】加锁处理,只需要一台来处理。

------------------------------------------------核心代码----------------------------------------------------------

1、pom依赖及redisson插件

<!-- 插件地址https://gitee.com/ztp/redisson-spring-boot-starter -->
<dependency>
  <groupId>com.zengtengpeng</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>1.0.8</version>
</dependency>

2、application.yml配置文件

// 集群配置,先进先出原则
redisson:
  multiple-server-config:
    node-addresses[0]: 192.168.1.57:7000
    node-addresses[1]: 192.168.1.57:7001
    node-addresses[2]: 192.168.1.117:7000
    node-addresses[3]: 192.168.1.57:7002
    node-addresses[4]: 192.168.1.117:7001
    node-addresses[5]: 192.168.1.117:7002
    loadBalancer: org.redisson.connection.balancer.RoundRobinLoadBalancer
    readMode: MASTER
    subscriptionMode: MASTER
  password: redismima123
  model: CLUSTER




// 单节点配置
redisson:
  singleServerConfig:
    address: 192.168.1.119:6380
    database: 3
  password: redisMiMa123
  model: SINGLE

3、springboot启动程序

package com;

import com.zengtengpeng.annotation.EnableMQ;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;


@Slf4j
// 这里的注解要加上,不然MQ不生效
@EnableMQ
public class OrderJobApplication {

	public static void main(String[] args) {
		SpringApplication.run(OrderJobApplication.class, args);
		log.info("OrderJob项目启动成功!!!");
	}

}

4、mq生产者

package com.services.impl;

/**
 *
 * @author: renkai721@163.com
 * @date: 2021年09月16日 20:41:54
 * @description:
 */

import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.github.pagehelper.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.*;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {


    @Resource
    private RedissonClient redissonClient;
   

    @Override
    public void createOrder(OrderParamReqVo vo)  {
        // 虽然order的服务也部署了2台,但是从用户点击创建订单到后台网关,
        // 所有的nginx都只会转发到一台机器上,所以生产者不需要单独处理。
        Long orderId = 0L;
        // 查询ID
        RLock idLock = redissonClient.getFairLock("orderIdLock");
        try{
            idLock.lock();
            RBucket<Long> idBucket = redissonClient.getBucket("orderId");
            synchronized (idBucket){
                Long redisId = idBucket.get();
                if(redisId != null){
                    redisId+=1;
                }else{
                    redisId = 1L;
                }
                orderId = redisId;
                idBucket.set(redisId);
                log.info("生成的的orderId="+orderId);
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            idLock.unlock();
        }
        // topic名字和监听中的名字要一致,写法也有很多,大家按照自己喜欢的方式去写
        RTopic orderMq = redissonClient.getTopic("orderTopic");
        OrderParamRespVo obj = OrderParamRespVo.builder()
                .id(orderId)
                .userId(vo.getUserId())
                .status(0)
                .createTime(new Date())
                .build();
        orderMq.publish(obj);
        log.info("order订单的MQ生成了,快快接收处理吧,obj={}",obj);
    }

  

}

5、消费者

package com.mq.listener;

import com.alibaba.druid.util.StringUtils;
import com.zengtengpeng.annotation.MQListener;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.TimeUnit;

/**
 *
 * @Description:
 * @author: renkai721@163.com
 * @date: 2021年09月19日 11:28 上午
 */

@Component
@Slf4j
public class OrderMqListener {

    @Resource
    private RedissonClient redissonClient;


    @MQListener(name = "orderTopic")
    public void orderTopicSave(CharSequence charSequence, OrderParamRespVo vo, Object object){
        // 这里的MQListener大家要注意,部署了3台,那么3台都会监听,
        // 如果有数据下发,3台会同时触发。
        String value = getProcessId();
        log.info("value={}",value);
        RBucket<String> idBucket = redissonClient.getBucket("mqOrderId"+vo.getOrderId);
        // redisson公平锁,谁先锁住谁使用
        RLock idLock = redissonClient.getFairLock("mqLockOrderId"+vo.getOrderId);
        // 锁2秒,其余的处于等待,2秒后锁会自动解锁,也就是finally不需要单独处理
        idLock.lock(2,TimeUnit.SECONDS);
        try {
            if(idLock.isLocked()){
                synchronized (idBucket) {
                    if(StringUtils.isEmpty(idBucket.get())){
                        // 这里的逻辑是使用了机器的进程ID+机器名来判断唯一标识的
                        // 如果最简单的就是idBucket.set("1");
                        // 只要idBucket有值就说明有人已经锁住在处理了。
                        idBucket.set(value,5, TimeUnit.MINUTES);
                    }
                    log.info("idBucket.get()={}",idBucket.get());
                }
            }
            if(value.equals(idBucket.get())){
                log.info("让我来处理吧,其它小伙伴休息一下吧!");
                // 自己的写库或写redis逻辑处理
                OrderDao.save(vo);
                log.info("orderJob MQ收到消息, 处理完毕。");
                idBucket.delete();
            }else {
                log.info("已经有人处理啦");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(idLock.isLocked() && idLock.isHeldByCurrentThread()){
                idLock.unlock();
            }
        }

    }

    public static final String getProcessId() {
        RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
        return runtimeMXBean.getName();
    }

}

上一篇:Spring Boot 整合 Redis


下一篇:Java爬取网页指定内容