从手写同步工具到了解AQS

线程同步

Java 多线程无法绕过的点就包括线程同步,线程同步就是基于程序员所想的顺序对竟态资源进行访问。我们已知的线程同步方式就有锁(synchronized,Lock)以及JUC 下的一些同步工具。


那么抛开已知的线程同步工具,我们自己是否能实现一个同步工具呢?答案是肯定的,基于AbstractQueuedSynchronizer 提供的模版方法,我们很快的就可以实现一个自己的同步工具。


DIY同步工具

首先我们创建一个自己的同步工具类SelfLock.java

public class SelfLock {
    /**
     * 构建自己的同步核心类,继承AQS
     */
    class Sync extends AbstractQueuedSynchronizer{

        /**
         * 重写该方法,该方法是我们实现同步工具的获取具体方法
         * 以下代码表示通过cas 的方式将AQS 中的state 字段设置成1
         * 这里没有设置获取锁成功的线程
         * 
         * @param arg 参数
         * @return true 成功获取锁 false 获取锁失败
         */
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0,1);
        }

        /**
         * 重写该方法,该方法是我们实现同步工具释放锁的具体方法
         * 以下代码直接给AQS 的state 设置为0 表示解锁
         * 这里没有校验解锁的是否为当前线程
         *
         * @param arg 参数
         * @return true 成功释放锁 false 释放锁失败
         */
        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    /**
     * 创建同步核心对象
     */
    Sync sync = new Sync();

    /**
     * 加锁方法 相当于直接调用AQS 的acquire 方法
     */
    public void lock(){
       sync.acquire(1);
    }

    /**
     * 解锁 相当于直接调用AQS 的release 方法
     */
    public void unlock(){
        sync.release(1);
    }
}

以上就是自定义实现同步工具的代码,那么我么测试一下,写个测试用例:

public class TestSelfLock {

    static int count = 0;
    static SelfLock leeLock = new SelfLock();

    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = () -> {
            try {
                leeLock.lock();
                for (int i = 0; i < 10000; i++) {
                    count++;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                leeLock.unlock();
            }

        };

        // 根据Java 开发手册之规定,通过线程工厂构建线程;
        MyselfTreadFactory selfLockTest = new MyselfTreadFactory("self lock test");
        Thread thread1 = selfLockTest.newThread(runnable);
        Thread thread2 = selfLockTest.newThread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(count);
    }
}

MyselfTreadFactory.java

public class MyselfTreadFactory implements ThreadFactory {

   private final String namePrefix;

   /**
    * 通过原子类保证线程顺序
    */
   private final AtomicInteger nextId = new AtomicInteger(1);

   public MyselfTreadFactory(String whatFeatureOfGroup){
      this.namePrefix = "From MyselfTreadFactory's " + whatFeatureOfGroup + "Worker-";
   }


   @Override
   public Thread newThread(Runnable r) {
      String name = namePrefix + nextId.getAndIncrement();
      Thread thread = new Thread(null,r,name,0);
      System.out.println(thread.getName());
      return thread;
   }
}

执行结果:

从手写同步工具到了解AQS


通过上面测试结果来看,几行代买就可以实现一个有效的同步工具,但是具体实现还要考虑一些细节,比如加锁时设置获取锁成功的线程,在解锁时进行校验,保证同步工具的稳定性。简单的实现只是想表达AQS 的强大,接下来我们简单了解一下AQS;


AQS(AbstractQueuedSynchronizer):

它是由DougLee大神,使用一个volatile的int类型的成员变量state 来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对state值的修改。它是很多上层同步实现类的基础,如:ReentrantLock、CountDownLatch、Semaphore等它们通过集成AQS实现其模板方法,然后将AQS子类作为同步组件的内部类,通常命名为Sync。


在我们自己实现的同步工具类中,state 为1表示锁已被线程占用,state 为0表示锁可以被获取,之所以用volatile 就是利用了其可见性和防止指令重排序的特性,低成本的保证state 的修改对其他线程可见及安全性。


AQS 加锁

我们的同步工具在工作时,其实调用的是AQS 的acquire 方法:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire() 方法就是我们重写的逻辑,当获取锁失败时,会将当前线程通过addWaiter方法包装成一个Node 节点加入队列,并且当参数传入acquireQueued方法,该方法会处理等待队列中的线程是该出队还是该休眠。注(该队列的头节点时一个虚拟头节点,也就是说,当队列初始化时,同时有两个线程竞争锁,有一个线程将会包装成node 节点放入队列,当前队列则有两个节点。)

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        // 通过自旋进行锁的获取
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // 说明获取到锁,将当前节点设置为虚拟头节点
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            // 进行到这儿表示没有获取到锁或者该节点不是头节点,避免浪费cpu 资源进行中断
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}


AQS 解锁

对于加锁我们分析完成后,我们则要对解锁进行了解,解锁其实就是调用了AQS 的release 方法;

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

解锁也是调用了我们自己实现工具类重写的tryRelease方法,整个方法看起来还是比较简单的,值得注意的就是h != null && h.waitStatus != 0 整个判断条件;

  • h == null 表示head 为null 则表示队列没数据 不需要唤醒中断
  • h != null 且waitStatus == 0 表示不需要唤醒
  • h != null 且waitStatus < 0 ( >0 的节点已经出队了,因为取消竞争锁了) 则表示需要唤醒

唤醒后继节点进行争取锁资源,这就完成了闭环了。


以上就是本次对AQS 的分享,有兴趣的小伙伴可以深入研究一下该源码,本人则通过源码阅读对模版设计模式有了一些理解,并且对各个方法的职责明确有了新的认识,希望大家在不断学习的过程提升自己的编码能力以及认知能力。


大家加油~



上一篇:从约定编程到理解Spring AOP


下一篇:Redis 命令基础版