问题现象:测试环境单台部署,没有问题,生产环境多台部署订单都是2条重复数据。
问题描述:我们把每个服务都部署了2台,订单产生后,有redisson的mq发布,如果MQListener监听到就会执行后面的业务逻辑。现实的问题是2台MQListener都会监听到,会重复处理我们的逻辑,插入数据库或修改数据库或写入ES等都会执行2遍。
本文的DEMO中使用的是redisson的mq来测试的,同时RabbitMQ,ActiveMQ,RocketMQ也会有同样的问题,处理逻辑大家可以参照,应该都是大同小异。
解决方法:redisson公平锁【谁先抢到谁先锁,其余需要等待】加锁处理,只需要一台来处理。
------------------------------------------------核心代码----------------------------------------------------------
1、pom依赖及redisson插件
<!-- 插件地址https://gitee.com/ztp/redisson-spring-boot-starter -->
<dependency>
<groupId>com.zengtengpeng</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>1.0.8</version>
</dependency>
2、application.yml配置文件
// 集群配置,先进先出原则
redisson:
multiple-server-config:
node-addresses[0]: 192.168.1.57:7000
node-addresses[1]: 192.168.1.57:7001
node-addresses[2]: 192.168.1.117:7000
node-addresses[3]: 192.168.1.57:7002
node-addresses[4]: 192.168.1.117:7001
node-addresses[5]: 192.168.1.117:7002
loadBalancer: org.redisson.connection.balancer.RoundRobinLoadBalancer
readMode: MASTER
subscriptionMode: MASTER
password: redismima123
model: CLUSTER
// 单节点配置
redisson:
singleServerConfig:
address: 192.168.1.119:6380
database: 3
password: redisMiMa123
model: SINGLE
3、springboot启动程序
package com;
import com.zengtengpeng.annotation.EnableMQ;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
@Slf4j
// 这里的注解要加上,不然MQ不生效
@EnableMQ
public class OrderJobApplication {
public static void main(String[] args) {
SpringApplication.run(OrderJobApplication.class, args);
log.info("OrderJob项目启动成功!!!");
}
}
4、mq生产者
package com.services.impl;
/**
*
* @author: renkai721@163.com
* @date: 2021年09月16日 20:41:54
* @description:
*/
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.github.pagehelper.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.*;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Resource
private RedissonClient redissonClient;
@Override
public void createOrder(OrderParamReqVo vo) {
// 虽然order的服务也部署了2台,但是从用户点击创建订单到后台网关,
// 所有的nginx都只会转发到一台机器上,所以生产者不需要单独处理。
Long orderId = 0L;
// 查询ID
RLock idLock = redissonClient.getFairLock("orderIdLock");
try{
idLock.lock();
RBucket<Long> idBucket = redissonClient.getBucket("orderId");
synchronized (idBucket){
Long redisId = idBucket.get();
if(redisId != null){
redisId+=1;
}else{
redisId = 1L;
}
orderId = redisId;
idBucket.set(redisId);
log.info("生成的的orderId="+orderId);
}
} catch (Exception e){
e.printStackTrace();
} finally {
idLock.unlock();
}
// topic名字和监听中的名字要一致,写法也有很多,大家按照自己喜欢的方式去写
RTopic orderMq = redissonClient.getTopic("orderTopic");
OrderParamRespVo obj = OrderParamRespVo.builder()
.id(orderId)
.userId(vo.getUserId())
.status(0)
.createTime(new Date())
.build();
orderMq.publish(obj);
log.info("order订单的MQ生成了,快快接收处理吧,obj={}",obj);
}
}
5、消费者
package com.mq.listener;
import com.alibaba.druid.util.StringUtils;
import com.zengtengpeng.annotation.MQListener;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.TimeUnit;
/**
*
* @Description:
* @author: renkai721@163.com
* @date: 2021年09月19日 11:28 上午
*/
@Component
@Slf4j
public class OrderMqListener {
@Resource
private RedissonClient redissonClient;
@MQListener(name = "orderTopic")
public void orderTopicSave(CharSequence charSequence, OrderParamRespVo vo, Object object){
// 这里的MQListener大家要注意,部署了3台,那么3台都会监听,
// 如果有数据下发,3台会同时触发。
String value = getProcessId();
log.info("value={}",value);
RBucket<String> idBucket = redissonClient.getBucket("mqOrderId"+vo.getOrderId);
// redisson公平锁,谁先锁住谁使用
RLock idLock = redissonClient.getFairLock("mqLockOrderId"+vo.getOrderId);
// 锁2秒,其余的处于等待,2秒后锁会自动解锁,也就是finally不需要单独处理
idLock.lock(2,TimeUnit.SECONDS);
try {
if(idLock.isLocked()){
synchronized (idBucket) {
if(StringUtils.isEmpty(idBucket.get())){
// 这里的逻辑是使用了机器的进程ID+机器名来判断唯一标识的
// 如果最简单的就是idBucket.set("1");
// 只要idBucket有值就说明有人已经锁住在处理了。
idBucket.set(value,5, TimeUnit.MINUTES);
}
log.info("idBucket.get()={}",idBucket.get());
}
}
if(value.equals(idBucket.get())){
log.info("让我来处理吧,其它小伙伴休息一下吧!");
// 自己的写库或写redis逻辑处理
OrderDao.save(vo);
log.info("orderJob MQ收到消息, 处理完毕。");
idBucket.delete();
}else {
log.info("已经有人处理啦");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if(idLock.isLocked() && idLock.isHeldByCurrentThread()){
idLock.unlock();
}
}
}
public static final String getProcessId() {
RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
return runtimeMXBean.getName();
}
}