【从入门到放弃-Java】并发编程-JUC-locks-ReentrantReadWriteLock

前言

上文【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock我们了解到,ReentrantLock是一个互斥排他的重入锁,读和读、读和写、写和写不能同时进行。但在很多场景下,读多写少,我们希望能并发读,这时候ReentrantReadWriteLock就派上用场了,是专门针对这种场景设计的。
接下来我们一起来学习下ReentrantReadWriteLock。

ReentrantReadWriteLock

【从入门到放弃-Java】并发编程-JUC-locks-ReentrantReadWriteLock

/**
 * Creates a new {@code ReentrantReadWriteLock} with
  * default (nonfair) ordering properties.
  */
 public ReentrantReadWriteLock() {
     this(false);
 }
 
 /**
  * Creates a new {@code ReentrantReadWriteLock} with
  * the given fairness policy.
  *
  * @param fair {@code true} if this lock should use a fair ordering policy
  */
 public ReentrantReadWriteLock(boolean fair) {
     sync = fair ? new FairSync() : new NonfairSync();
     readerLock = new ReadLock(this);
     writerLock = new WriteLock(this);
 }

我们可以看到和ReentrantLock一样,ReentrantReadWriteLock也使用了通过AQS实现的FairSync和NonfairSync模式
有两个成员变量锁ReadLock和WriteLock

ReadLock::lock

获取读锁,不死不休

public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    //如果已经有写锁,且不是当前线程持有的,则加读锁失败
    //如果当前线程已经持有写锁,则可以获取读锁,这就是锁降级
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    /** 
     * 判断读线程是否阻塞,取决于队列的策略
     *   公平锁策略:如果当前同步队列不为空且当前线程不是队列的第一个节点,则阻塞。
     *   非公平锁策略:如果当前队列的第一个节点时写锁,则需要阻塞。这样是为了防止写锁饥饿。
     * 如果不需要阻塞,且读锁数未达到最大值 则尝试通过cas的方式获取锁
     */
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //如果当前读锁为0,则当前线程获取锁
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //如过第一个读锁的持有者是当前线程,则firstReaderHoldCount数量加一
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            //如果最后一个获取锁的线程不是当前线程
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                //获取当前线程的锁
                cachedHoldCounter = rh = readHolds.get();
            //如果当前最后一个线程获取锁数量为0,则将其设置为当前线程的holdcounter
            else if (rh.count == 0)
                readHolds.set(rh);
            //读锁数+1
            rh.count++;
        }
        return 1;
    }
    //尝试无限循环获取读锁
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        //如果已经有写锁,且不是当前线程持有的,返回-1
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        //如果需要阻塞
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current)) {
                        //如果当前线程持有的锁数为0,则移除
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //无限循环,直到当前线程是队列的头结点,则尝试获取读锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //获取锁成功后,将当前线程从队列头结点移除
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

ReadLock::lockInterruptibly

获取读锁,直到成功或被中断

public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //如果收到中断信号,则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果尝试获取锁失败,则循环等待获取锁
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            //无限循环,直到当前线程是队列的头结点,则尝试获取读锁
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            //获取锁失败的话则需要进行中断检测,检测到中断信号则抛出异常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

ReadLock::tryLock

//尝试获取读锁,如果有写锁获取失败,则直接返回失败
public boolean tryLock() {
    return sync.tryReadLock();
}

@ReservedStackAccess
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

//尝试获取读锁,获取失败或者超时未获取到的话,则返回失败
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            //排到当前线程的话则尝试获取锁
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            //超时返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            
            //阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            //如果被中断
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

ReadLock::unlock

释放锁

public void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果当前线程是第一个持有读锁的
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        //如果是唯一一个持有读锁的,则firstReader设置为null
        if (firstReaderHoldCount == 1)
            firstReader = null;
        //firstReaderHoldCount减一,
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        //如果不是最后一个持有读锁的线程
        if (rh == null ||
            rh.tid != LockSupport.getThreadId(current))
            //从ThreadLocal获取readHolds
            rh = readHolds.get();
        int count = rh.count;
        //如果小于等于1,则移除readHolds
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        //持有锁的数量减一
        --rh.count;
    }
    for (;;) {
        //将state设置为0,原因是在写锁降级为读锁后,释放读锁时,需要将state设为0,方便后续的写锁竞争。
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        //如果头结点不是null,并且队列不为空
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //如果当前结点是SIGNAL信号
            if (ws == Node.SIGNAL) {
                //唤醒头结点
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

WriteLock::lock

获取写锁,如果获取失败,则加入等待队列
具体方法和ReentrantLock调用的方法相同,可参考【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

public void lock() {
    sync.acquire(1);
}

WriteLock::lockInterruptibly

获取写锁,如果获取失败,则加入等待队列,直到获取到或被中断
具体方法和ReentrantLock调用的方法相同,可参考【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

WriteLock::tryLock

public boolean tryLock() {
    return sync.tryWriteLock();
}

@ReservedStackAccess
final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    //如果存在写锁,且写锁不是当前线程持有的,则返回false
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    //如果不存在写锁或是当前线程获取的写锁,则尝试将state加一
    if (!compareAndSetState(c, c + 1))
        return false;
    //设置持有写锁的线程为当前线程
    setExclusiveOwnerThread(current);
    return true;
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    //和ReentrantLock的调用方法一样,不再赘述
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

WriteLock::unlock

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    //如果不是当前线程持有的写锁,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //判断持有的写锁是否释放完毕
    boolean free = exclusiveCount(nextc) == 0;
    //如果释放完毕,则将当前持有锁的线程设置为null
    if (free)
        setExclusiveOwnerThread(null);
    //设置持有的锁数量减一
    setState(nextc);
    return free;
}

总结

通过源码分析,我们了解到,可以通过ReentrantReadWriteLock可以获取读锁和写锁。

  • 写锁是互斥锁,只能一个线程持有,写锁和ReentrantLock类似
  • 读锁是共享锁,可以多个线程同时持有。
  • 读锁通过firstReader和cachedHoldCounter优化获取、释放锁的性能。使用ThreadLocal readHolds存放所有持有锁线程的tid和持有锁数量。
  • 线程可以将自己持有的写锁降级为读锁,在释放读锁时,一起释放。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

上一篇:【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue


下一篇:【从入门到放弃-Java】并发编程-JUC-ConcurrentHashMap