目录:
- 概述
- 同步队列、条件队列
- Condition源码解析
概述
首先我们知道Condition的await()、signa()是内置锁synchronize配套的wait()及notify()的增强,可以更加细化的控制锁的唤醒条件。
那么我们这里来类比下它们之间的机制:
- 同步:内置锁的wait()必须要在synchronize代码块中才能调用,也就是必须要获取到监视器锁之后才能执行;await()与之类似,也是必须获取到锁资源后才能执行。
- 等待:调用wait()的线程会释放已经获取的锁,进入当前监视器锁的等待队列中;与之类似,await()也会释放自己的lock,进入到当前Condition的条件队列中。
- 唤醒:调用notify()会唤醒等待在该监视器锁的线程,并参与监视器锁的竞争,并在获取锁之后从wait()处恢复执行;同理,调用signal()会唤醒对应线程,并获取锁资源,在获取后会在signal()处恢复执行。
同步队列、条件队列
上面我们说到了内置锁与Condition的等待,一个是放到等待队列中,一个是放到条件队列中,那么我们这里来分别说下。
——————————————————————————————————————————————————————————————————————
1、同步队列(等待队列):
前面我们在说独占锁的获取时,所有等待的线程都会被包装成一个Node进入同步队列。
由图可知其是一个先进先出(FIFO)的同步阻塞队列,基于双向链表实现,使用prev、next来串联前驱及后继节点。
其中有一个nextWaiter属性一直没有提到过,即使是共享模式下它也只是简单的标记了一下,指向了一个空节点。
因此在同步队列中,并不会用它来串联节点。
——————————————————————————————————————————————————————————————————————
2、条件队列:
Condition同理,也会在调用await()时将其包装成一个Node放入到条件队列中,但值得注意的是这个条件队列是基于单向链表实现,以nextWaiter来标记下一个节点的指针。
与同步队列恰好相反,条件队列并不会使用prev、next来串联链表,而是使用nextWaiter。
其结构大致如下:
你还记得AQS中Node的waitStatus的值嘛,没错有一个CONDITION就是来标记同步队列的,在源码中我会详述。
——————————————————————————————————————————————————————————————————————
3、同步队列与条件队列的联系:
一般情况下同步队列和条件队列是互相独立,没有任何关系的。但当调用某个条件队列中的signal()去唤醒线程时,会让这个线程和普通线程一样去竞争锁,如果没有抢到就会被加到同步队列中去,此时这个条件队列就会变成同步队列了。
这里你可能会有一个疑问,为啥条件队列获取不到锁后不是重新回到条件队列,而是加入到同步队列去了呢,这不是有问题嘛。
其实是没有问题的,因为你是满足条件后才会调用signal()唤醒线程的,所以线程尝试获取锁的时候其实已经是满足唤醒条件了,就算未获取到锁也只是重新入队(同步队列)而已,照样可以在下次被唤醒时重新竞争锁。
这里我们需要注意的是Node节点是一个一个转移过去的,即使是调用signalAll()也是一样,而不是将整个条件队列接在同步队列的队尾。
那为什么是一个一个的转移呢,因为同步队列使用的是prev、next来串联链表,条件队列使用的是nextWaiter,它们在数据结构上是完全独立的。因此我们在转换的过程中需要把nextWaiter断开,建立新的prev、next,这也就是为啥要一个个转移的原因之一了。
——————————————————————————————————————————————————————————————————————
4、入队和出队的锁状态:
- 条件队列:入队前有锁 >>> 队列中释放锁 >>> 离开队列竞争锁 >>> 转到同步队列。
- 同步队列:入队前无锁 >>> 队列中竞争锁 >>> 离开队列持有锁。
好了,通过上面的介绍相信你对条件对口已经有个一个感性的认知,接下来我们来说说它的源码。
Condition源码解析
1、基本属性及构造函数:
前面我们说到Condition接口的实现类就是ConditionObject,在学习其主要逻辑前我们还是照常先看下其基本属性及构造函数吧。
1 /** 条件队列队头节点 */ 2 private transient Node firstWaiter; 3 /** 条件队列队尾节点 */ 4 private transient Node lastWaiter; 5 6 /** 7 * 空的构造器,没啥好说的,但可以看出条件队列是延时初始化的。 8 */ 9 public ConditionObject() { }
2、await():
然后我们来看下Condition的线程阻塞方法await()。
1 public final void await() throws InterruptedException { 2 // 若当前线程已经中断则抛出异常 3 if (Thread.interrupted()) 4 throw new InterruptedException(); 5 // 将当前线程封装成Node添加到条件队列中 6 Node node = addConditionWaiter(); 7 // 释放当前线程所占用的锁,并保存当前锁的状态 8 int savedState = fullyRelease(node); 9 int interruptMode = 0; 10 // 若当前线程不在同步队列中,说明刚刚被await了(也就是条件队列),但还未调用signal()方法,则直接循环阻塞线程 11 while (!isOnSyncQueue(node)) { 12 // 阻塞线程,停止运行 13 LockSupport.park(this); 14 // 执行到这说明,要么已经调用signal()方法,要么是线程中断了 15 // 所以这里检查下唤醒原因,如果是因中断而唤醒,则跳出循环 16 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 17 break; 18 } 19 // 这部分非主流程,之后再解析 20 /*if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 21 interruptMode = REINTERRUPT; 22 if (node.nextWaiter != null) // clean up if cancelled 23 unlinkCancelledWaiters(); 24 if (interruptMode != 0) 25 reportInterruptAfterWait(interruptMode);*/ 26 }
上面已经把await()的主要流程说明了下,其核心步骤如下,接下来我会一一解析:
- 将当前线程封装成Node添加到条件队列中
- 释放当前线程所占用的锁,并保存当前锁的状态
- 循环阻塞线程
——————————————————————————————————————————————————————————————————————
A、将当前线程封装成Node添加到条件队列中:addConditionWaiter()
1 private Node addConditionWaiter() { 2 // 获取队尾队列 3 Node t = lastWaiter; 4 // 如果队尾队列不是CONDITION状态,则遍历整个队列,清除所有被cancel的节点 5 if (t != null && t.waitStatus != Node.CONDITION) { 6 // 清除所有已经取消等待的线程 7 unlinkCancelledWaiters(); 8 t = lastWaiter; 9 } 10 // 将当前队列包装成CONDITION的Node,并传入条件队列 11 Node node = new Node(Thread.currentThread(), Node.CONDITION); 12 if (t == null) 13 // 若队尾为空,说明队列还无节点,此时将当前节点设置为头节点 14 firstWaiter = node; 15 else 16 // 队尾非空,将队尾节点的nextWaiter指针指向当前节点,也就是当当前节点入到队尾 17 t.nextWaiter = node; 18 lastWaiter = node; 19 return node; 20 }
这一步非常简单,而unlinkCancelledWaiters()也不过是对链表的操作,它剔除了队列中状态不为CONDITION的节点,只要了解链表的话可以非常轻松的读懂,我就不再赘述。
1 private void unlinkCancelledWaiters() { 2 Node t = firstWaiter; 3 Node trail = null; 4 while (t != null) { 5 Node next = t.nextWaiter; 6 if (t.waitStatus != Node.CONDITION) { 7 t.nextWaiter = null; 8 if (trail == null) 9 firstWaiter = next; 10 else 11 trail.nextWaiter = next; 12 if (next == null) 13 lastWaiter = trail; 14 } 15 else 16 trail = t; 17 t = next; 18 } 19 }
这里有一个值得注意的问题,就是是否存在两个不同的线程同时入队的情况。
答案是否定的,因为入队前是一定获取到锁的,所以不存在并发的情况,不需要CAS操作。
——————————————————————————————————————————————————————————————————————
B、释放当前线程所占用的锁,并保存当前锁的状态:fullyRelease(node)
1 final int fullyRelease(Node node) { 2 boolean failed = true; 3 try { 4 int savedState = getState(); 5 if (release(savedState)) { 6 failed = false; 7 return savedState; 8 } else { 9 throw new IllegalMonitorStateException(); 10 } 11 } finally { 12 if (failed) 13 node.waitStatus = Node.CANCELLED; 14 } 15 }
首先我们要知道,当调用fullyRelease()方法时说明线程已经入队了。然后根据第5行代码可以看出,其是根据release()方法来释放锁的,这个之前介绍过就不再赘述。
但有一点奇怪的是,明明只是释放当前线程占用的锁,那为啥函数名却叫fullyRelease呢,这是因为对于可重入锁来说,无论重入了所少次,都是一次性释放完的。
这个地方有一点需要注意下,就是调用release方法是可能会抛出IllegalMonitorStateException异常,这是因为当前线程可能并不是持有锁的线程。
可之前不是说调用await方法的线程一定是持有锁的嘛,哈哈话虽这样说,但其实await方法是并没有校验的,而是放到了fullyRelease()的tryRelease()来校验Thread.currentThread() == getExclusiveOwnerThread()。
如ReentrantLock:
1 protected final boolean tryRelease(int releases) { 2 int c = getState() - releases; 3 if (Thread.currentThread() != getExclusiveOwnerThread()) 4 throw new IllegalMonitorStateException(); 5 boolean free = false; 6 if (c == 0) { 7 free = true; 8 setExclusiveOwnerThread(null); 9 } 10 setState(c); 11 return free; 12 }
——————————————————————————————————————————————————————————————————————
C、循环阻塞线程:
在线程锁释放完毕后,就会调用LockSupport.park(this)方法将当前线程挂起,等待被signal()方法唤醒。
但在挂起线程前为啥要需要调用isOnSyncQueue()来确保这个不在同步队列中呢,之前不是说了同步队列和条件队列在数据结构上述无关的嘛。为啥会出现在同步队列的情况,这是因为可能其它线程获取到了锁,导致并发调用了signal(下文说明signal会详述)。
1 final boolean isOnSyncQueue(Node node) { 2 // 当前线程状态为CONDITION或前驱节点为空(条件队列的prev、next都是空)则不为同步队列 3 if (node.waitStatus == Node.CONDITION || node.prev == null) 4 return false; 5 // 若node的后继节点非空,则肯定为同步队列的节点 6 if (node.next != null) // If has successor, it must be on queue 7 return true; 8 // 从尾部向前搜索 9 return findNodeFromTail(node); 10 }
1 private boolean findNodeFromTail(Node node) { 2 // 获取尾部节点tail,只有在同步队列中tail才非空,条件队列的队尾是lastWaiter节点 3 Node t = tail; 4 for (;;) { 5 if (t == node) 6 return true; 7 if (t == null) 8 return false; 9 // 从尾部向前遍历队列 10 t = t.prev; 11 } 12 }
——————————————————————————————————————————————————————————————————————
3、signal():
唤醒方法分为signal(),唤醒单个线程;和signalAll(),唤醒多个线程。因signal和signalAll非常相似,所以我先说说signalAll()。
signalAll():
首先在看signalAll源码之前,我们先要知道调用signalAll方法的线程和signAll方法要唤醒的线程(在条件队列里等待的线程)的区别:
- 调用signalAll方法的线程本身已经持有锁了,现在准备去释放锁。
- signalAll方法要唤醒的线程是是条件队列里挂起了,等待被signal唤醒,然后去竞争锁。
——————————————————————————————————————————————————————————————————————
然后我们来看看signalAll的源码:
1 public final void signalAll() { 2 if (!isHeldExclusively()) 3 throw new IllegalMonitorStateException(); 4 Node first = firstWaiter; 5 if (first != null) 6 doSignalAll(first); 7 }
1、首先走了isHeldExclusively()逻辑,如果不满足则抛出监视器锁状态异常,我们通过查阅其源码可发现isHeldExclusively()内部的实现是直接抛异常的,也就是说这个方法肯定是交给子类实现的。
1 protected boolean isHeldExclusively() { 2 throw new UnsupportedOperationException(); 3 }
那我们来看看其子类ReentrantLock,可以发现它回去拿当前线程与当前持有锁的线程做比较。
为什么会这样比较呢,我们之前不是说到了嘛,Condition其实就是Object中wait和notify方法的扩展。
所以调用signalAll的肯定也是需要获取到锁的,哈哈。
1 protected final boolean isHeldExclusively() { 2 return getExclusiveOwnerThread() == Thread.currentThread(); 3 }
——————————————————————————————————————————————————————————————————————
2、然后我们将队头的节点交给first,并在其非空的情况下(也就是条件队列有数据存在)调用doSignalAll()。
1 private void doSignalAll(Node first) { 2 lastWaiter = firstWaiter = null; 3 do { 4 Node next = first.nextWaiter; 5 first.nextWaiter = null; 6 transferForSignal(first); 7 first = next; 8 } while (first != null); 9 }
首先将队首队尾的指针都指向空,也就是清空条件队列(因为是signalAll嘛,唤醒所有,当然是情况队列咯)。
紧接着遍历整个条件队列,因为first之前的指针是指向firstWaiter的,所以是队头节点,它能够遍历整个队列。
最后会依次调用transferForSignal,一个个添加到同步队列的队尾。
1 final boolean transferForSignal(Node node) { 2 // 先通过CAS将node节点的状态由CONDITION置为0,若修改失败说明该节点已经被CONDITION了,则直接返回操作下一个节点 3 // 如果操作成功,说明已经将该节点从条件队列唤醒了,因此会被添加到条件队列的尾队等待竞争锁 4 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 5 return false; 6 7 // 自旋的添加到同步队列队尾,并拿到当前节点的前驱节点 8 Node p = enq(node); 9 int ws = p.waitStatus; 10 // 将当前节点的前驱节点的状态置为SIGNAL,也就是要前驱节点来唤醒当前节点 11 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) 12 LockSupport.unpark(node.thread); 13 return true; 14 }
——————————————————————————————————————————————————————————————————————
在说signal方法前我们先来总结下signalAll:
- 将条件队列清空(只是令lastWaiter = frstWaliter = null,队列中的结点和连接关系仍然还存在)。
- 将条件队列中的头结点取出,使之成为孤立结点(nextWaiter的prev和next属性都为null)。
- 如果该结点处于被Cancelled了的状态,则直接跳过该结点(由于是孤立结点,则会被GC回收)。
- 如果该结点处于正常状态,则通过enq()法将 它添加到同步队列的末尾。
- 判断是否需要将该结点唤醒(包括设置该结点的前驱结点的状态为SIGNAL),如有必要,直接唤醒该结点。
- 重复2-5,直到整个条件队列中的结点都被处理完。
——————————————————————————————————————————————————————————————————————
signal():
弄懂了signalAll后signal应该会很轻松,因为它们大同小异,signal只是唤醒队列中第一个没有被Cancelled的线程,如果没有则将条件队列清空。
1 public final void signal() { 2 if (!isHeldExclusively()) 3 throw new IllegalMonitorStateException(); 4 Node first = firstWaiter; 5 if (first != null) 6 doSignal(first); 7 }
1 private void doSignal(Node first) { 2 do { 3 if ( (firstWaiter = first.nextWaiter) == null) 4 lastWaiter = null; 5 first.nextWaiter = null; 6 } while (!transferForSignal(first) && 7 (first = firstWaiter) != null); 8 }