CountDownLatch
CountDownLatch
CountDownLatch在我们日常开发是经常使用到的。它作为线程间通讯一个重要的桥梁,能够使线程感知到其
它线程是否达到自己执行的节点。
作为一个同步计数器来使用。
作为一个经常使用的工具类,所以明白它的内部原理是很重要的,接下来大家和我一起来慢慢探索把
CountDownLatch(int count)
// 这是CountDownLatch的初始化方法,必须要传一个count参数,且count不能小于0.
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 设置同步状态值
Sync(int count) {
setState(count);
}
在CountDownLatch的初始化方法中 引入了一个Sync类,如果看过我上一篇文章的ReentrantLock的同学,肯定觉得Sync类很熟悉。
其实和大家想的一样,CountDownLatch也是通过继承AbstractQueuedSynchronizer类来实现同步逻辑。
await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly(int arg)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 中断抛异常
if (Thread.interrupted())
throw new InterruptedException();
// 这里注意tryAcquireShared(arg) 只会返回1或-1
//如果返回1,说明需要执行的节点已经全部执行完了,
// 此时在调用await() 的方法不需要入队等待,直接往下走就行了
if (tryAcquireShared(arg) < 0)
//返回-1, 说明还有节点在执行,这样的话,调用await()方法的线程应该要入队等待。
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared(int acquires)
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly(arg)
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 增加一个共享节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//拿到节点的前一个节点,为null时直接抛错
final Node p = node.predecessor();
// 如果前节点为head节点
if (p == head) {
//判断同步状态是不是为0,即可以释放锁的状态r>=0 即是可以释放锁的状态
int r = tryAcquireShared(arg);
//
if (r >= 0) {
setHeadAndPropagate(node, r);
// 删除头节点
p.next = null; // help GC
failed = false;
return;
}
}
// 第一次循环:取消失效节点,并把前驱节点状态改为-1
// 第二次循环,park线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate(Node node, int propagate)
private void setHeadAndPropagate(Node node, int propagate) {
//
Node h = head; // Record old head for check below
// 把当前节点设置为头节点
setHead(node);
// 此时进来的propagate等于1,所以该条件恒成立
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//判断节点的下一个是不是共享节点
Node s = node.next;
// 是的话继续释放
if (s == null || s.isShared())
doReleaseShared();
}
}
countDown()
public void countDown() {
sync.releaseShared(1);
}
releaseShared(int arg)
public final boolean releaseShared(int arg) {
// 当tryReleaseShared为true的时候->就是释放的同步状态为最后一个的时候
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(arg)
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// for循环是为了预防多线程竞争,导致state频繁变化
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
// 确认同步状态值是否有变化
if (compareAndSetState(c, nextc))
// 只有state==1且没有其他线程更改state时
return nextc == 0;
}
}
doReleaseShared()
private void doReleaseShared() {
for (;;) {
Node h = head;
// 这个判断条件意思是,只有队列中有两个节点或以上才会走下面逻辑
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果此时head的ws==Node.SIGNAL,说明后续节点时等待唤醒状态
//
if (ws == Node.SIGNAL) {
// 通过CAS 去释放锁,首先要保证自身节点没改变
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
//如果释放失败跳过循环,重新走循环逻辑(说明此时得h节点状态已经改变)
continue; // loop to recheck cases
// 释放可用的下一个节点
unparkSuccessor(h);
}
// 这一块是为独占和共享都存在的时候准备的,等讲到读写锁再来分析
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 只有在head节点没变化的时候才退出循环
if (h == head) // loop if head changed
break;
}
}
总结
来分析一下,在调用countDown() 的时候同步状态值state减1,等触发到临界状态值state==0 的时候,
会调用doReleaseShared()。此时会唤醒在队列中等待唤醒的队列。
大概逻辑图示:
分析完整个流程后,我们来分析下多线程等待的时候会怎么样。
假设有7个线程调用await()等待。
那此时队列中此时队列情况为如下图示:
当调用countDown()使得state==0
程序调用 doReleaseShared()
按上述队列一样,所有线程入队,并全部park,doReleaseShared()调用unparkSuccessor(h)唤醒T1线程,doReleaseShared()并在等待unparkSuccessor(h)执行完成。
T1从park住得地方唤醒,走循环逻辑
final Node p = node.predecessor();
//此时前驱节点为head
if (p == head) {
//此时state==0 所以r==1
int r = tryAcquireShared(arg);
//
if (r >= 0) {
// 这里看前面得代码解析,
// 大体是将当前对象设置为head,并继续调用doReleaseShared()
setHeadAndPropagate(node, r);
// 删除头节点
p.next = null; // help GC
failed = false;
return;
}
}
可以看到第二次调用doReleaseShared(),此时得头节点是T1线程,T1线程继续唤醒T2线程,然后T2走T1同样得逻辑去唤醒T3线程。
到T3线程来调用doReleaseShared(),此时head是T3线程,tail也是T3线程。此时T3不需要唤醒其他线程,直接返回。
到此所有等待中得线程全部唤醒。
到这里就把countDownLatch流程和源码讲完了,其实流程不负责,API 也不多,看完之前得ReentrantLock再来看这个就简单很多了