public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 尝试获取共享锁 如果state=0,则可以获取到锁,流程往下进行
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
/**
* @param count 线程可以停止阻塞await往下执行countDown需要被执行的次数
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 使得当前线程等待直到latch为0,除非线程被打断或者等待时间
*
* 如果当前count为0那么await直接返回true
* 如果在count为0之前就超时了返回false
* 如果进入方法前就已经设置了中断态或者等待的时候被中断,抛出异常
*
* count不为0则线程停止调度,直到下面三件事发生
* 1)调用countDown方法使得count=0
* 2)其他线程中断了当前线程
* 3)等待超时
*/
public void await() throws InterruptedException {
// 要等待吗
sync.acquireSharedInterruptibly(1);
}
/**
* 使得当前线程等待直到latch为0,除非线程被打断或者等待时间超时
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public abstract class AbstractQueuedSynchronizer {
// 获取共享锁
public final void acquireShared(int arg) {
// 子类实现
// 返回负数代表失败
// 0代表成功,但后继争用线程不会成功
// 整数代表都可以成功
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 一旦共享获取成功,设置新的头结点,并且唤醒后继线程
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 在获取共享锁成功后,设置head节点
* 根据调用的tryAcquireShared返回的状态以及节点本身的等待状态来判断是否需要唤醒后继线程
*
*/
private void setHeadAndPropagate(Node node, int propagate) {
// 获取老的head
Node h = head;
// head移动到当前节点,清空线程和prev
setHead(node);
// propagate是tryAcquireShared返回值,决定是否传播唤醒的依据之一
// 1)propagate>0,代表前面的节点获取到了锁且释放了
// 2)h = null 没有竞争
// 3)如果ws = -1 || ws = -2 || ws = -3 都往后传播
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 查看还有没有后置节点,如果没有或者有共享的后置节点
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
// 如果队列中存在后继线程,唤醒
// 多线程同时释放共享锁,处于中间过程,可能会读到head节点ws=0的状态
// 此时虽然不能unpark,但是为了保证唤醒能够正确传递,设置ws=PROPAGATE
if (h != null && h != tail) {
// 获取头节点状态
int ws = h.waitStatus;
// 如果是等待通知的状态,复位,并唤醒后继节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果是初始化状态,则设置头节点状态为传递
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 检查还是否是head,不是则继续循环
if (h == head) // loop if head changed
break;
}
}
}
// 先看没有设置超时的等待
// 以可中断模式获取锁
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 共享锁,添加到队列中,插到队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取前置节点
final Node p = node.predecessor();
// 判断当前节点是否为头节点的后置节点
if (p == head) {
// 如果是,代表当前线程可以获取锁;
// 先拿status的值,如果为0,返回1,则可以获锁,否则往后执行
// countDownLatch入参arg传1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取到了,代表线程不阻塞了
// 将head设置为当前节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 如果前面有在排队的其他线程,根据前驱节点的ws判断是否要park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 没有获取到锁,是否要park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的ws
int ws = pred.waitStatus;
// 如果是前驱节点设置状态为SINGNAL,直接返回true,安心park
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 删除掉取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前驱节点是0或者PROPAGATE,代表需要一个通知,暂时不park,
// 设置ws为SIGNAL
// ws的SIGNAL代表是需要通知下个节点,代表该节点也在等待呢
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
// 被中断了就抛出异常,清空中断状态
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,失败了再尝试获取/有超时限制
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 超时的系统时间
final long deadline = System.nanoTime() + nanosTimeout;
// 共享锁,添加到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 死循环
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是head,意味着当前线程可以获取锁
if (p == head) {
// 尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置当前节点为head
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}