在之前项目中碰到一个复杂查询,就是需要先分页查询出20条数据,然后根据事件类型对这20条数据分为4类,分别用线程查询这4类的特有信息,然后等所有的线程执行完成之后,在对这20条数据根据事件排序,最后返回给前端。因为是使用的线程查询,所以不知道什么时候会执行完。找了很久找到了方案,就是使用CountDownLatch。
CountDownLatch和CyclicBarrier都是java.util.concurrent包下面的多线程工具类。今天只讲CountDownLatch,下次再来看CyclicBarrier。
一、CountDownLatch
1.CountDownLatch的作用:
CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。
2.CountDownLatch的应用场景:一个任务划分成多个任务执行。
场景1:
就拿上面的例子来说吧,线程1,2,3,4执行到栅栏位置的时候被阻塞,需要等待所有的线程都执行都得时候,才能打开栅栏,开始执行后面得排序方法。
package cn.seven.countdownlatch;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* ClassName: Demo1
* Package: cn.seven.countdownlatch
* Description: CountdownLatchTest01
* Datetime: 2020/5/13 20:47
*
* @Author: charon
*/
public class Demo1 {
/**
* @param args 参数
*/
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
final CountDownLatch latch = new CountDownLatch(4);
System.out.println("主线程,"+Thread.currentThread().getName()+"执行到这里,分成4个线程执行");
Runnable runnable0 = () -> {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
Thread.sleep(10000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
// 当前线程调用此方法,则计数减一
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executorService.execute(runnable0);
Runnable runnable1 = () -> {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
Thread.sleep(11000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
// 当前线程调用此方法,则计数减一
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executorService.execute(runnable1);
Runnable runnable2 = () -> {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
Thread.sleep(12000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
// 当前线程调用此方法,则计数减一
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executorService.execute(runnable2);
Runnable runnable3 = () -> {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"开始执行");
Thread.sleep(13000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行结束");
// 当前线程调用此方法,则计数减一
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executorService.execute(runnable3);
System.out.println("主线程"+Thread.currentThread().getName()+"等待子线程执行完成");
//阻塞当前线程,直到计数器的值为0
latch.await();
System.out.println("主线程"+Thread.currentThread().getName()+"开始执行排序...");
}
}
场景2:
我们都见过跑步比赛,运动员等待裁判员发令枪响,然后运动员起跑,等所有远动员跑到终点了,裁判员就计算名次。
package cn.seven.countdownlatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* ClassName: Demo2
* Package: cn.seven.countdownlatch
* Description: 模拟运动员比赛,发令枪响运动员开始起跑,等待所有运动员跑完,统计名次
* Datetime: 2020/5/13 21:14
*
* @Author: charon
*/
public class Demo2 {
/**
* 执行
* @param args
*/
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
//响枪的栅栏
final CountDownLatch countDownLatch1 = new CountDownLatch(1);
//比赛结束的栅栏
final CountDownLatch countDownLatch2 = new CountDownLatch(3);
for (int i = 0;i< 3;i++){
Runnable runnable = () -> {
try {
System.out.println("运动员"+Thread.currentThread().getName()+"等待信号枪");
// 跑之前阻塞线程,等到countDownLatch1的count为0开跑
countDownLatch1.await();
System.out.println("运动员"+Thread.currentThread().getName()+"开跑");
Thread.sleep(10);
System.out.println("运动员"+Thread.currentThread().getName()+"到达终点");
countDownLatch2.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executorService.execute(runnable);
}
try {
Thread.sleep(5000);
System.out.println("裁判"+Thread.currentThread().getName()+"即将鸣信号枪");
//递归减一的操作,直到count为0
countDownLatch1.countDown();
System.out.println("裁判"+Thread.currentThread().getName()+"鸣响信号枪,等待运动员跑完");
//等待countDownLatch2 的count减为0,才能继续执行后面的代码
countDownLatch2.await();
System.out.println("运动员已经跑到终点,裁判"+Thread.currentThread().getName()+"统计名次");
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}
3. 下面就来分析一下CountDownLatch的两个重要方法吧!!
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
这是CountDownLatch的构造器,需要设置一个初始大小,即线程个数。如果count小于0,直接抛出异常。否则就将构造器中的count传递给AQS的state。
所以CountDownLatch中的countDown()就是对state状态的改变。await()是通过轮询state的状态来判断所有的任务是否都完成。
countDown源码分析:
当前线程调用了该方法后,会递减计数器的值,递减后如果计数器为 0 则会唤醒所有调用await 方法而被阻塞的线程,否则什么都不做。
public void countDown() {
sync.releaseShared(1);//递减锁的技术,如果count为0,就释放锁,如果count大于0,就count减一
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //尝试释放锁,这个方法在sync中重写,如果count的值为0,则执行下面的操作
doReleaseShared();
return true;
}
return false;
}
/**
* CountDownLatch的内部类sync重写的这个尝试释放锁的方法
*/
protected boolean tryReleaseShared(int releases) {
// 递减计数;转换为零时的信号
for (;;) { //使用死循环来尝试释放锁,当前线程成功完成cas使计数值(状态值state)减一并更新到state
int c = getState();
if (c == 0) //如果count等于0,则退出,为了防止计数器值为 0 后,其他线程又调用了countDown方法,如果没有判断,状态值就会变成负数。
return false;
int nextc = c-1; //每执行一次,count 减一
if (compareAndSetState(c, nextc)) //利用cas机制来更新state得状态,调用unsafe.compareAndSwapInt()操作内存,如果当前状态值等于预期值,原子地将同步状态设置为给定的已更新值
return nextc == 0; // 更新成功就返回
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//指示后续线程需要断开连接
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // 循环复查
unparkSuccessor(h);//唤醒后续节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
await源码分析:
当前线程调用了CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一才会返回:
(1)当所有线程都调用了CountDownLatch对象的countDown方法后,也就是说计时器值为 0 的时候。
(2)其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常后返回。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//尝试看当前count是否为0,为0则直接返回,否者进入AQS的队列等待
doAcquireSharedInterruptibly(arg);
}
/**
* CountDownLatch的内部类sync重写的这个方法
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;//state不等于0将会返回-1,进入上面那个方法加入AQS队列等待
}
//AQS等待队列,使用的乐观锁获得共享资源
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//addWaiter为AQS的加入队尾
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();//获取前一个节点
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//设置队列头,并检查后续进程是否可能在共享模式下等待,如果是这样,则在设置了propagate>0或propagate status时进行传播。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//检查并修改一个节点的状态,当该节点获取锁失败时。返回true如果线程需要阻塞,并挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)//如果是非正常退出的话,取消获取
cancelAcquire(node);
}
}
参考网址:
https://www.cnblogs.com/*ncong/p/9275634.html
http://ifeve.com/countdownlatch源码解析/