6.Condition的源码解读

首先里面有些是aqs的源码,请参考https://www.cnblogs.com/johnzhao/p/15022326.html

1.先给个可以执行的例子

public class TestCondition {

  public static void main(String[] args) {
    //创建锁
    Lock lock = new ReentrantLock();
    //创建2个条件
    Condition condition1 = lock.newCondition();
    Condition condition2 = lock.newCondition();

    new Thread(()->{
      try {
        System.out.println("开始执行线程1");
        lock.lock();
        condition1.await();
        System.out.println("线程1执行结束");
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        lock.unlock();
      }
    }).start();


    new Thread(()->{
      try {
        System.out.println("开始执行线程2");
        lock.lock();
        condition2.await();
        System.out.println("线程2执行结束");
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        lock.unlock();
      }
    }).start();


    //主线程唤醒
    try {
      Thread.sleep(2000);
      System.out.println("主线程开始");
      lock.lock();
      //唤醒线程2
      condition2.signal();
      Thread.sleep(1000);

      //唤醒线程1
      condition1.signal();
    } catch (Exception ex){}
      finally {
      lock.unlock();
      System.out.println("主线程释放锁了");
    }
  }
}

2.在这个基础上,给个可以debug测试的例子

public class TestCondition {

  public static void main(String[] args) {
    //创建锁
    Lock lock = new ReentrantLock();
    //创建2个条件
    Condition condition1 = lock.newCondition();

    new Thread(()->{
      try {
        System.out.println("开始执行线程1");
        lock.lock();
        condition1.await();
        System.out.println("线程1执行结束");
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        lock.unlock();
      }
    }).start();

    //主线程唤醒
    try {
      Thread.sleep(2000);
      System.out.println("主线程开始");
      lock.lock();
      //唤醒线程2
      Thread.sleep(1000);

      //唤醒线程1
      condition1.signal();
    } catch (Exception ex){}
      finally {
      lock.unlock();
      System.out.println("主线程释放锁了");
    }
  }
}

3.我这边把condition和ReentrantLock以及aqs的关系模拟出来了

public class MyLock {

  public MyLock(){
    sync = new NonfairSync();
  }

  NonfairSync sync;

  static class NonfairSync extends aqs{
    final ConditionObject newCondition() {
      return new ConditionObject();
    }
    public String name = "zhangsan";
  }

  Condition newCondition(){
    return sync.newCondition();
  }

  public static void main(String[] args) {
    MyLock outer = new MyLock();
    Condition condition = outer.newCondition();
    System.out.println(111);
  }

}
abstract class aqs{
  Node head;//创建锁的时候自动拥有了这个属性,同步队列的头
  Node tail;//同步队列的尾
  public class ConditionObject implements Condition{
    Node firstWaiter; //等待队列的头
    Node lastWaiter;  //等待队列的尾
  }
  static final class Node{

  }
}
interface Condition{

}

说明:

1)MyLock相当于ReentrantLock,里面就只放了一个默认的非公平锁

2)我们创建了MyLock的时候就等于有了sync,sync就等于有了父类aqs的默认属性head,tail

3)Condition是每次可以创建的,每个Condition都有自己的等待队列,这个是各种唤醒自己队列的基础,同时Condition是拥有sync对象的,也就是说多个Condition共享一个sync,也代表了

aqs的同步队列他们是共享的。在这个基础上Condition的操作才能保证线程间的唤醒和挂起不会混乱

4.我把第2点的例子简化下,说明下过程,过程的原理会在源码中说明

1)线程1lock  2)线程1 aawait 3)线程1 unlock  4)主线程lock  5)主线程signal 6)主线程unlock

执行的顺序是  1,2,4,5,6,3  也就是signal并不会立马唤醒,这点很重要,有些博客这点都写错了

1上锁  2解锁,同时挂起自己  然后4才有资格继续上锁 5.不会动锁  6,解锁,同时唤醒aqs同步队列,那么线程1挂起线程有资格被唤醒

5.源码解读

5.1 await方法 java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
//创建等待队列的节点 Node node
= addConditionWaiter();
//full释放,status大于1,重入的情况也直接释放
int savedState = fullyRelease(node); int interruptMode = 0;
//判断是否可以挂起自己
while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
//尝试自旋去获取锁,aqs中的源码
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

5.1.1 创建等待队列节点

private Node addConditionWaiter() {
//拿到队列尾节点 Node t
= lastWaiter; // If lastWaiter is cancelled, clean out.看它的意思很明确咯 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; }
//创建一个新节点,注意类型是waitStatus=Node.CONDITION 也就是-2 Node node
= new Node(Thread.currentThread(), Node.CONDITION);
//如果没有尾节点,那么新创建的节点就是头节点和尾节点
//如果之前有尾节点了,那么加到后面去
if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node;
//返回当前节点
return node; }

5.1.2 full释放锁

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
//拿到当前的状态
int savedState = getState();
//根据这个状态释放,也就是锁持有的线程=null,同时唤醒其他节点,这个源码aqs中有,就不写了
if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally {
//这里考虑点和aqs一样,出异常的时候取消节点
if (failed) node.waitStatus = Node.CANCELLED; } }

5.1.3判断是否可以挂起自己

final boolean isOnSyncQueue(Node node) {
//这方法会在挂起前执行,唤醒后也可能执行,因为外面是while循环,而后面的signal方法会修改node的waitStatus!=Node.CONDITION,也会把这个Node加到aqs同步队列中,这里参考后面源码解析
//所以挂起前这个条件是false,加上外面的!处理,导致结果就是可以挂起当前线程
//而siganl后这个条件都不满足了,走下面分支
if (node.waitStatus == Node.CONDITION || node.prev == null) return false;
//唤醒后如果aqs中,它还有后续节点,那么返回true,也就是外面退出循环
if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */
//如果唤醒后,自己属于aqs队列的最后一个节点,那么一直往前找,直到找到这个节点就返回true,也就是外面退出循环。这里只是为了证明这个节点确实在aqs中,那么外面退出循环就可以尝试去获取锁
return findNodeFromTail(node); }
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

5.2 signal方法 java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal

public final void signal() {
//
return getExclusiveOwnerThread() == Thread.currentThread();这个对线程进程判断,因为执行到这里的时候,已经拥有锁了
if (!isHeldExclusively()) throw new IllegalMonitorStateException(); 
//这里是拿等待队列的第一个节点
Node first
= firstWaiter;

if (first != null)
//真正处理
doSignal(first);
}

5.2.1真正处理

        private void doSignal(Node first) {
            do {
//如果头节点的下个节点是空,那么尾节点就不需要了,清空整个队列。否则的话,下个节点变成头节点,也就是自己出队列
if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null;
//自己出队列的话,引用不要了,方便gc first.nextWaiter
= null;
//真正处理地点,把当前节点加入到aqs同步队列中去 }
while (!transferForSignal(first) && (first = firstWaiter) != null); }

5.2.2把自己加入到同步队列

final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
//cas把自己的waitStatus改为默认值0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
//这步在aqs中有介绍,加入到aqs最后
Node p = enq(node); int ws = p.waitStatus;
//把自己的waistatus设置成-1 表示可以被唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

所以这里看得到,这个方法并没有唤醒线程,只是把当前节点丢在aqs最后,然后在后面的unlock后就会被唤醒,等于线程1await的地方代码会重新执行。

6.Condition的源码解读

上一篇:类型转换


下一篇:IO相关知识