CountDownlatch(等待多线程完成)

CountDownlatch(等待多线程完成)

应用场景:主线程等待其它线程的结束,比如运动员和裁判,所有运动员准备好后,裁判发令,所有运动员到终点后,裁判提示比赛结束。有awaite,和countDown方法

使用demo

package com.w.juc;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo implements Runnable{
   private static CountDownLatch begin = new CountDownLatch(1);
   private static CountDownLatch end = new CountDownLatch(5);


   private void sport(){
       System.out.println(Thread.currentThread()+"等待发令枪响");
       try {
           begin.await();
           System.out.println(Thread.currentThread()+"正在跑步");
           Thread.sleep((long)Math.random()*10000);
           System.out.println(Thread.currentThread()+"到终点了");
           end.countDown();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }

   }

    @Override
    public void run() {
        sport();
    }

    public static void main(String[] args) {
        System.out.println("教练开枪了,开跑");


        new Thread(new CountDownLatchDemo(),"一号").start();
        new Thread(new CountDownLatchDemo(),"二号").start();
        new Thread(new CountDownLatchDemo(),"三号").start();
        new Thread(new CountDownLatchDemo(),"四号").start();
        new Thread(new CountDownLatchDemo(),"五号").start();



        try {
            Thread.sleep(2000);
            begin.countDown();
            end.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("比赛结束");
    }
}

countDownLatch:构造时使用int接收一个参数N,线程A调用awaite(),将会等待直到其它线程调用countdown()将N减为0。

源码分析

CountDownLatch:用法和Thread.join是差不多的,都是等待其它线程结束/调用countdown方法,然后通知主线程可以继续执行。

Tread.join()
是通过一个while循环,判断当前线程是否还活着,或者就继续让主线程线程等待。wait(0)表示永远的等待下去。当执行完毕后,调用this.notifyAll().
while(isAlive()){
	wait(0);
}

CountDownLatch实现了和Tread.join()一样的功能,还有更多功能。

CountDownLatch是一个比较简单的类,只有一个内部类Sync,和几个方法

结构

底层其实是AQS,通过它来管理同步状态,控制线程的排队,通知线程等待和唤醒。

  1. 构造方法

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);//直接初始化一个同步器
    }
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync继承了AQS队列同步器,需要去重写它的几个方法。
    
    Sync(int count) {//构造方法,初始化同步状态,也就是计数器值
                setState(count);
            }
    //重写AQS的方法,获取共享锁
    protected int tryAcquireShared(int acquires) {//参数没有用到
        return (getState() == 0) ? 1 : -1; //当计数器也就是同步状态等于0时,代表等待的主线程可以被唤醒了
     	//不为0则进入同步队列   
    }
    //cas将计算机值减一   
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                //如果减为0,返回true,唤醒等待的线程
                return nextc == 0;
        }
    }
    
  2. 主要的两个方法

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
     public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)//调用子类重写的方法,同步状态不为0时,将当前线程放入阻塞队列
                doAcquireSharedInterruptibly(arg);
        }
    
     private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);//构造一个节点,原子性的加入同步(阻塞)队列尾部
            boolean failed = true;
            try {
                //自旋准备获取同步状态,当前节点前驱是头结点(已经持有同步状态的线程节点),则可以尝试获取同步状态。
                for (;;) {
                    //获取node的前驱节点
                    final Node p = node.predecessor();
                 	//如果p == head 则说明node的前驱节点是头节点,即可以去尝试获取同步状态   
                    if (p == head) {
                        int r = tryAcquireShared(arg);//r>0表示,同步状态已经为0了,可以获取同步状态了
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //不是则执行这里
                    //shouldParkAfterFailedAcquire(p, node),
                    //已知,当节点为头节点时,需要将节点状态设置为Signal表示有后继节点需要被唤醒,就是获取同步状态。
                    //这个方法会去判断当前前驱节点状态是否为signal
                    //是signal则返回true,>0则表示前驱节点被取消了,就跳过前驱节点。再如果不是就cas更新前驱节点状态为signal
                    // parkAndCheckInterrupt(),LockSupport.park(this);挂起当前节点对应的线程...
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    public void countDown() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {//调用的是重写的方法,如果当前同步状态为0则返回true
                doReleaseShared();//只有一个线程会进入
                return true;
            }
            return false;
        }
    // 案例:// t3 线程在if(h == head) 返回false时,t3 会继续自旋. 参与到 唤醒下一个head.next的逻辑..
    // t3 此时执行到 CAS WaitStatus(h,Node.SIGNAL, 0) 成功.. t4 在t3修改成功之前,
    //也进入到 if (ws == Node.SIGNAL) 里面了
    // 但是t4 修改 CAS WaitStatus(h,Node.SIGNAL, 0) 会失败,因为 t3 改过了...
    
     private void doReleaseShared() {//释放所有阻塞的节点
        
            for (;;) {//自旋一个一个释放
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {//如果前驱节点的状态是Signal,则进入
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//会将头节点状态设置为初始状态,cas操作失败继续
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);//唤醒头节点的下一个节点
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//为了失败的cas操作,见上面案例
                        continue;                // loop on failed CAS
                }
                //上面循环是个自旋,再这里退出循环
                //1.什么时候会退出
                /* 1. 正常结束,唤醒完所有阻塞节点,最后一个节点就是头节点,就会退出
                *  2. awaite()被多个线程调用,也就是说多个线程一直再自旋等待同时获取同步状态,
                *  多线程条件下,当可以获取同步状态时,自旋再doAcquireSharedInterruptibly的线程,和此方法都会执行。
                *  此时就会有可能,doAcquireSharedInterruptibly方法还未设置头节点,但是此方法已经执行到了这里,已经被唤醒
                *  此时就会退出唤醒逻辑,因为已经出问题了,被唤醒的节点并不是头节点。
                *  唤醒逻辑转移到doAcquireSharedInterruptibly中,会再次调用此方法
                */
                if (h == head)                   // loop if head changed
                    break;//正常情况下,不符合条件就说明还有节点待唤醒。
            }
        }
    
    
    

总结

(1)CountDownLatch表示允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作;

(2)CountDownLatch使用AQS的共享锁机制实现;

(3)CountDownLatch初始化的时候需要传入次数count;只有一个构造参数。

(4)每次调用countDown()方法count的次数减1;

(5)每次调用await()方法的时候会尝试获取锁,这里的获取锁其实是检查AQS的state变量的值是否为0;

(6)当count的值(也就是state的值)减为0的时候会唤醒排队着的线程(这些线程调用await()进入队列);

比较:

与Thread.join()区别:

Thread.join是在主线程中调用的,得等到其它线程全部执行结束。而CountDownLatch则是在随时调用countDown方法。

共享锁与互斥锁比较:

为什么使用共享锁?共享锁同步状态可以当作是计数器值,每当有线程调用countdown方法计数减一,而释放锁,也就是同步状态为0时需要唤醒所有等待线程,调用awaite的线程。但互斥锁同步状态为1,释放锁只能唤醒一个线程一次。

上一篇:[多线程] 多线程工具之CountDownLatch


下一篇:等待多线程完成的CountDownLatch(带示例)