首先里面有些是aqs的源码,请参考https://www.cnblogs.com/johnzhao/p/15022326.html
1.先给个可以执行的例子
public class TestCondition { public static void main(String[] args) { //创建锁 Lock lock = new ReentrantLock(); //创建2个条件 Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); new Thread(()->{ try { System.out.println("开始执行线程1"); lock.lock(); condition1.await(); System.out.println("线程1执行结束"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }).start(); new Thread(()->{ try { System.out.println("开始执行线程2"); lock.lock(); condition2.await(); System.out.println("线程2执行结束"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }).start(); //主线程唤醒 try { Thread.sleep(2000); System.out.println("主线程开始"); lock.lock(); //唤醒线程2 condition2.signal(); Thread.sleep(1000); //唤醒线程1 condition1.signal(); } catch (Exception ex){} finally { lock.unlock(); System.out.println("主线程释放锁了"); } } }
2.在这个基础上,给个可以debug测试的例子
public class TestCondition { public static void main(String[] args) { //创建锁 Lock lock = new ReentrantLock(); //创建2个条件 Condition condition1 = lock.newCondition(); new Thread(()->{ try { System.out.println("开始执行线程1"); lock.lock(); condition1.await(); System.out.println("线程1执行结束"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }).start(); //主线程唤醒 try { Thread.sleep(2000); System.out.println("主线程开始"); lock.lock(); //唤醒线程2 Thread.sleep(1000); //唤醒线程1 condition1.signal(); } catch (Exception ex){} finally { lock.unlock(); System.out.println("主线程释放锁了"); } } }
3.我这边把condition和ReentrantLock以及aqs的关系模拟出来了
public class MyLock { public MyLock(){ sync = new NonfairSync(); } NonfairSync sync; static class NonfairSync extends aqs{ final ConditionObject newCondition() { return new ConditionObject(); } public String name = "zhangsan"; } Condition newCondition(){ return sync.newCondition(); } public static void main(String[] args) { MyLock outer = new MyLock(); Condition condition = outer.newCondition(); System.out.println(111); } } abstract class aqs{ Node head;//创建锁的时候自动拥有了这个属性,同步队列的头 Node tail;//同步队列的尾 public class ConditionObject implements Condition{ Node firstWaiter; //等待队列的头 Node lastWaiter; //等待队列的尾 } static final class Node{ } } interface Condition{ }
说明:
1)MyLock相当于ReentrantLock,里面就只放了一个默认的非公平锁
2)我们创建了MyLock的时候就等于有了sync,sync就等于有了父类aqs的默认属性head,tail
3)Condition是每次可以创建的,每个Condition都有自己的等待队列,这个是各种唤醒自己队列的基础,同时Condition是拥有sync对象的,也就是说多个Condition共享一个sync,也代表了
aqs的同步队列他们是共享的。在这个基础上Condition的操作才能保证线程间的唤醒和挂起不会混乱
4.我把第2点的例子简化下,说明下过程,过程的原理会在源码中说明
1)线程1lock 2)线程1 aawait 3)线程1 unlock 4)主线程lock 5)主线程signal 6)主线程unlock
执行的顺序是 1,2,4,5,6,3 也就是signal并不会立马唤醒,这点很重要,有些博客这点都写错了
1上锁 2解锁,同时挂起自己 然后4才有资格继续上锁 5.不会动锁 6,解锁,同时唤醒aqs同步队列,那么线程1挂起线程有资格被唤醒
5.源码解读
5.1 await方法 java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException();
//创建等待队列的节点 Node node = addConditionWaiter();
//full释放,status大于1,重入的情况也直接释放 int savedState = fullyRelease(node); int interruptMode = 0;
//判断是否可以挂起自己 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
//尝试自旋去获取锁,aqs中的源码 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
5.1.1 创建等待队列节点
private Node addConditionWaiter() {
//拿到队列尾节点 Node t = lastWaiter; // If lastWaiter is cancelled, clean out.看它的意思很明确咯 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; }
//创建一个新节点,注意类型是waitStatus=Node.CONDITION 也就是-2 Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果没有尾节点,那么新创建的节点就是头节点和尾节点
//如果之前有尾节点了,那么加到后面去 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node;
//返回当前节点 return node; }
5.1.2 full释放锁
final int fullyRelease(Node node) { boolean failed = true; try {
//拿到当前的状态 int savedState = getState();
//根据这个状态释放,也就是锁持有的线程=null,同时唤醒其他节点,这个源码aqs中有,就不写了 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally {
//这里考虑点和aqs一样,出异常的时候取消节点 if (failed) node.waitStatus = Node.CANCELLED; } }
5.1.3判断是否可以挂起自己
final boolean isOnSyncQueue(Node node) {
//这方法会在挂起前执行,唤醒后也可能执行,因为外面是while循环,而后面的signal方法会修改node的waitStatus!=Node.CONDITION,也会把这个Node加到aqs同步队列中,这里参考后面源码解析
//所以挂起前这个条件是false,加上外面的!处理,导致结果就是可以挂起当前线程
//而siganl后这个条件都不满足了,走下面分支 if (node.waitStatus == Node.CONDITION || node.prev == null) return false;
//唤醒后如果aqs中,它还有后续节点,那么返回true,也就是外面退出循环 if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */
//如果唤醒后,自己属于aqs队列的最后一个节点,那么一直往前找,直到找到这个节点就返回true,也就是外面退出循环。这里只是为了证明这个节点确实在aqs中,那么外面退出循环就可以尝试去获取锁
return findNodeFromTail(node); }
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
5.2 signal方法 java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
//return getExclusiveOwnerThread() == Thread.currentThread();这个对线程进程判断,因为执行到这里的时候,已经拥有锁了
if (!isHeldExclusively()) throw new IllegalMonitorStateException();
//这里是拿等待队列的第一个节点
Node first = firstWaiter;
if (first != null)
//真正处理
doSignal(first);
}
5.2.1真正处理
private void doSignal(Node first) { do {
//如果头节点的下个节点是空,那么尾节点就不需要了,清空整个队列。否则的话,下个节点变成头节点,也就是自己出队列 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null;
//自己出队列的话,引用不要了,方便gc first.nextWaiter = null;
//真正处理地点,把当前节点加入到aqs同步队列中去 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
5.2.2把自己加入到同步队列
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */
//cas把自己的waitStatus改为默认值0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
//这步在aqs中有介绍,加入到aqs最后
Node p = enq(node); int ws = p.waitStatus;
//把自己的waistatus设置成-1 表示可以被唤醒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
所以这里看得到,这个方法并没有唤醒线程,只是把当前节点丢在aqs最后,然后在后面的unlock后就会被唤醒,等于线程1await的地方代码会重新执行。