CountDownLatch源码解析

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }
        
        // 尝试获取共享锁 如果state=0,则可以获取到锁,流程往下进行
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * @param count 线程可以停止阻塞await往下执行countDown需要被执行的次数 
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * 使得当前线程等待直到latch为0,除非线程被打断或者等待时间
     * 
     * 如果当前count为0那么await直接返回true
     * 如果在count为0之前就超时了返回false
     * 如果进入方法前就已经设置了中断态或者等待的时候被中断,抛出异常
     * 
     * count不为0则线程停止调度,直到下面三件事发生
     * 1)调用countDown方法使得count=0
     * 2)其他线程中断了当前线程
     * 3)等待超时
     */
    public void await() throws InterruptedException {
        // 要等待吗
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 使得当前线程等待直到latch为0,除非线程被打断或者等待时间超时
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }
    
    public abstract class AbstractQueuedSynchronizer {
        // 获取共享锁 
        public final void acquireShared(int arg) {
            // 子类实现 
            // 返回负数代表失败
            // 0代表成功,但后继争用线程不会成功
            // 整数代表都可以成功
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
        
        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        // 一旦共享获取成功,设置新的头结点,并且唤醒后继线程
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        
        /**
         * 在获取共享锁成功后,设置head节点
         * 根据调用的tryAcquireShared返回的状态以及节点本身的等待状态来判断是否需要唤醒后继线程
         *
         */
        private void setHeadAndPropagate(Node node, int propagate) {
            // 获取老的head  
            Node h = head;
            // head移动到当前节点,清空线程和prev
            setHead(node);
            
            // propagate是tryAcquireShared返回值,决定是否传播唤醒的依据之一
            // 1)propagate>0,代表前面的节点获取到了锁且释放了
            // 2)h = null 没有竞争
            // 3)如果ws = -1 || ws = -2 || ws = -3 都往后传播
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                // 查看还有没有后置节点,如果没有或者有共享的后置节点
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
        
        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                // 如果队列中存在后继线程,唤醒
                // 多线程同时释放共享锁,处于中间过程,可能会读到head节点ws=0的状态
                // 此时虽然不能unpark,但是为了保证唤醒能够正确传递,设置ws=PROPAGATE
                if (h != null && h != tail) {
                    // 获取头节点状态
                    int ws = h.waitStatus;
                    // 如果是等待通知的状态,复位,并唤醒后继节点
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;               // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    // 如果是初始化状态,则设置头节点状态为传递
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                   // loop on failed CAS
                    }
                    // 检查还是否是head,不是则继续循环
                    if (h == head)                   // loop if head changed
                        break;
                }
            }
        }
    
    
        // 先看没有设置超时的等待
        
        // 以可中断模式获取锁
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            // 共享锁,添加到队列中,插到队尾
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    // 获取前置节点
                    final Node p = node.predecessor();
                    // 判断当前节点是否为头节点的后置节点
                    if (p == head) {
                        // 如果是,代表当前线程可以获取锁;
                        
                        // 先拿status的值,如果为0,返回1,则可以获锁,否则往后执行
                        // countDownLatch入参arg传1
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 获取到了,代表线程不阻塞了
                            // 将head设置为当前节点
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    // 如果前面有在排队的其他线程,根据前驱节点的ws判断是否要park
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
       
        
        // 没有获取到锁,是否要park
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 获取前驱节点的ws
            int ws = pred.waitStatus;
            // 如果是前驱节点设置状态为SINGNAL,直接返回true,安心park
            if (ws == Node.SIGNAL)
                return true;
            if (ws > 0) {
                // 删除掉取消的节点
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                // 如果前驱节点是0或者PROPAGATE,代表需要一个通知,暂时不park,
                // 设置ws为SIGNAL
                // ws的SIGNAL代表是需要通知下个节点,代表该节点也在等待呢
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
        
        public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
            // 被中断了就抛出异常,清空中断状态
            if (Thread.interrupted())
                throw new InterruptedException();
            // 尝试获取锁,失败了再尝试获取/有超时限制
            return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
        }
        
        private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
                
            // 超时的系统时间
            final long deadline = System.nanoTime() + nanosTimeout;
            // 共享锁,添加到队列中
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                // 死循环
                for (;;) {
                    // 获取当前节点的前驱节点
                    final Node p = node.predecessor();
                    // 如果前驱节点是head,意味着当前线程可以获取锁
                    if (p == head) {
                        // 尝试获取锁
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 设置当前节点为head
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return true;
                        }
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        return false;
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        
       
上一篇:CountDownLatch demo与源码


下一篇:多线程高并发编程(5) -- CountDownLatch、CyclicBarrier源码分析