分布式锁Redis实现方式

1、SingleSubmitAspectJ

package com.sxc.workflow.aspect;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import javassist.NotFoundException;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * @author qiurunjing
 */
@Aspect
@Component
public class SingleSubmitAspectJ {
    private static final Logger logger = LoggerFactory.getLogger(SingleSubmitAspectJ.class);

    /**
     * 分布式锁,需要注入分布式锁实现类,分布式锁介质可以使用redis、zookeeper、mysql等等。
     */
    @Autowired
    @Qualifier("redisLocker")
    private IDistributeLocker distributeLocker;

    /**
     * 分布式锁前缀,如系统名,用于在多个系统共享同一个分布式介质时,区分其他系统使用
     */
    private static final String lockKeyPrefix = "sxc-trade";

//    public SingleSubmitAspectJ(IDistributeLocker distributeLocker, String lockKeyPrefix) {
//        this.distributeLocker = distributeLocker;
//        this.lockKeyPrefix = lockKeyPrefix;
//    }

    @Pointcut("@annotation(com.sxc.component.common.annotation.SingleSubmission)")
    public void pointCut() {
    }

    @Around("pointCut()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature method = ((MethodSignature) pjp.getSignature());

        SingleSubmission singleSubmit = AopUtils.getMostSpecificMethod(method.getMethod(), pjp.getTarget().getClass())
                .getAnnotation(SingleSubmission.class);
        if (singleSubmit == null) {
            return pjp.proceed();
        }
        Map<String, Object> argMap = getArgMap(pjp, method);
        if (!shouldIntercept(argMap, singleSubmit)) {
            logger.info("不需要进行防重复提交拦截 whenExp == false");
            return pjp.proceed();
        }
        if (singleSubmit.expire() <= 0) {
            logger.info("不需要进行防重复提交拦截 expire <= 0");
            return pjp.proceed();
        }
        return doArroundLock(pjp, singleSubmit, argMap);
    }

    private Object doArroundLock(ProceedingJoinPoint pjp, SingleSubmission singleSubmit, Map<String, Object> argMap) throws Throwable {
        Expression expression = AviatorEvaluator.compile(singleSubmit.seedExp(), true);
        Object seedObj = expression.execute(argMap);
        if (seedObj instanceof List) {
            return lockListKeyAndExec(pjp, singleSubmit, (List) seedObj);
        }

        if (seedObj instanceof Object[]) {
            return lockListKeyAndExec(pjp, singleSubmit, Lists.newArrayList(seedObj));
        }

        return lockSingleKeyAndExec(pjp, singleSubmit, seedObj);
    }

    private Object lockSingleKeyAndExec(ProceedingJoinPoint pjp, SingleSubmission singleSubmit, Object seedObj) throws Throwable {
        String key = makeKey(singleSubmit, String.valueOf(seedObj));

        lock(singleSubmit, key);

        return proceedAndUnlock(pjp, key);
    }

    private Object lockListKeyAndExec(ProceedingJoinPoint pjp, SingleSubmission singleSubmit, List seedList) throws Throwable {
        List<String> lockedKeys = Lists.newArrayList();
        seedList.forEach(seed -> {
            String key = makeKey(singleSubmit, String.valueOf(seed));
            boolean lockSucc = false;
            try {
                lock(singleSubmit, key);
                lockedKeys.add(key);
                lockSucc = true;
            } finally {
                if (!lockSucc) {
                    lockedKeys.forEach(one -> unlock(one));
                }
            }
        });
        return proceedAndUnlockList(pjp, lockedKeys);
    }

    private void lock(SingleSubmission singleSubmit, String key) throws RepeatSubmitException {
        logger.info("尝试获取全局锁, key: {}", key);
        boolean isSucc = distributeLocker.tryLock(key, singleSubmit.expire());
        if (!isSucc) {
            if (!singleSubmit.needRetry()) {
                repeatCommitException(key, singleSubmit);
                return;
            }
            wait(singleSubmit, key);
        }
    }

    private void repeatCommitException(String key, SingleSubmission singleSubmit) throws RepeatSubmitException {
        logger.info("任务处理中, 请不要重复提交. key:{} errMsg:{}", key, singleSubmit.errMsg());
        throw new RepeatSubmitException(singleSubmit.errMsg());
    }

    private void unlock(String key) {
        try {
            distributeLocker.unLock(key);
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
        logger.info("退出全局锁, key: {}", key);
    }

    private Object proceedAndUnlockList(ProceedingJoinPoint pjp, List<String> keys) throws Throwable {
        try {
            Object obj = pjp.proceed();
            return obj;
        } finally {
            keys.forEach(key -> unlock(key));
            logger.info("退出全局锁, keys: {}", keys);
        }
    }


    private Object proceedAndUnlock(ProceedingJoinPoint pjp, String key) throws Throwable {
        try {
            Object obj = pjp.proceed();
            return obj;
        } finally {
            try {
                distributeLocker.unLock(key);
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
            }
            logger.info("退出全局锁, key: {}", key);
        }
    }

    private void wait(SingleSubmission singleSubmit, String key) throws RepeatSubmitException {
        long waitTime = 0;
        boolean isSucc = false;
        while (waitTime < singleSubmit.expire()) {
            SleepUtil.sleep(1 * 1000);
            waitTime += 1;
            logger.info("再次尝试获取全局锁, key: {}, 等待时间: {}/{} s", key, waitTime, singleSubmit.expire());
            isSucc = distributeLocker.tryLock(key, singleSubmit.expire());
            if (isSucc) {
                break;
            }
        }
        if (!isSucc) {
            repeatCommitException(key, singleSubmit);
        }
    }

    private boolean shouldIntercept(Map<String, Object> argMap, SingleSubmission submission) {
        if (StringUtils.isNotBlank(submission.whenExp())) {
            Expression expression = AviatorEvaluator.compile(submission.whenExp(), true);
            return (boolean) expression.execute(argMap);
        }
        return true;
    }

    private String makeKey(SingleSubmission singleSubmit, String seedVal) {
        return lockKeyPrefix + "_" + singleSubmit.group() + "_" + seedVal;
    }

    private Map<String, Object> getArgMap(ProceedingJoinPoint pjp, MethodSignature method) throws NotFoundException {
        String[] parameterNames;
        if (AopUtils.isJdkDynamicProxy(pjp.getThis())) {
            parameterNames = ReflectionUtil.getParameterNames(pjp.getTarget().getClass(), method.getMethod().getName());
        } else {
            parameterNames = method.getParameterNames();
        }
        Object[] pjpArgs = pjp.getArgs();

        Map<String, Object> argMap = Maps.newHashMap();

        if (parameterNames != null) {
            for (int i = 0; i < parameterNames.length; i++) {
                argMap.put(parameterNames[i], pjpArgs[i]);
            }
        }
        return argMap;
    }

}

2、IDistributeLocker

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.sxc.component.common.lock;

public interface IDistributeLocker {
    boolean tryLock(String var1, int var2);

    void unLock(String var1);
}

3、IDistributeLocker的实现类RedisLocker

package com.biz.common.lock;

import com.common.lock.IDistributeLocker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName RedisLocker
 * @Description TODO
 * @Author zengry
 * @Date 2021/1/11 14:54
 */

@Slf4j
@Service
public class RedisLocker implements IDistributeLocker {

    @Resource
    RedisTemplate<String, Object> redisTemplate;

    @Override
    public boolean tryLock(String key, int expireSeconds) {
        log.info("资源key:{}开始获取分布式锁", key);
        // todo 获取线程ID
        long requestTime = System.currentTimeMillis();
        Boolean lock = redisTemplate.opsForValue().setIfAbsent(key, requestTime, expireSeconds, TimeUnit.SECONDS);
        if (null == lock || !lock) {
            log.info("资源key:{}正在加锁....", key);

            // 防止多线程抢锁
            Object currentValue = redisTemplate.opsForValue().get(key);
            if (null != currentValue && (Long) currentValue < System.currentTimeMillis()) {
                Object target = redisTemplate.opsForValue().getAndSet(key, requestTime);
                if (null != target && target.equals(requestTime)) {
                    log.info("资源key:{}重新分布式锁成功!!!", key);
                    return true;
                }
            }
            log.info("资源key:{}获取分布式锁失败", key);
            return false;
        }
        log.info("资源key:{}获取分布式锁成功!!!", key);
        return true;
    }

    @Override
    public void unLock(String key) {
        try{
            redisTemplate.opsForValue().getOperations().delete(key);
        }catch (Exception e){
            log.error("资源key:{}分布式锁释放异常, {}", key, e.getMessage() );
            e.printStackTrace();
        }
    }
}

4、aspect包下SingleSubmission自定义注解

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.common.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Repeatable(SingleSubmissions.class)
public @interface SingleSubmission {
String group();

String seedExp() default "";

String whenExp() default "";

int expire() default 30;

boolean needRetry() default false;

String errMsg() default "任务处理中,请不要重复提交";

}

5、SingleSubmissions自定义注解

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.common.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SingleSubmissions {
SingleSubmission[] value();
}

6、Redis的yml配置

spring: 
	redis:
	    database: 5
	    host: 127.0.0.1
	    port: 6379
	    password: "123456"
	    # 连接超时时间(毫秒)默认是2000ms
	    timeout: 8000ms
	    jedis:
	      pool:
	        # 连接池最大连接数(使用负值表示没有限制)
	        max-active: 16
	        # 连接池最大阻塞等待时间(使用负值表示没有限制)
	        max-wait: 30000
	        # 连接池中的最大空闲连接
	        max-idle: 8
	        # 连接池中的最小空闲连接
	        min-idle: 2

7、启动类配置注解@EnableAspectJAutoProxy(proxyTargetClass = true)

8、pom依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

9、工具类

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.sxc.component.util;

public class SleepUtil {
    public SleepUtil() {
    }

    public static void sleep(long mills) {
        try {
            Thread.sleep(mills);
        } catch (InterruptedException var3) {
        }

    }
}





//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.sxc.component.util;

import com.google.common.collect.Maps;
import java.util.Map;
import javassist.ClassClassPath;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;
import javassist.Modifier;
import javassist.NotFoundException;
import javassist.bytecode.CodeAttribute;
import javassist.bytecode.LocalVariableAttribute;
import javassist.bytecode.MethodInfo;

public class ReflectionUtil {
    private static Map<String, String[]> parameterNamesCache = Maps.newConcurrentMap();

    public ReflectionUtil() {
    }

    public static String[] getParameterNames(Class cls, String methodName, boolean cache) throws NotFoundException {
        if (cache) {
            String key = makeParameterCacheKey(cls, methodName);
            if (parameterNamesCache.containsKey(key)) {
                return (String[])parameterNamesCache.get(key);
            }
        }

        String[] paramNames = getParameterNamesNoCache(cls, methodName);
        if (cache) {
            parameterNamesCache.put(makeParameterCacheKey(cls, methodName), paramNames);
        }

        return paramNames;
    }

    private static String[] getParameterNamesNoCache(Class cls, String methodName) throws NotFoundException {
        String[] paramNames = null;
        ClassPool pool = ClassPool.getDefault();
        ClassClassPath classPath = new ClassClassPath(cls);
        pool.insertClassPath(classPath);
        CtClass cc = pool.get(cls.getName());
        CtMethod cm = cc.getDeclaredMethod(methodName);
        MethodInfo methodInfo = cm.getMethodInfo();
        CodeAttribute codeAttribute = methodInfo.getCodeAttribute();
        LocalVariableAttribute attr = (LocalVariableAttribute)codeAttribute.getAttribute("LocalVariableTable");
        if (attr != null) {
            paramNames = new String[cm.getParameterTypes().length];
            int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1;

            for(int i = 0; i < paramNames.length; ++i) {
                paramNames[i] = attr.variableName(i + pos);
            }
        }

        return paramNames;
    }

    private static String makeParameterCacheKey(Class cls, String methodName) {
        return cls.getName() + "_" + methodName;
    }

    public static String[] getParameterNames(Class cls, String methodName) throws NotFoundException {
        return getParameterNames(cls, methodName, true);
    }
}


12、ProcessSubmissionGroup

package com.sxc.workflow.common.lock;

import lombok.Data;

/**
 * 防重复提交锁标记,用于定义某种业务场景下的锁粒度
 */
public class ProcessSubmissionGroup {

    /**
     * 受理人受理任务
     */
    public static final String ASSIGNEE_USER_HANDLE_TASK = "ASSIGNEE_USER_HANDLE_TASK";

    /**
     * 申请人撤回流程
     */
    public static final String APPLY_WITHDRAW_PROCESS = "APPLY_WITHDRAW_PROCESS";
}

11、使用,在业务层BizService方法上加注解即可

@SingleSubmission(
            group = ProcessSubmissionGroup.APPLY_START_PROCESS,
            seedExp = "xxxDTO.id",
            errMsg = "流程正在启动,不可以再次启动")
上一篇:谁有115网盘资源


下一篇:Sentinel:SentinelResourceAspect