从零开始学并发四.CountDownLatch

CountDownLatch

CountDownLatch

	CountDownLatch在我们日常开发是经常使用到的。它作为线程间通讯一个重要的桥梁,能够使线程感知到其
	它线程是否达到自己执行的节点。
	作为一个同步计数器来使用。
	作为一个经常使用的工具类,所以明白它的内部原理是很重要的,接下来大家和我一起来慢慢探索把

CountDownLatch(int count)

	// 这是CountDownLatch的初始化方法,必须要传一个count参数,且count不能小于0.
   public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    // 设置同步状态值
	 Sync(int count) {
            setState(count);
        }

在CountDownLatch的初始化方法中 引入了一个Sync类,如果看过我上一篇文章的ReentrantLock的同学,肯定觉得Sync类很熟悉。
其实和大家想的一样,CountDownLatch也是通过继承AbstractQueuedSynchronizer类来实现同步逻辑。

await()

  public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

acquireSharedInterruptibly(int arg)

  public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
            // 中断抛异常
        if (Thread.interrupted())
            throw new InterruptedException();
            // 这里注意tryAcquireShared(arg) 只会返回1或-1
            //如果返回1,说明需要执行的节点已经全部执行完了,
            // 此时在调用await() 的方法不需要入队等待,直接往下走就行了
        if (tryAcquireShared(arg) < 0)
        	//返回-1, 说明还有节点在执行,这样的话,调用await()方法的线程应该要入队等待。
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared(int acquires)

 protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

doAcquireSharedInterruptibly(arg)

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 增加一个共享节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
            	//拿到节点的前一个节点,为null时直接抛错
                final Node p = node.predecessor();
                // 如果前节点为head节点
                if (p == head) {
                	//判断同步状态是不是为0,即可以释放锁的状态r>=0 即是可以释放锁的状态
                    int r = tryAcquireShared(arg);
                    //
                    if (r >= 0) {
                    	
                        setHeadAndPropagate(node, r);
                        // 删除头节点
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 第一次循环:取消失效节点,并把前驱节点状态改为-1
                // 第二次循环,park线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

setHeadAndPropagate(Node node, int propagate)

private void setHeadAndPropagate(Node node, int propagate) {
		// 
        Node h = head; // Record old head for check below
        // 把当前节点设置为头节点
        setHead(node);
         // 此时进来的propagate等于1,所以该条件恒成立
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            //判断节点的下一个是不是共享节点
            Node s = node.next;
            // 是的话继续释放
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

countDown()

public void countDown() {
        sync.releaseShared(1);
    }

releaseShared(int arg)

   public final boolean releaseShared(int arg) {
   		// 当tryReleaseShared为true的时候->就是释放的同步状态为最后一个的时候
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared(arg)

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // for循环是为了预防多线程竞争,导致state频繁变化
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                	// 确认同步状态值是否有变化
                if (compareAndSetState(c, nextc))
                	// 只有state==1且没有其他线程更改state时
                    return nextc == 0;
            }
        }

doReleaseShared()

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 这个判断条件意思是,只有队列中有两个节点或以上才会走下面逻辑
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 如果此时head的ws==Node.SIGNAL,说明后续节点时等待唤醒状态
                // 
                if (ws == Node.SIGNAL) {
                	// 通过CAS 去释放锁,首先要保证自身节点没改变
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    	//如果释放失败跳过循环,重新走循环逻辑(说明此时得h节点状态已经改变)
                        continue;            // loop to recheck cases
                    // 释放可用的下一个节点
                    unparkSuccessor(h);
                }
               // 这一块是为独占和共享都存在的时候准备的,等讲到读写锁再来分析
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 只有在head节点没变化的时候才退出循环
            if (h == head)                   // loop if head changed
                break;
        }
    }

总结

来分析一下,在调用countDown() 的时候同步状态值state减1,等触发到临界状态值state==0 的时候,
会调用doReleaseShared()。此时会唤醒在队列中等待唤醒的队列。
大概逻辑图示:
从零开始学并发四.CountDownLatch
分析完整个流程后,我们来分析下多线程等待的时候会怎么样。

假设有7个线程调用await()等待。

那此时队列中此时队列情况为如下图示:

从零开始学并发四.CountDownLatch

当调用countDown()使得state==0

程序调用 doReleaseShared()

按上述队列一样,所有线程入队,并全部park,doReleaseShared()调用unparkSuccessor(h)唤醒T1线程,doReleaseShared()并在等待unparkSuccessor(h)执行完成
T1从park住得地方唤醒,走循环逻辑

  final Node p = node.predecessor();
             	//此时前驱节点为head
                if (p == head) {
                	//此时state==0  所以r==1
                    int r = tryAcquireShared(arg);
                    //
                    if (r >= 0) {
                    	// 这里看前面得代码解析,
                    	// 大体是将当前对象设置为head,并继续调用doReleaseShared()
                        setHeadAndPropagate(node, r);
                        // 删除头节点
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }

可以看到第二次调用doReleaseShared(),此时得头节点是T1线程,T1线程继续唤醒T2线程,然后T2走T1同样得逻辑去唤醒T3线程。
到T3线程来调用doReleaseShared(),此时head是T3线程,tail也是T3线程。此时T3不需要唤醒其他线程,直接返回。
到此所有等待中得线程全部唤醒。

到这里就把countDownLatch流程和源码讲完了,其实流程不负责,API 也不多,看完之前得ReentrantLock再来看这个就简单很多了

上一篇:C语言实现Printf函数功能,并输出字符串至char数组


下一篇:js 金额加上单位