简介
实现了ReadWriteLock接口,ReadLock,WriteLock实现了Lock接口规范;
ReadLock共享锁,实现了AQS的tryAcquire、tryRelease;
WriteLock独占锁,实现了AQS的tryAcquireShare、tryReleaseShare;
基本组成
-
Sync
- HoldCounter
- ThreadLocalHoldCounter
-
FairSync
-
NonfairSync
-
ReadLock
-
WriteLock
方法
- exclusiveCount
static final int SHARED_SHIFT = 16;
// 10000000000000000 - 1 = 01111111111111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/**
* 返回共享状态下锁的数量
* c >>> SHARED_SHIFT :
* **************** **************** 将后16位移走,保留c的前16位
*/
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/**
* 返回独占状态下锁的数量
*
* 1与任何数进行&操作,都等于任何数本身,0与任何数&操作都等于0
* 参数c为32位 : **************** ****************
* EXCLUSIVE_MASK: 0000000000000000 1111111111111111
* 所以c跟EXCLUSIVE_MASK & 操作的结果是:c的后16位
*/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
复制代码
- tryAcquire 写锁获取资源
/**
* Sync#tryAcquire
* 资源已被获取:
* 读锁存在,返回false
* 获取到资源的不是当前线程,返回false
* 超过count最大值,返回false
* 资源未被获取:
* 写阻塞,返回false
* CAS设置状态失败,返回false
*/
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 获取写锁的状态
int w = exclusiveCount(c);
// 资源已被获取
if (c != 0) {
// 资源被获取并且写锁数量是0,认为是有 读锁 存在
// 读锁存在(读写互斥) 或 当前线程不占有锁,资源获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 是否超过最大count限制(65535)
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 更新状态
setState(c + acquires);
return true;
}
// 如果需要写阻塞 或 CAS设置状态失败,资源获取失败
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 标记当前线程占有资源
setExclusiveOwnerThread(current);
return true;
}
复制代码
- writerShouldBlock
/**
* FairSync#writerShouldBlock
*/
final boolean writerShouldBlock() {
// 调用AQS查看是否有前一个节点
return hasQueuedPredecessors();
}
/**
* NonfairSync#writerShouldBlock
*/
final boolean writerShouldBlock() {
// 永远不会阻塞
return false;
}
复制代码
- tryRelease 写锁释放资源
/**
* Sync#tryRelease
*/
protected final boolean tryRelease(int releases) {
// 获取资源的线程是否与当前线程一致
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 写锁数量是否为0,为0代表可以释放资源,free为true
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
复制代码
-
tryAcquireShared 读锁获取资源
/*
* Sync#tryAcquireShared
*
*/
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
复制代码
- fullTryAcquireShared
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
复制代码
- tryReleaseShare 读锁释放资源
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
复制代码
- readerShouldBlock
问题
- HolCounter中使用线程id,而不是引用,是为了防止影响垃圾回收?
- ReadLock、WriteLock、或者是ReentrantLock 为什么不继承AQS,而是要Sync去继承能?
为了让类更好维护吗?每个类的指责更明显?