java多线程-countDownLatch介绍

java多线程

countDownLatch介绍

概念

countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
它是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

源码

countDownLatch类中只提供了一个构造器:

//参数count为计数值
public CountDownLatch(int count) {  };  

三个重要方法:

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };   
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  
//将count值减1
public void countDown() { };  

使用示例:

    public void multiThreadTest() {
        final CountDownLatch begin = new CountDownLatch(1); // 控制线程开始
        final CountDownLatch end = new CountDownLatch(30); // 控制线程结束
        final ExecutorService exec = Executors.newFixedThreadPool(15); // 线程池管理线程
        for (int j = 0; j < 30; j++) {
            exec.submit(new ServiceThread(begin, end, j)); // 多个线程运动员准备中
        }
        begin.countDown(); // 发送开始号令,多线程运动员们开始运行
        try {
            end.await(); // 等待子线程全都运行完成之后,执行后续动作
        } catch (InterruptedException e) {
            log.error(e.getMessage());
        }
        exec.shutdown();
        System.out.println("Done");
    }

    @Data
    @AllArgsConstructor
    public class ServiceThread implements Runnable {
        private CountDownLatch begin;
        private CountDownLatch end;
        private int j;

        @Override
        public void run() {
            try {
                begin.await(); // 线程运动员准备中,阻塞在这等待其它运动员准备完成,然后等待号令begin.countDown()之后开始运行
                for (int i = 0; i < 10; i++) {
                    System.out.println("j是" +j+",i是"+i);
                }
            } catch (Exception e) {

            } finally {
                /*
                 * 一个子线程运动员完成了,但主线程会阻塞在end.await()这里,
                 * 当end这个减为0时,将越过end.await()方法,执行后续操作。
                 * */
                end.countDown();
            }
        }
    }

CountDownLatch实现原理

1、创建计数器
当我们调用CountDownLatch countDownLatch=new CountDownLatch(4) 时候,此时会创建一个AQS的同步队列,并把创建CountDownLatch 传进来的计数器赋值给AQS队列的 state,所以state的值也代表CountDownLatch所剩余的计数次数;

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);//创建同步队列,并设置初始计数器值
    }

2、阻塞线程
当我们调用countDownLatch.wait()的时候,会创建一个节点,加入到AQS阻塞队列,并同时把当前线程挂起。

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

判断计数器是计数完毕,未完毕则把当前线程加入阻塞队列

  public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //锁重入次数大于0 则新建节点加入阻塞队列,挂起当前线程
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

构建阻塞队列的双向链表,挂起当前线程

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //新建节点加入阻塞队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获得当前节点pre节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);//返回锁的state
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //重组双向链表,清空无效节点,挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

3、计数器递减

当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放锁的方式,对state进行减1操作,当state=0的时候证明计数器已经递减完毕,此时会将AQS阻塞队列里的节点线程全部唤醒。

 public void countDown() {
        //递减锁重入次数,当state=0时唤醒所有阻塞线程
        sync.releaseShared(1);
    }
public final boolean releaseShared(int arg) {
        //递减锁的重入次数
        if (tryReleaseShared(arg)) {
            doReleaseShared();//唤醒队列所有阻塞的节点
            return true;
        }
        return false;
    }
 private void doReleaseShared() {
        //唤醒所有阻塞队列里面的线程
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//节点是否在等待唤醒状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始
                        continue;
                    unparkSuccessor(h);//成功则唤醒线程
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
上一篇:Java高并发编程基础三大利器之CountDownLatch


下一篇:CountDownLatch源码解析