【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

前言

ReentrantLock是非常常用的锁,在前面【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue在我们了解到,LinkedBlockingQueue入队、出队都是依赖ReentrantLock进行锁同步和线程唤醒、等待的。
本文来学习下ReentrantLock。

ReentrantLock

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

通过构造函数,我们可以看到可以根据参数fair,生成公平的同步和不公平的同步模式。
接下来需要看下FairSync和NonfairSync到底是何方神圣

Sync

【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock
Sync是一个抽象类,FairSync和NonfairSync都继承自Sync并实现了tryAcquire方法,tryAcquire是在AbstractQueuedSynchronizer(AQS)中声明的。
AbstractQueuedSynchronizer中的方法非常多,我们通过ReentrantLock中各方法的调用来逐步熟悉它。

ReentrantLock::lock

public void lock() {
    sync.acquire(1);
}

请求锁,如果加锁失败则一直等待。
ReentrantLock中加锁的方法非常简洁,直接调用sync的acquire方法
下面我们看下acquire的具体实现。

AbstractQueuedSynchronizer::acquire

/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            //会一直等待,直到获取到锁为止
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
  • 首先尝试获取锁(具体实现下面分析),获取成功函数结束
  • 获取失败,则加入等待队列一直自旋尝试获取锁直到获取成功或超时。
  • 如果获取失败,则抛出中断异常

NonfairSync::tryAcquire

/**
 * Sync object for non-fair locks
 */
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果state = 0 即当前没加锁,则尝试通过CAS的方式加锁,加锁后将持有锁的线程设置为当前线程
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前有锁,则判断是否是当前线程的锁,是的话state加一,即重入锁,不是的话 返回加锁失败
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
  • 如果没锁,则尝试获取锁
  • 如果有锁,判断是否是当前线程持有的
  • 是当前线程持有,则state值加1 返回加锁成功。即重入锁
  • 不是当前线程持有,则加锁失败

FairSync::tryAcquire

/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果没锁
    if (c == 0) {
        //如果队列中没有比当前线程等待更久的线程,则尝试通过CAS的方式获取锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前有锁,则判断是否是当前线程的锁,是的话state加一,即重入锁,不是的话 返回加锁失败
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

/**
 * Queries whether any threads have been waiting to acquire longer
 * than the current thread.
 *
 * <p>An invocation of this method is equivalent to (but may be
 * more efficient than):
 * <pre> {@code
 * getFirstQueuedThread() != Thread.currentThread()
 *   && hasQueuedThreads()}</pre>
 *
 * <p>Note that because cancellations due to interrupts and
 * timeouts may occur at any time, a {@code true} return does not
 * guarantee that some other thread will acquire before the current
 * thread.  Likewise, it is possible for another thread to win a
 * race to enqueue after this method has returned {@code false},
 * due to the queue being empty.
 *
 * <p>This method is designed to be used by a fair synchronizer to
 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
 * Such a synchronizer's {@link #tryAcquire} method should return
 * {@code false}, and its {@link #tryAcquireShared} method should
 * return a negative value, if this method returns {@code true}
 * (unless this is a reentrant acquire).  For example, the {@code
 * tryAcquire} method for a fair, reentrant, exclusive mode
 * synchronizer might look like this:
 *
 * <pre> {@code
 * protected boolean tryAcquire(int arg) {
 *   if (isHeldExclusively()) {
 *     // A reentrant acquire; increment hold count
 *     return true;
 *   } else if (hasQueuedPredecessors()) {
 *     return false;
 *   } else {
 *     // try to acquire normally
 *   }
 * }}</pre>
 *
 * @return {@code true} if there is a queued thread preceding the
 *         current thread, and {@code false} if the current thread
 *         is at the head of the queue or the queue is empty
 * @since 1.7
 */
public final boolean hasQueuedPredecessors() {
    Node h, s;
    //如果队列不为空
    if ((h = head) != null) {
        //看当前线程是不是在队列的队首,即排队时间最长的队列
        if ((s = h.next) == null || s.waitStatus > 0) {
            s = null; // traverse in case of concurrent cancellation
            //这里为啥从队列尾部开始向前遍历???可能是因为队列头部可能会有大量超时的节点,从后往前遍历更快?
            for (Node p = tail; p != h && p != null; p = p.prev) {
                if (p.waitStatus <= 0)
                    s = p;
            }
        }
        if (s != null && s.thread != Thread.currentThread())
            return true;
    }
    return false;
}

ReentrantLock::lockInterruptibly

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

/**
 * Acquires in exclusive mode, aborting if interrupted.
 * Implemented by first checking interrupt status, then invoking
 * at least once {@link #tryAcquire}, returning on
 * success.  Otherwise the thread is queued, possibly repeatedly
 * blocking and unblocking, invoking {@link #tryAcquire}
 * until success or the thread is interrupted.  This method can be
 * used to implement method {@link Lock#lockInterruptibly}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    //如果线程被重点,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果获取锁失败 
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

/**
 * Acquires in exclusive interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    //new一个新节点
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        //轮询直到获取到锁或者线程被中断
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

请求锁,如果失败则一直阻塞等待 直到获取锁或线程中断

ReentrantLock::tryLock

public boolean tryLock() {
    //尝试获取锁,获取失败的话 直接返回false,不会再等待
    return sync.nonfairTryAcquire(1);
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    //尝试获取锁,如果失败的话,等待timeout时间后返回false,如果被中断则抛出异常
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

请求锁,如果请求失败,则返回false

ReentrantLock::unlock

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    //释放锁 state数减releases
    int c = getState() - releases;
    //如果当前线程没有持有锁,则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //当c=0时,锁完全释放,ownerThread设为null。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

释放锁,直到state=0完全释放时,线程owner设置为null

ReentrantLock::newCondition

public Condition newCondition() {
    return sync.newCondition();
}

final ConditionObject newCondition() {
    return new ConditionObject();
}

ConditionObject::await

线程释放锁,阻塞挂起,直到被signal唤醒,则继续尝试获取锁

public final void await() throws InterruptedException {
    //如果当前线程被中断、则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //新创建个节点,将当前线程加入等待队列
    Node node = addConditionWaiter();
    //完全释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //阻塞直到node被唤醒
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //如果被中断,则直接break
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

ConditionObject::awaitNanos

线程释放锁,阻塞挂起一段时间,直到被signal唤醒或超时,则继续尝试获取锁

public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // We don't check for nanosTimeout <= 0L here, to allow
    // awaitNanos(0) as a way to "yield the lock".
    final long deadline = System.nanoTime() + nanosTimeout;
    long initialNanos = nanosTimeout;
    //加入等待队列
    Node node = addConditionWaiter();
    //释放所有锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //等待超时
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    //尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    long remaining = deadline - System.nanoTime(); // avoid overflow
    return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
}

ConditionObject::awaitUntil

线程释放锁,阻塞挂起一段时间,直到被signal唤醒或到指定时间,则继续尝试获取锁

public final boolean awaitUntil(Date deadline)
        throws InterruptedException {
    long abstime = deadline.getTime();
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (System.currentTimeMillis() >= abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        LockSupport.parkUntil(this, abstime);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

ConditionObject::signal

把首节点的status设置为Node.SIGNAL 则阻塞的线程循环判断发现statue状态变了,则唤醒继续执行。如果设置status失败,则在此线程中调用LockSupport.unpark唤醒阻塞的线程

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    //唤醒一个节点,把statue设置为Node.SIGNAL。如果设置失败了,则自己调用LockSupport.unpark唤醒线程
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

ConditionObject::signalAll

唤醒所有的节点。

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

总结

【从入门到放弃-Java】并发编程-锁-synchronized中,我们学习内置锁synchronized,与ReentrantLock对比

  • 两者都是可重入的互斥锁。
  • synchronized是隐式的加解锁,不需要手动解锁。而ReentrantLock需要显式的lock和unlock。lock加锁多少次,对应的就需要unlock多少次。因此一般都会在finally中unlock。避免因异常等情况导致锁无法释放
  • ReentrantLock通过AQS(volatile state + CAS + CLH队列实现)加解锁。synchronized是通过monitor实现(存在偏向锁、轻量级锁、重量级锁等锁升级)。
  • ReentrantLock可以使用lockInterruptibly响应中断,synchronized只能傻等、等到死
  • ReentrantLock可以使用非公平锁和公平锁模式,可以通过非公平性减少CAS的竞争,提升性能。也可以通过公平锁减少线程饥饿情况发生
  • ReentrantLock可以创造多个Condition,来实现线程等待通知机制(阻塞、唤醒)

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

上一篇:【从入门到放弃-Java】并发编程-JUC-SynchronousQueue


下一篇:【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue