学习是一个循序渐进的过程,在学习现场池相关知识的时候,注意到线程池使用了阻塞队列,阻塞队列LinkedBlockingQueue中又使用到了ReentrantLock可重入锁,因此先对ReentrantLock展开学习。
ReentrantLock中很重要的一部分是Sync,他继承了AQS(AbstractQueuedSynchronizer),AQS提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(信号量,事件等)。此类被设计为*是大多数类型的同步器的有用基础,这些同步器依赖于单个原子 int值来表示状态。子类必须定义更改此状态的受保护方法,以及*根据要获取或释放的此对象定义该状态的含义。鉴于这些,这个类中的其他方法带有out所有排队和阻塞机制。子类可以维护其他状态字段,但仅使用方法getState,setState和compareAndSetState操作的原子更新的 int值被跟踪同步。
AQS是一种基于CAS(Compare And Swap)的并发算法,CAS是一种有名的无锁(lock-free)算法。也是一种现代 CPU 广泛支持的CPU指令级的操作,只有一步原子操作,所以非常快。而且CAS避免了请求操作系统来裁定锁的问题,不用麻烦操作系统,直接在CPU内部就搞定了。
参考链接:https://segmentfault.com/a/1190000015239603
此文在学习过程中参考了@活在夢裡 的文章https://www.cnblogs.com/micrari/p/6937995.html。强烈推荐阅读该作者的文章,对于如何正确学习和理解AQS,需要考虑哪些问题,作者都给出了明确的方法。以下文章部分引用了原作者的内容,加上自己的分析。
AQS则继承了AOS(AbstractOwnableSynchronizer),AOS中维护着一个独占线程,提供了基础的get/set方法。
AQS中的内部类Node等待队列节点类。
* +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+
1 /** Marker to indicate a node is waiting in exclusive mode*/ 2 static final Node EXCLUSIVE = null; 3 4 /** waitStatus值表示线程已取消. */ 5 static final int CANCELLED = 1; 6 /**waitStatus值表示后继者的线程需要取消停放,一般由后续结点进行设置,当他之前的结点为阻塞状态,则他为阻塞状态 */ 7 static final int SIGNAL = -1; 8 /**waitStatus值表示线程正在等待. */ 9 static final int CONDITION = -2; 10 /** 11 * waitStatus值表示下一个acquireShared应 12 *无条件传播。 13 */ 14 static final int PROPAGATE = -3;Node
以上的状态是节点的重要组成部分,后续将根据结点状态进行判断结点是否进入同步队列。
AQS的精妙之处就在于通过状态来进行线程的管理。接下来通过获得锁来展开AQS的具体实现。
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
先尝试获得锁,如果获得锁失败则封装为Node放入同步队列。
1 /** 2 * 为当前线程和给定模式创建排队节点。 3 */ 4 private Node addWaiter(Node mode) { 5 Node node = new Node(mode); 6 7 for (;;) { 8 //如果不是新队列则将结点赋给尾结点 9 Node oldTail = tail; 10 if (oldTail != null) { 11 node.setPrevRelaxed(oldTail); 12 if (compareAndSetTail(oldTail, node)) { 13 oldTail.next = node; 14 return node; 15 } 16 } else { 17 initializeSyncQueue(); 18 } 19 } 20 }
对特殊情况的头部结点与尾部结点处理
1 /** 2 * 在第一次争用时初始化头部和尾部字段。 3 */ 4 private final void initializeSyncQueue() { 5 Node h; 6 if (HEAD.compareAndSet(this, null, (h = new Node()))) 7 tail = h; 8 } 9 10 /** 11 * CAS处理尾部结点 12 */ 13 private final boolean compareAndSetTail(Node expect, Node update) { 14 return TAIL.compareAndSet(this, expect, update); 15 }
acquireQueued
1 /** 2 * 在队列中的节点通过此方法获取锁。 3 * 4 */ 5 final boolean acquireQueued(final Node node, int arg) { 6 boolean interrupted = false; 7 try { 8 for (;;) { 9 final Node p = node.predecessor(); 10 //如果结点是头结点则尝试获得锁 11 //尝试成功则获得锁 12 //尝试失败则继续等待 13 if (p == head && tryAcquire(arg)) { 14 setHead(node); 15 p.next = null; // help GC 16 return interrupted; 17 } 18 if (shouldParkAfterFailedAcquire(p, node)) 19 interrupted |= parkAndCheckInterrupt(); 20 } 21 } catch (Throwable t) { 22 cancelAcquire(node); 23 if (interrupted) 24 selfInterrupt(); 25 throw t; 26 } 27 }
shouldParkAfterFailedAcquire
1 /** 2 * 检查并更新无法获取的节点的状态。 如果线程应该阻塞,则返回 3 * true。 4 */ 5 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 6 int ws = pred.waitStatus; 7 if (ws == Node.SIGNAL) 8 /* 9 * 如果结点的waitStatus是SINGAL则返回true 10 */ 11 return true; 12 if (ws > 0) { 13 /* 14 * 前结点状态CANCEL。跳过前结点,直至前结点状态不为取消 15 */ 16 do { 17 node.prev = pred = pred.prev; 18 } while (pred.waitStatus > 0); 19 pred.next = node; 20 } else { 21 /* 22 * waitStatus必须为0或PROPAGATE(-3)。 设置前结点的等待状态为SIGNAL 23 * 24 */ 25 pred.compareAndSetWaitStatus(ws, Node.SIGNAL); 26 } 27 return false; 28 }
阻塞线程。通过LockSupport进行线程的管理
1 /** 2 * 此处使用LockSupport的park方法进行线程管理。 3 * 除非条件允许,否则禁用当前线程以进行线程调度。 4 * 此部分在LockSupport中展开描述,在此不过多赘述 5 */ 6 private final boolean parkAndCheckInterrupt() { 7 LockSupport.park(this); 8 return Thread.interrupted(); 9 }
取消正在进行的尝试获取锁的结点。
1 /** 2 * 取消一个正在尝试获得锁的结点的尝试 3 * 4 */ 5 private void cancelAcquire(Node node) { 6 // 如果结点为空则忽略 7 if (node == null) 8 return; 9 10 node.thread = null; 11 12 // 跳过状态是取消状态的前驱结点 13 Node pred = node.prev; 14 while (pred.waitStatus > 0) 15 node.prev = pred = pred.prev; 16 17 // 因为我们无法处理node是head的情况,所以需要交给CAS进行 18 // 处理,此处保存一份前驱结点的next结点 19 Node predNext = pred.next; 20 21 // 设置node的状态为取消状态 22 node.waitStatus = Node.CANCELLED; 23 24 // 如果node结点是tail,尝试快速设置,若快速设置删除自己 25 if (node == tail && compareAndSetTail(node, pred)) { 26 pred.compareAndSetNext(predNext, null); 27 } else { 28 // 如果前结点不为head,且状态为阻塞状态或者status<0的其他情况 29 // 并且前结点的thread属性不为空 30 // 设置前驱结点的状态为阻塞状态 31 // 否则unparkSuccessor唤醒当前结点 32 int ws; 33 if (pred != head && 34 ((ws = pred.waitStatus) == Node.SIGNAL || 35 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && 36 pred.thread != null) { 37 Node next = node.next; 38 if (next != null && next.waitStatus <= 0) 39 pred.compareAndSetNext(predNext, next); 40 } else { 41 unparkSuccessor(node); 42 } 43 44 node.next = node; // help GC 45 } 46 }
如果CAS将tail从node置为pred节点了 ,则剩下要做的事情就是尝试用CAS将pred节点的next更新为null以彻底切断pred和node的联系。 这样一来就断开了pred与pred的所有后继节点,这些节点由于变得不可达,最终会被回收掉。 由于node没有后继节点,所以这种情况到这里整个cancel就算是处理完毕了。
在前一个状态不为取消的结点上进行判断,如果结点状态为阻塞,则设置切断当前结点与前驱结点的联系,并将当前结点的后继结点作为前驱结点的后继结点。
如果状态为-2或者-3,则设置前驱节点状态为阻塞状态。
若是head(此处不可能为取消状态),则唤醒线程。
1 /** 2 * 唤醒线程,如果结点不为null 3 * 4 * @param node the node 5 */ 6 private void unparkSuccessor(Node node) { 7 /* 8 * 将node的状态设置为无状态,进行再一次的竞争,因为此时的 9 * node可能不是tail 10 */ 11 int ws = node.waitStatus; 12 if (ws < 0) 13 node.compareAndSetWaitStatus(ws, 0); 14 15 /* 16 * unpark的线程在后继中保存,通常只是下一个节点。但如果取消 17 * 或者显然为空, 18 * 从尾部向后遍历以找到实际的*未取消的继承者。 19 */ 20 Node s = node.next; 21 if (s == null || s.waitStatus > 0) { 22 s = null; 23 for (Node p = tail; p != node && p != null; p = p.prev) 24 if (p.waitStatus <= 0) 25 s = p; 26 } 27 if (s != null) 28 LockSupport.unpark(s.thread); 29 }
此处参考了@活在夢裡的文章。引用他的一段描述https://www.cnblogs.com/micrari/p/6937995.html
/* * 这里的逻辑就是如果node.next存在并且状态不为取消,则直接唤醒s即可 * 否则需要从tail开始向前找到node之后最近的非取消节点。 * * 这里为什么要从tail开始向前查找也是值得琢磨的: * 如果读到s == null,不代表node就为tail,参考addWaiter以及enq函数中的我的注释。 * 不妨考虑到如下场景: * 1. node某时刻为tail * 2. 有新线程通过addWaiter中的if分支或者enq方法添加自己 * 3. compareAndSetTail成功 * 4. 此时这里的Node s = node.next读出来s == null,但事实上node已经不是tail,它有后继了! */