基于Spring aop 和 redisson 实现分布式锁(灵活设置lockName)

1. 阅读本文时,您应该已经了解的内容

  1. Spring boot框架基本使用(我这里使用的是spring cloud分布式框架)
  2. aop的基本原理
  3. 了解redisson分布式锁机制
  4. 对反射和注解使用有足够的了解

如果对以上内容了解不足,阅读本文会比较吃力。(第一次写,有不合适或者更优解的地方欢迎指正)。

2. 实现效果(最简功能)

    @DistributedLock("testLock")
    public R testWrite(@DistributedLockParam String param) {
        System.out.println(new Date());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getId());
        System.out.println(new Date());
        return R.ok();
    }

        使用@DistributedLock的方法在执行过程中会持有一个redisson锁,当我用JMeter(压测工具,可以简单理解成几乎同时发送n个请求)进行三次请求的时候,控制台的输入内容如下

基于Spring aop 和 redisson 实现分布式锁(灵活设置lockName)

        可以看到,三次请求得以有序执行,分布式锁生效,下面我会讲解具体的实现思路和代码。

3. 实现思路

        redisson框架的分布式锁基于redis实现,通过访问其中同名的键值来判断当前任务有没有其他线程正在执行,在java代码中,可以在逻辑代码之前进行上锁(lock),逻辑代码之后进行解锁(unlock)来完成。

        Spring的aop,在调用 由Bean容器管理的对象 中的方法时,会自动触发aop的执行,因此,我们可以设计一个aop,帮助我们完成上述redisson的功能,这样我们就不需要手动编写上锁解锁的这个过程了。

        下面是一个简易的redissonDemo,当然其中缺少很多真正使用时需要用到的参数。

    @Autowired
    private RedissonClient redissonClient;
    public void lockDemo() {
        String name = "lockName";
        RLock lock = redissonClient.getLock(name);
        try {
            lock.lock();
            //do someThing
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

         可以看到,我们真正的业务代码是被包含于一个try中的,在业务之前和之后都有一段锁逻辑,因此,aop通知我们选择环绕通知。同时,在创建锁的时候,我们需要一些参数,编写这个aop时我们需要考虑怎样将参数传给它。

4. aop原型

        上面提到了,我们需要为aop提供参数,然后aop通过这些参数为我们进行上锁解锁操作。在这里我们选择使用将注解设置为切点,因为注解可以完成提供参数的这一需求。这样,aop的原型呼之欲出。

    @Autowired
    private RedissonClient redissonClient;

    @Pointcut("@annotation(com.xxx.common.aop.distributed.DistributedLock)")
    public void distributedLockAspect() {}

    @Around(value = "distributedLockAspect()")
    public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
        return doLock(pjp);
    }

5. @DistributedLock(切点)

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock {
    /**
     * 锁的名称。
     * 如果lockName可以确定,直接设置该属性。
     */
    String value();

    /**
     * 是否使用尝试锁。
     */
    boolean tryLock() default false;
    /**
     * 最长等待时间。
     * 该字段只有当tryLock()返回true才有效。
     */
    long waitTime() default 30L;
    /**
     * 锁超时时间。
     * 如果tryLock为false,且leaseTime设置为0及以下,会变成lock()
     */
    long leaseTime() default 5L;
    /**
     * 时间单位。默认为秒。
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}

        @Target({ ElementType.METHOD })的含义是当前注解使用在方法上 

        @Retention(RetentionPolicy.RUNTIME)的含义是当前注解在运行时有效

        注解中包含一些上锁时常用的属性,如果从aop中读取这些属性,也可以执行上锁解锁过程,但是事实上,我们在设置锁的时候常常是需要设置 锁粒度 的。

        解释一下锁粒度的问题,我们知道,锁是为了将原本异步处理的一些功能同步,防止出现线程安全问题,但是现在这个注解实现时,只有一个value,这个value对于某个方法来说是固定的,但是这样就会存在问题,打个比方,你在超市买东西以后要结账(方法),结果超市虽然有很多个结账台,但是只允许有一个人去结账,其他人都拦在外面,因为第一个人进去结账的时候直接把门带上了(上锁),他锁住了整个方法,导致了其他人无法完成结账,这时我们正确的解决办法是让所有的结账台都能接收一个顾客,也就是我们的结账方法需要能为每个结账台构建独立的锁(其实就是行级锁和表级锁的区分)。

        我们的方法是同一个,但是我们需要让它在不同的情况下去生成不同的锁,这个时候首先想到的应该是方法参数,我们可以考虑用方法的参数的不同来决定锁的形态。这样,出现了第二个注解。

6. @DistributedLockParam

        

@Target({ ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLockParam {
    /**
     * 如果是对象则填写使用哪个属性,如果是基本数据类型或String则按默认
     * @return
     */
    String value() default "";

    /**
     * 当前属性在锁目录的顺序,越小越靠前
     * **勿重
     * @return
     */
    int sort() default 0;
}

        这个注解的作用是写在方法参数上,当我们需要设置锁粒度的时候,就在对应的参数上填写这个注解,然后在aop中读取包含这个注解的参数的值,拼在真正的锁name中。

        由于粒度可能由多于一个元素组成,所以在注解中加入了sort属性用来为粒度参数排序。

        由于方法的参数既可能是基本类型,还可能是对象,我们的粒度标记可能 除了标记基本类型外,还需要用来标记对象中的某个属性,甚至这个属性还可能是对象,就需要对象属性的属性……

        好吧,先记住这个问题,等我们到了aop逻辑再说。

        至于value中的值,在设置中,默认情况是直接使用目标的toString,如果要使用对象的某个属性,则将value修改为对应的属性名,如果是属性的属性,就用 "." 来表示。

        当然了,我们既然已经支持多个粒度标记,那么就应该允许选择某一个对象的多个属性作为粒度,这个功能有两种实现方式,第一种是在value值中进行切分,比如 "a;b" 表示用a属性做第一个标记,用b属性做第二个标记。

        但是万一我想在这两个之间拼接一个其他对象的某个属性呢?(没有这个万一!guna!),另外我们的value已经允许使用 "." 做深度查询标记了,再加一个分号是不是太乱了。

        所以我们放弃了这种实现方式,换成另一种,复用注解。

        注解怎么复用我就不说了,这是一个固定流程,在这里只贴出其中的代码。

        需要在@DistributedLockParam中加入一个新注解@Repeatable。

@Target({ ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(DistributedLockParams.class)
public @interface DistributedLockParam {
    /**
     * 如果是对象则填写使用哪个属性,如果是基本数据类型或String则按默认
     * @return
     */
    String value() default "";

    /**
     * 当前属性在锁目录的顺序,越小越靠前
     * **勿重
     * @return
     */
    int sort() default 0;
}

        然后是需要一个额外的注解作为它的容器。

@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target({ ElementType.PARAMETER })
public @interface DistributedLockParams {
    DistributedLockParam[] value();
}

        至此,我们用到的三个注解就全部完成了。

        然后,坐稳了,我们要加速了。

7. aop逻辑

        在之前aop原型中,我们的环绕通知调用了doLock方法。

private Object doLock(ProceedingJoinPoint pjp) throws Throwable {
        //切点所在的类
        Class targetClass = pjp.getTarget().getClass();
        //使用了注解的方法
        String methodName = pjp.getSignature().getName();
        Class[] parameterTypes = ((MethodSignature)pjp.getSignature()).getMethod().getParameterTypes();
        Method method = targetClass.getMethod(methodName, parameterTypes);
        Object[] arguments = pjp.getArgs();
        // 根据方法反射获取想要的锁名字
        String lockName = getLockName(method, arguments);
        DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);
        // 生成锁
        RLock lock = lock(lockName, distributedLock);
        try {
            return pjp.proceed();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

        其中,我们要做的第一步是获取到连接点的方法和参数,这是我们redisson锁的参数存放位置,这部分代码比较固定。

        得到这两个元素后,我们可以用getLockName方法拼接锁的name。

        

private String getLockName(Method method, Object[] arguments) {
        // 获取注解
        DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);
        // 获取前缀
        StringBuilder lockName = new StringBuilder(distributedLock.value());
        // 用来存储锁粒度标记
        TreeMap<Integer, String> treeMap = new TreeMap<>();
        // 遍历参数找到粒度标记
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        for (int i = 0; i < parameterAnnotations.length; i++) {
            // parameterAnnotations[i]:第i个参数的注解数组
            for (Annotation annotation : parameterAnnotations[i]) {
                // 遍历需要的注解(如果只有一个就是DistributedLockParam,如果一个参数上有多个会自动组合成DistributedLockParams)
                if (annotation instanceof DistributedLockParam) {
                    // 获取注解
                    DistributedLockParam distributedLockParam = (DistributedLockParam) annotation;
                    // 把属性放进treemap
                    fillTreeMap(distributedLockParam, arguments[i], treeMap);
                }else if (annotation instanceof DistributedLockParams) {
                    // 获取注解
                    DistributedLockParams distributedLockParams = (DistributedLockParams) annotation;
                    // 把属性放进treemap
                    for (DistributedLockParam distributedLockParam : distributedLockParams.value()) {
                        fillTreeMap(distributedLockParam, arguments[i], treeMap);
                    }
                    break;
                }
            }
        }
        // 收集完毕,拼接lockName
        separate(lockName, treeMap);
        return lockName.toString();
    }

    private void fillTreeMap(DistributedLockParam distributedLockParam, Object argument, TreeMap<Integer, String> treeMap) {
        // 获取属性名
        String field = distributedLockParam.value();
        if (field.equals("")) {
            // 基本属性直接用
            field = argument.toString();
        } else {
            // 对象反射拿数据
            try {
                String[] values = field.split("\\.");
                for (int i = 0; i < values.length; i++) {
                    Field declaredField = argument.getClass().getDeclaredField(values[i]);
                    declaredField.setAccessible(true);
                    if (i == values.length - 1) {
                        // 最后一个为真实对象,此时从中提取属性
                        field = declaredField.get(argument).toString();
                        // 如果这里不跳出,下一句执行会报错
                        break;
                    }
                    // 切换到下级对象
                    argument = declaredField.get(argument);
                }

            } catch (NoSuchFieldException | IllegalAccessException e) {
                throw new RRException("分布式锁参数有误");
            }
        }
        // 确定好以后放进treeMap中,自动排序
        treeMap.put(distributedLockParam.sort(), field);
    }

    private void separate(StringBuilder lockName, TreeMap<Integer, String> treeMap) {
        treeMap.values().forEach(s -> lockName.append(":").append(s));
    }

        首先,从method上获取@DistributedLock注解,并获取其中的value,这是我们锁名的前缀,如果后续没有拼接操作的话,它就是我们的锁名。

        第二步是获取全部的粒度标记,因为有排序功能,索引我们引入treeMap来为我们找到的标记排序。

        

Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        for (int i = 0; i < parameterAnnotations.length; i++) {
            // parameterAnnotations[i]:第i个参数的注解数组
            for (Annotation annotation : parameterAnnotations[i]) {
                // 遍历需要的注解(如果只有一个就是DistributedLockParam,如果一个参数上有多个会自动组合成DistributedLockParams)
                if (annotation instanceof DistributedLockParam) {
                    // 获取注解
                    DistributedLockParam distributedLockParam = (DistributedLockParam) annotation;
                    // 把属性放进treemap
                    fillTreeMap(distributedLockParam, arguments[i], treeMap);
                }else if (annotation instanceof DistributedLockParams) {
                    // 获取注解
                    DistributedLockParams distributedLockParams = (DistributedLockParams) annotation;
                    // 把属性放进treemap
                    for (DistributedLockParam distributedLockParam : distributedLockParams.value()) {
                        fillTreeMap(distributedLockParam, arguments[i], treeMap);
                    }
                    break;
                }
            }
        }

        其中的这段代码是获取标记的全过程。首先我们会获取参数列表的注解数组,它是一个二维数组,两个index分别表示参数的索引和注解的索引。

        我们需要遍历这个数组中的每一个注解,判断它是不是我们需要的@DistributedLockParam和@DistributedLockParams(当我们的注解没有复用的时候,注解就是@DistributedLockParam,但是当我们对这个注解进行复用的时候,我们取到的就是@DistributedLockParams,其中包含所有的@DistributedLockParam)。

        获取到注解以后通过fillTreeMap()方法提取其中的数据,对value默认情况下会直接提取目标的toString,如果我们手写了value……

            // 对象反射拿数据
            try {
                String[] values = field.split("\\.");
                for (int i = 0; i < values.length; i++) {
                    Field declaredField = argument.getClass().getDeclaredField(values[i]);
                    declaredField.setAccessible(true);
                    if (i == values.length - 1) {
                        // 最后一个为真实对象,此时从中提取属性
                        field = declaredField.get(argument).toString();
                        // 如果这里不跳出,下一句执行会报错
                        break;
                    }
                    // 切换到下级对象
                    argument = declaredField.get(argument);
                }

        for递归的受难日到了。

        首先我们按照约定的规则切分value,得到了每级属性的名字,然后就需要在反射的层面来找到这个属性了。

        一开始,argument是我们的参数本身,通过argument.getClass().getDeclaredField(values[i])可以获得当前我们需要的属性,如果这个时候我们已经探索到最后一级了,就直接将这个属性的存在treeMap中并跳出(递归头),反之,我们需要以这个属性为基础,再次探索(递归体),在递归时,我们的argument实际指代的是当前遍历到的对象,所以在进入下一次递归前,我们首先需要将argument指向我们的下级属性对象。

        当上面的逻辑执行完后,我们就得到了一个treeMap,其中包含我们所有的标记,并已经按照sort做了排序。

        然后做一步简单的拼接。

private void separate(StringBuilder lockName, TreeMap<Integer, String> treeMap) {
        treeMap.values().forEach(s -> lockName.append(":").append(s));
    }

        拼接用":"是因为冒号在redis中是目录展示,类似于我们的"/"。

        后面的操作就很简单了。

RLock lock = lock(lockName, distributedLock);
        try {
            return pjp.proceed();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
private RLock lock(String lockName, DistributedLock distributedLock) throws InterruptedException {
        RLock lock = redissonClient.getLock(lockName);
        // 上锁
        if (distributedLock.tryLock()) {
                lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit());
        } else {
            long leaseTime = distributedLock.leaseTime();
            if (leaseTime > 0) {
                lock.lock(distributedLock.leaseTime(), distributedLock.timeUnit());
            } else {
                lock.lock();
            }
        }
        return lock;
    }

        提取@DistributedLock中的其他锁参数,然后构建我们的lock并执行上锁,try 返回结果finally解锁一气呵成。

8. 一些锁失效情况

        在这个锁中,锁失效也就意味着是aop失效,所以这个问题可以变成,在什么情况下aop会失效。

        这里就涉及到aop的原理,我只大致描述一点,有兴趣的可以自己去搜一下。

        aop是基于代理模式的,Spring的aop会在我们调用 由Bean容器管理的对象 中的方法时自动触发,所以有两种情况下是无法生效的。

  1. 代理对象无法访问方法:当我们的方法由final修饰,或者为private方法时,代理是无法实现的。
  2. 当我们没有通过Bean容器调用(没有从上下文获取,并且也不是注入)时:最常见的情况就是非注解方法直接调用本类的注解方法,此时注解方法的注解是不会生效的。

9. 完整代码

// spring相关的依赖就不贴过来了
<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.12.0</version>
        </dependency>
import com.xxx.common.aop.distributed.DistributedLock;
import com.xxx.common.aop.distributed.DistributedLockParam;
import com.xxx.common.aop.distributed.DistributedLockParams;
import com.xxx.common.exception.RRException;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.TreeMap;

@Aspect
@Component
public class DistributedLockAspect {

    @Autowired
    private RedissonClient redissonClient;

    @Pointcut("@annotation(com.xxx.common.aop.distributed.DistributedLock)")
    public void distributedLockAspect() {}

    @Around(value = "distributedLockAspect()")
    public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
        return doLock(pjp);
    }

    private Object doLock(ProceedingJoinPoint pjp) throws Throwable {
        //切点所在的类
        Class targetClass = pjp.getTarget().getClass();
        //使用了注解的方法
        String methodName = pjp.getSignature().getName();
        Class[] parameterTypes = ((MethodSignature)pjp.getSignature()).getMethod().getParameterTypes();
        Method method = targetClass.getMethod(methodName, parameterTypes);
        Object[] arguments = pjp.getArgs();
        // 根据方法反射获取想要的锁名字
        String lockName = getLockName(method, arguments);
        DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);
        // 生成锁
        RLock lock = lock(lockName, distributedLock);
        try {
            return pjp.proceed();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    private RLock lock(String lockName, DistributedLock distributedLock) throws InterruptedException {
        RLock lock = redissonClient.getLock(lockName);
        // 上锁
        if (distributedLock.tryLock()) {
                lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit());
        } else {
            long leaseTime = distributedLock.leaseTime();
            if (leaseTime > 0) {
                lock.lock(distributedLock.leaseTime(), distributedLock.timeUnit());
            } else {
                lock.lock();
            }
        }
        return lock;
    }

    private String getLockName(Method method, Object[] arguments) {
        // 获取注解
        DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);
        // 获取前缀
        StringBuilder lockName = new StringBuilder(distributedLock.value());
        // 用来存储锁粒度标记
        TreeMap<Integer, String> treeMap = new TreeMap<>();
        // 遍历参数找到粒度标记
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        for (int i = 0; i < parameterAnnotations.length; i++) {
            // parameterAnnotations[i]:第i个参数的注解数组
            for (Annotation annotation : parameterAnnotations[i]) {
                // 遍历需要的注解(如果只有一个就是DistributedLockParam,如果一个参数上有多个会自动组合成DistributedLockParams)
                if (annotation instanceof DistributedLockParam) {
                    // 获取注解
                    DistributedLockParam distributedLockParam = (DistributedLockParam) annotation;
                    // 把属性放进treemap
                    fillTreeMap(distributedLockParam, arguments[i], treeMap);
                }else if (annotation instanceof DistributedLockParams) {
                    // 获取注解
                    DistributedLockParams distributedLockParams = (DistributedLockParams) annotation;
                    // 把属性放进treemap
                    for (DistributedLockParam distributedLockParam : distributedLockParams.value()) {
                        fillTreeMap(distributedLockParam, arguments[i], treeMap);
                    }
                    break;
                }
            }
        }
        // 收集完毕,拼接lockName
        separate(lockName, treeMap);
        return lockName.toString();
    }

    private void fillTreeMap(DistributedLockParam distributedLockParam, Object argument, TreeMap<Integer, String> treeMap) {
        // 获取属性名
        String field = distributedLockParam.value();
        if (field.equals("")) {
            // 基本属性直接用
            field = argument.toString();
        } else {
            // 对象反射拿数据
            try {
                String[] values = field.split("\\.");
                for (int i = 0; i < values.length; i++) {
                    Field declaredField = argument.getClass().getDeclaredField(values[i]);
                    declaredField.setAccessible(true);
                    if (i == values.length - 1) {
                        // 最后一个为真实对象,此时从中提取属性
                        field = declaredField.get(argument).toString();
                        // 如果这里不跳出,下一句执行会报错
                        break;
                    }
                    // 切换到下级对象
                    argument = declaredField.get(argument);
                }

            } catch (NoSuchFieldException | IllegalAccessException e) {
                throw new RRException("分布式锁参数有误");
            }
        }
        // 确定好以后放进treeMap中,自动排序
        treeMap.put(distributedLockParam.sort(), field);
    }

    private void separate(StringBuilder lockName, TreeMap<Integer, String> treeMap) {
        treeMap.values().forEach(s -> lockName.append(":").append(s));
    }

    @AfterThrowing(value = "distributedLockAspect()", throwing="ex")
    public void afterThrowing(Throwable ex) {
        throw new RuntimeException(ex);
    }

}
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock {
    /**
     * 锁的名称。
     * 如果lockName可以确定,直接设置该属性。
     */
    String value();

    /**
     * 是否使用尝试锁。
     */
    boolean tryLock() default false;
    /**
     * 最长等待时间。
     * 该字段只有当tryLock()返回true才有效。
     */
    long waitTime() default 30L;
    /**
     * 锁超时时间。
     * 如果tryLock为false,且leaseTime设置为0及以下,会变成lock()
     */
    long leaseTime() default 5L;
    /**
     * 时间单位。默认为秒。
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}
import java.lang.annotation.*;

@Target({ ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(DistributedLockParams.class)
public @interface DistributedLockParam {
    /**
     * 如果是对象则填写使用哪个属性,如果是基本数据类型或String则按默认
     * @return
     */
    String value() default "";

    /**
     * 当前属性在锁目录的顺序,越小越靠前
     * **勿重
     * @return
     */
    int sort() default 0;
}
import java.lang.annotation.*;

@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target({ ElementType.PARAMETER })
public @interface DistributedLockParams {
    DistributedLockParam[] value();
}

上一篇:redux


下一篇:学习笔记|Generator 函数的异步应用