AbstractQueuedSynchronizer(AQS)源码分析

  学习是一个循序渐进的过程,在学习现场池相关知识的时候,注意到线程池使用了阻塞队列,阻塞队列LinkedBlockingQueue中又使用到了ReentrantLock可重入锁,因此先对ReentrantLock展开学习。

  ReentrantLock中很重要的一部分是Sync,他继承了AQS(AbstractQueuedSynchronizer),AQS提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(信号量,事件等)。此类被设计为*是大多数类型的同步器的有用基础,这些同步器依赖于单个原子 int值来表示状态。子类必须定义更改此状态的受保护方法,以及*根据要获取或释放的此对象定义该状态的含义。鉴于这些,这个类中的其他方法带有out所有排队和阻塞机制。子类可以维护其他状态字段,但仅使用方法getState,setState和compareAndSetState操作的原子更新的 int值被跟踪同步。

  AQS是一种基于CAS(Compare And Swap)的并发算法,CAS是一种有名的无锁(lock-free)算法。也是一种现代 CPU 广泛支持的CPU指令级的操作,只有一步原子操作,所以非常快。而且CAS避免了请求操作系统来裁定锁的问题,不用麻烦操作系统,直接在CPU内部就搞定了。  

  参考链接:https://segmentfault.com/a/1190000015239603

  此文在学习过程中参考了@活在夢裡 的文章https://www.cnblogs.com/micrari/p/6937995.html。强烈推荐阅读该作者的文章,对于如何正确学习和理解AQS,需要考虑哪些问题,作者都给出了明确的方法。以下文章部分引用了原作者的内容,加上自己的分析。

  AQS则继承了AOS(AbstractOwnableSynchronizer),AOS中维护着一个独占线程,提供了基础的get/set方法。

  AQS中的内部类Node等待队列节点类。

     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+

  

AbstractQueuedSynchronizer(AQS)源码分析
 1         /** Marker to indicate a node is waiting in exclusive mode*/
 2         static final Node EXCLUSIVE = null;
 3 
 4         /** waitStatus值表示线程已取消. */
 5         static final int CANCELLED =  1;
 6         /**waitStatus值表示后继者的线程需要取消停放,一般由后续结点进行设置,当他之前的结点为阻塞状态,则他为阻塞状态 */
 7         static final int SIGNAL    = -1;
 8         /**waitStatus值表示线程正在等待. */
 9         static final int CONDITION = -2;
10         /**
11          * waitStatus值表示下一个acquireShared应
12          *无条件传播。
13          */
14         static final int PROPAGATE = -3;    
Node

  以上的状态是节点的重要组成部分,后续将根据结点状态进行判断结点是否进入同步队列。

  AQS的精妙之处就在于通过状态来进行线程的管理。接下来通过获得锁来展开AQS的具体实现。

  

1   public final void acquire(int arg) {
2         if (!tryAcquire(arg) &&
3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4             selfInterrupt();
5     }

  先尝试获得锁,如果获得锁失败则封装为Node放入同步队列。

 1     /**
 2      *  为当前线程和给定模式创建排队节点。
 3      */
 4     private Node addWaiter(Node mode) {
 5         Node node = new Node(mode);
 6 
 7         for (;;) {
 8             //如果不是新队列则将结点赋给尾结点
 9             Node oldTail = tail;
10             if (oldTail != null) {
11                 node.setPrevRelaxed(oldTail);
12                 if (compareAndSetTail(oldTail, node)) {
13                     oldTail.next = node;
14                     return node;
15                 }
16             } else {
17                 initializeSyncQueue();
18             }
19         }
20     }

  对特殊情况的头部结点与尾部结点处理

 1   /**
 2      * 在第一次争用时初始化头部和尾部字段。
 3      */
 4     private final void initializeSyncQueue() {
 5         Node h;
 6         if (HEAD.compareAndSet(this, null, (h = new Node())))
 7             tail = h;
 8     }
 9 
10     /**
11      * CAS处理尾部结点
12      */
13     private final boolean compareAndSetTail(Node expect, Node update) {
14         return TAIL.compareAndSet(this, expect, update);
15     }

  acquireQueued

 1      /**
 2      * 在队列中的节点通过此方法获取锁。
 3      * 
 4      */
 5     final boolean acquireQueued(final Node node, int arg) {
 6         boolean interrupted = false;
 7         try {
 8             for (;;) {
 9                 final Node p = node.predecessor();
10                 //如果结点是头结点则尝试获得锁
11                 //尝试成功则获得锁
12                 //尝试失败则继续等待 
13                 if (p == head && tryAcquire(arg)) {
14                     setHead(node);
15                     p.next = null; // help GC
16                     return interrupted;
17                 }
18                 if (shouldParkAfterFailedAcquire(p, node))
19                     interrupted |= parkAndCheckInterrupt();
20             }
21         } catch (Throwable t) {
22             cancelAcquire(node);
23             if (interrupted)
24                 selfInterrupt();
25             throw t;
26         }
27     }    

  shouldParkAfterFailedAcquire

 1     /**
 2      * 检查并更新无法获取的节点的状态。 如果线程应该阻塞,则返回    
 3      * true。
 4      */
 5     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 6         int ws = pred.waitStatus;
 7         if (ws == Node.SIGNAL)
 8             /*
 9              * 如果结点的waitStatus是SINGAL则返回true
10              */
11             return true;
12         if (ws > 0) {
13             /*
14              * 前结点状态CANCEL。跳过前结点,直至前结点状态不为取消   
15              */
16             do {
17                 node.prev = pred = pred.prev;
18             } while (pred.waitStatus > 0);
19             pred.next = node;
20         } else {
21             /*
22              *  waitStatus必须为0或PROPAGATE(-3)。 设置前结点的等待状态为SIGNAL
23              *  
24              */
25             pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
26         }
27         return false;
28     }    

   阻塞线程。通过LockSupport进行线程的管理

1      /**
2      * 此处使用LockSupport的park方法进行线程管理。
3      * 除非条件允许,否则禁用当前线程以进行线程调度。
4      * 此部分在LockSupport中展开描述,在此不过多赘述
5      */
6     private final boolean parkAndCheckInterrupt() {
7         LockSupport.park(this);
8         return Thread.interrupted();
9     }
  取消正在进行的尝试获取锁的结点。
 1     /**
 2      *  取消一个正在尝试获得锁的结点的尝试
 3      *
 4      */
 5     private void cancelAcquire(Node node) {
 6         // 如果结点为空则忽略
 7         if (node == null)
 8             return;
 9 
10         node.thread = null;
11 
12         // 跳过状态是取消状态的前驱结点
13         Node pred = node.prev;
14         while (pred.waitStatus > 0)
15             node.prev = pred = pred.prev;
16  
17         // 因为我们无法处理node是head的情况,所以需要交给CAS进行   
18         // 处理,此处保存一份前驱结点的next结点
19         Node predNext = pred.next;
20 
21         // 设置node的状态为取消状态
22         node.waitStatus = Node.CANCELLED;
23 
24         // 如果node结点是tail,尝试快速设置,若快速设置删除自己
25         if (node == tail && compareAndSetTail(node, pred)) {
26             pred.compareAndSetNext(predNext, null);
27         } else {
28             // 如果前结点不为head,且状态为阻塞状态或者status<0的其他情况
29             // 并且前结点的thread属性不为空
30             // 设置前驱结点的状态为阻塞状态
31             // 否则unparkSuccessor唤醒当前结点
32             int ws;
33             if (pred != head &&
34                 ((ws = pred.waitStatus) == Node.SIGNAL ||
35                  (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
36                 pred.thread != null) {
37                 Node next = node.next;
38                 if (next != null && next.waitStatus <= 0)
39                     pred.compareAndSetNext(predNext, next);
40             } else {
41                 unparkSuccessor(node);
42             }
43 
44             node.next = node; // help GC
45         }
46     }

  如果CAS将tail从node置为pred节点了 ,则剩下要做的事情就是尝试用CAS将pred节点的next更新为null以彻底切断pred和node的联系。 这样一来就断开了pred与pred的所有后继节点,这些节点由于变得不可达,最终会被回收掉。 由于node没有后继节点,所以这种情况到这里整个cancel就算是处理完毕了。   

  在前一个状态不为取消的结点上进行判断,如果结点状态为阻塞,则设置切断当前结点与前驱结点的联系,并将当前结点的后继结点作为前驱结点的后继结点。

     如果状态为-2或者-3,则设置前驱节点状态为阻塞状态。

  若是head(此处不可能为取消状态),则唤醒线程。

 1    /**
 2      * 唤醒线程,如果结点不为null
 3      *
 4      * @param node the node
 5      */
 6     private void unparkSuccessor(Node node) {
 7         /*
 8          * 将node的状态设置为无状态,进行再一次的竞争,因为此时的 
 9          * node可能不是tail
10          */
11         int ws = node.waitStatus;
12         if (ws < 0)
13             node.compareAndSetWaitStatus(ws, 0);
14 
15         /*
16          * unpark的线程在后继中保存,通常只是下一个节点。但如果取消 
17          * 或者显然为空,
18          * 从尾部向后遍历以找到实际的*未取消的继承者。
19          */
20         Node s = node.next;
21         if (s == null || s.waitStatus > 0) {
22             s = null;
23             for (Node p = tail; p != node && p != null; p = p.prev)
24                 if (p.waitStatus <= 0)
25                     s = p;
26         }
27         if (s != null)
28             LockSupport.unpark(s.thread);
29     }

  此处参考了@活在夢裡的文章。引用他的一段描述https://www.cnblogs.com/micrari/p/6937995.html

    /*
     * 这里的逻辑就是如果node.next存在并且状态不为取消,则直接唤醒s即可
     * 否则需要从tail开始向前找到node之后最近的非取消节点。
     *
     * 这里为什么要从tail开始向前查找也是值得琢磨的:
     * 如果读到s == null,不代表node就为tail,参考addWaiter以及enq函数中的我的注释。
     * 不妨考虑到如下场景:
     * 1. node某时刻为tail
     * 2. 有新线程通过addWaiter中的if分支或者enq方法添加自己
     * 3. compareAndSetTail成功
     * 4. 此时这里的Node s = node.next读出来s == null,但事实上node已经不是tail,它有后继了!
     */

  

 

上一篇:# XX 课堂之ReentrantLock源码学习总结 (一)


下一篇:长话短说stacking集成学习算法,保证你能看得懂(2)