CountDownLatch demo与源码

​ 在之前项目中碰到一个复杂查询,就是需要先分页查询出20条数据,然后根据事件类型对这20条数据分为4类,分别用线程查询这4类的特有信息,然后等所有的线程执行完成之后,在对这20条数据根据事件排序,最后返回给前端。因为是使用的线程查询,所以不知道什么时候会执行完。找了很久找到了方案,就是使用CountDownLatch。

CountDownLatch和CyclicBarrier都是java.util.concurrent包下面的多线程工具类。今天只讲CountDownLatch,下次再来看CyclicBarrier。

一、CountDownLatch

1.CountDownLatch的作用:

CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。

2.CountDownLatch的应用场景:一个任务划分成多个任务执行。

场景1:

​ 就拿上面的例子来说吧,线程1,2,3,4执行到栅栏位置的时候被阻塞,需要等待所有的线程都执行都得时候,才能打开栅栏,开始执行后面得排序方法。

CountDownLatch demo与源码

package cn.seven.countdownlatch;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * ClassName:    Demo1
 * Package:    cn.seven.countdownlatch
 * Description: CountdownLatchTest01
 * Datetime:    2020/5/13   20:47
 *
 * @Author: charon
 */
public class Demo1 {

    /**
     * @param args 参数
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        final CountDownLatch latch = new CountDownLatch(4);

        System.out.println("主线程,"+Thread.currentThread().getName()+"执行到这里,分成4个线程执行");

        Runnable runnable0 = () -> {
            try {
                System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
                Thread.sleep(10000);
                System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
                // 当前线程调用此方法,则计数减一
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        executorService.execute(runnable0);

        Runnable runnable1 = () -> {
            try {
                System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
                Thread.sleep(11000);
                System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
                // 当前线程调用此方法,则计数减一
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        executorService.execute(runnable1);

        Runnable runnable2 = () -> {
            try {
                System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
                Thread.sleep(12000);
                System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
                // 当前线程调用此方法,则计数减一
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        executorService.execute(runnable2);

        Runnable runnable3 = () -> {
            try {
                System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
                Thread.sleep(13000);
                System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
                // 当前线程调用此方法,则计数减一
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        executorService.execute(runnable3);

        System.out.println("主线程"+Thread.currentThread().getName()+"等待子线程执行完成");
        //阻塞当前线程,直到计数器的值为0
        latch.await();
        System.out.println("主线程"+Thread.currentThread().getName()+"开始执行排序...");
    }
}

场景2:

​ 我们都见过跑步比赛,运动员等待裁判员发令枪响,然后运动员起跑,等所有远动员跑到终点了,裁判员就计算名次。

CountDownLatch demo与源码

package cn.seven.countdownlatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * ClassName:    Demo2
 * Package:    cn.seven.countdownlatch
 * Description: 模拟运动员比赛,发令枪响运动员开始起跑,等待所有运动员跑完,统计名次
 * Datetime:    2020/5/13   21:14
 *
 * @Author: charon
 */
public class Demo2 {

    /**
     * 执行
     * @param args
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        //响枪的栅栏
        final CountDownLatch countDownLatch1 = new CountDownLatch(1);
        //比赛结束的栅栏
        final CountDownLatch countDownLatch2 = new CountDownLatch(3);

        for (int i = 0;i< 3;i++){
            Runnable runnable = () -> {
                try {
                    System.out.println("运动员"+Thread.currentThread().getName()+"等待信号枪");
                    // 跑之前阻塞线程,等到countDownLatch1的count为0开跑
                    countDownLatch1.await();
                    System.out.println("运动员"+Thread.currentThread().getName()+"开跑");
                    Thread.sleep(10);
                    System.out.println("运动员"+Thread.currentThread().getName()+"到达终点");

                    countDownLatch2.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            executorService.execute(runnable);
        }
        try {
            Thread.sleep(5000);

            System.out.println("裁判"+Thread.currentThread().getName()+"即将鸣信号枪");
            //递归减一的操作,直到count为0
            countDownLatch1.countDown();
            System.out.println("裁判"+Thread.currentThread().getName()+"鸣响信号枪,等待运动员跑完");
            //等待countDownLatch2 的count减为0,才能继续执行后面的代码
            countDownLatch2.await();
            System.out.println("运动员已经跑到终点,裁判"+Thread.currentThread().getName()+"统计名次");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executorService.shutdown();
    }

}

3. 下面就来分析一下CountDownLatch的两个重要方法吧!!

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
	this.sync = new Sync(count);
}

Sync(int count) {
      setState(count);
}

这是CountDownLatch的构造器,需要设置一个初始大小,即线程个数。如果count小于0,直接抛出异常。否则就将构造器中的count传递给AQS的state。

所以CountDownLatch中的countDown()就是对state状态的改变。await()是通过轮询state的状态来判断所有的任务是否都完成。

countDown源码分析:

当前线程调用了该方法后,会递减计数器的值,递减后如果计数器为 0 则会唤醒所有调用await 方法而被阻塞的线程,否则什么都不做。

public void countDown() {
     sync.releaseShared(1);//递减锁的技术,如果count为0,就释放锁,如果count大于0,就count减一
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {  //尝试释放锁,这个方法在sync中重写,如果count的值为0,则执行下面的操作
        doReleaseShared();  
        return true;
    }
    return false;
}

/**
  * CountDownLatch的内部类sync重写的这个尝试释放锁的方法
  */
protected boolean tryReleaseShared(int releases) {
    //  递减计数;转换为零时的信号
    for (;;) { //使用死循环来尝试释放锁,当前线程成功完成cas使计数值(状态值state)减一并更新到state
        int c = getState();
        if (c == 0) //如果count等于0,则退出,为了防止计数器值为 0 后,其他线程又调用了countDown方法,如果没有判断,状态值就会变成负数。
            return false;
        int nextc = c-1; //每执行一次,count 减一
        if (compareAndSetState(c, nextc)) //利用cas机制来更新state得状态,调用unsafe.compareAndSwapInt()操作内存,如果当前状态值等于预期值,原子地将同步状态设置为给定的已更新值
            return nextc == 0; // 更新成功就返回
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {//指示后续线程需要断开连接
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // 循环复查
                unparkSuccessor(h);//唤醒后续节点
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

await源码分析:

当前线程调用了CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一才会返回:

(1)当所有线程都调用了CountDownLatch对象的countDown方法后,也就是说计时器值为 0 的时候。

(2)其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常后返回。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//尝试看当前count是否为0,为0则直接返回,否者进入AQS的队列等待
        doAcquireSharedInterruptibly(arg);
}

/**
 * CountDownLatch的内部类sync重写的这个方法
 */
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;//state不等于0将会返回-1,进入上面那个方法加入AQS队列等待
}

//AQS等待队列,使用的乐观锁获得共享资源
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//addWaiter为AQS的加入队尾
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();//获取前一个节点
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //设置队列头,并检查后续进程是否可能在共享模式下等待,如果是这样,则在设置了propagate>0或propagate status时进行传播。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //检查并修改一个节点的状态,当该节点获取锁失败时。返回true如果线程需要阻塞,并挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)//如果是非正常退出的话,取消获取
                cancelAcquire(node);
        }
    }

参考网址:

https://www.cnblogs.com/*ncong/p/9275634.html
http://ifeve.com/countdownlatch源码解析/

上一篇:前端知识体系:JavaScript基础-作用域和闭包-闭包的实现原理和作用以及堆栈溢出和内存泄漏原理和相应解决办法


下一篇:CountDownLatch源码解析