ReenTrantLock (公平锁)Lock过程源码剖析

ReenTrantLock (公平锁)Lock过程源码剖析

背景:

在JDK1.6之前,sync还是一把不会改变的重量锁,Doug Lea 实现了ReenTrantLock。在现在sync更新后,ReenTrantLock凭借自己特有的API,依旧被广泛使用。此文章,是对ReenTrantLock中的lock方法的简要剖析。其中列举了五种常见情况。

情况一:第一个进入的线程 t1

  1. 进入lock

  2. 进入acquire(1);

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&    //tryAcquire尝试获取
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. 进入tryAcquire
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();//获取当前线程lock的次数,为0。
        if (c == 0) {
            if (!hasQueuedPredecessors() &&//hasQueuedPredecessors:判断是否有人排队
                compareAndSetState(0, acquires)) { //CAS:比较和设置值状态,尝试加锁,成功返回true,失败返回false。底层为native操作。
                setExclusiveOwnerThread(current);//设置当前执行线程,为复制操作。
                return true;  
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
  1. 进入hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // 获取LinkedList头尾节点,此时没有链表即为null
    Node h = head;
    Node s;
    return h != t && // 此时即h与t同为null,故为false,整个函数返回false
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  1. CAS为native操作,尝试加锁成功返回true。最后设置当前执行线程后,返回true

  2. 此时!tryAcquire的值为false,故不会进入后面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)操作,整个函数返回

综上,当第一次执行lock方法时,并未调用unsafe的park()方法。故该情况下的ReenTrantLock是轻量锁。

情况二:当t1正在执行时,再次进入lock()

  1. 进入lock()

  2. 进入acquire(1)中

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. 进入tryAcquire(arg)中,尝试获取线程
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread(); // 获取当前线程t1.
        int c = getState(); //获取当前lock次数,此时为1
        if (c == 0) {// 进入else
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 判断当前线程是否是执行线程,true,进入if。
            int nextc = c + acquires; // acquires值为传参1,c为1,故nextc为2。
            if (nextc < 0) //nextc = 2 ,故不进入if
                throw new Error("Maximum lock count exceeded");
            setState(nextc);// 设置当前lock的次数为2
            return true; // 返回true
        }
        return false;
    }
} 
  1. 此时if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))中的!tryAcquire(arg)为false,故不进行之后的判断,t1继续执行。

综上,在t1执行时,t1再次进入lock(),并未调用unsafe的方法。故ReenTrantLock依旧是一把轻量锁。而State的设置也表明了ReenTrantLock的可重入性。

情况三:当t1正在执行时,t2尝试获取Lock

  1. t2 进入lock()

  2. 进入acquire(1);

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. 进入tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
  1. 进入hasQueuedPredecessors(),因为此时LinkedList还没有创建,故返回false。而整个!hasQueuedPredecessors()为true。故进入compareAndSetState(0, acquires)中。该操作为native操作。尝试加锁,不能加锁,返回false。

  2. 因为if判断未false,故不会执行setExclusiveOwnerThread(current);。并返回false。

  3. 此时第一次尝试获取锁失败。故!tryAcquire(arg)值为true,进入后面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)中。

  4. 先进入addWaiter(Node.EXCLUSIVE), arg)中

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); //新建node节点,并把当前线程传入,而后继
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail; // 此时,给创建新节点pre,并把List尾部(null)赋给tail。
    if (pred != null) { //此时pre为null,故不会进入if中。
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
  1. 进入enq(node)中,将新结点作为传参传入。
    第一次循环:
private Node enq(final Node node) {
    for (;;) {
        Node t = tail; // 创建新节点,并复制为List尾部,即为null
        if (t == null) { // 进入if判断体。
            if (compareAndSetHead(new Node()))// 创造空节点,并设置为List头部
                tail = head;// 尾部和头部指向空节点。
                //此时第一次循环结束
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

第二次循环:

for (;;) {
        Node t = tail; 
        if (t == null) { //此时tail为头结点,即不为null,进入else。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t; //新结点的前驱指向头结点
            if (compareAndSetTail(t, node)) { // 设置尾部
                t.next = node;// 将尾部设置为node节点
                return t; //返回t,但没有接收。即为新结点前面一个节点。此时为null节点。
            }
        }
    }
  1. 进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))中。

    第一次循环: 
    
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); //创建p为node的前继。
            if (p == head && tryAcquire(arg)) { //此时p为头结点也就是head,但尝试获取失败,故返回false,不进入if中。
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && // 判断是否应该park线程
                parkAndCheckInterrupt())// park线程
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 进入shouldParkAfterFailedAcquire(p, node)中,并将node与node前继传入。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //获取前继的状态,此时为-1。
    if (ws == Node.SIGNAL) 
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) { //进入else
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);。
    }
    return false;
}
  1. 返回true,并进入parkAndCheckInterrupt()中。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
  1. 调用Unsafe的park,停止线程。

综上,在有t1获取lock(),并未释放lock时,t2加入,调用了unsafe的park方法,故该情况下ReenTrantLock是一把重量锁。

情况四:t1未执行完成,List中还有t2,此时新线程t3.star()

接上,t3进入队列:

  1. 进入lock();

  2. 进入acquire(1)中。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. 进入tryAcquire(arg)中,尝试获取lock。
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); // 获取当前线程lock的次数,为0。
        if (c == 0) {// 进入if
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
  1. 进入hasQueuedPredecessors()中,查看队列是否有有排队的线程。
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // 此时尾部指向t2,故t = t2(node)
    Node h = head; // 此时头部指向null,故h = null(node)
    Node s;
    return h != t && // t2 != null,故进入后面的判断。
        ((s = h.next) == null || s.thread != Thread.currentThread());
        // s = t2,t2不等于null,故成立。函数返回true。
}
  1. 在if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires))中,因为!hasQueuedPredecessors() 显示false,故不进行后面的判断以及执行if内部方法。并返回false。

  2. 回到原函数:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 此时!tryAcquire(arg)为true,进行后面的判断。
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. 进入addWaiter(Node.EXCLUSIVE)中。
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); //为t3创建新node节点。
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;// 新节点pred 为List尾部,即pred = t2(node)
    if (pred != null) {// t2不等于null,进入if内部。
        node.prev = pred; // 设置t3的前继为t2.
        if (compareAndSetTail(pred, node)) { // CAS
            pred.next = node; // 设置t2的后继为t3.
            return node; // 返回t3(node)
        }
    }
    enq(node);
    return node;
} 
  1. 进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)中。

第一次循环:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); // p = t2
            if (p == head && tryAcquire(arg)) { // p != head,故不进入if。
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 进入shouldParkAfterFailedAcquire(p, node)中,修改前继状态。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //获取前继状态,此时t2状态未改变,为0。
    if (ws == Node.SIGNAL)// SIGNAL为-1,故不进入if。
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) { // 进入else
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // CAS,设置t2状态为-1.
    }
    return false; // 返回false
}
  1. 在if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())中,shouldParkAfterFailedAcquire(p, node)返回false,不进入if,再次循环。

  2. 第二次循环:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); // p = t2
            if (p == head && tryAcquire(arg)) { // p != head,故不进入if。
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 进入shouldParkAfterFailedAcquire(p, node)中,进行判断。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 第一次循环修改了t2(node)的state的状态为-1。
    if (ws == Node.SIGNAL) //ws = -1。进入if中。
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;// 返回true
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  1. 在if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())中,shouldParkAfterFailedAcquire(p, node)返回true,故进入后面的 parkAndCheckInterrupt()中,调用unsafe的park方法停止线程。

综上,在t1执行时,t2在队列排队,t3执行lock()时,调用了unsafe的park()方法。故此时的ReenTrantLock是一把重量锁。

情况五:此时t1,t2,t3都已执行完成,AQS内无待执行线程t4.lock()

  1. 进入lock();

  2. 进入acquire(1);中。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. 进入tryAcquire(arg)中,尝试获取lock。
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); // c = 0;
        if (c == 0) { // 进入if中
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

  1. 进入hasQueuedPredecessors()中,判断List状态。
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // t = 空节点
    Node h = head; // h = 空节点
    Node s;
    return h != t && // t等于h,故为false,返回false。
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  1. 在if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires))中,!hasQueuedPredecessors()显示为true。故执行后面的compareAndSetState(0, acquires),尝试获取锁,并获取成功返回true。进入后面的setExclusiveOwnerThread(current);,设置当前执行线程。并返回true。

  2. 此时if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))中,!tryAcquire(arg) 显示为false。故不会进入后面的判断,并返回函数。并开始执行t4。

综上,在AQS内的所有线程执行完毕后,t4进入lock(),并未调用unsafe的park方法,故此时的ReenTrantLock依旧是一把轻量锁。

上一篇:Qt重定向


下一篇:进程模块