文章目录
CountDownLatch是什么
countDownLatch是在java1.5被引入,可以翻译为计数器。主要用于等待多个线程一起执行完成后继续下一步,Thread.join()有点类似
核心方法
构造方法
CountDownLatch 值提供了一个构造方法,传入一个计数器的值
常用的其他三个方法
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException {
//调用AQS的acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}
// 和上面方法一样,添加了一个超时时间,到超时时间及时conut没到0调用线程也继续执行
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 计数器count值减1
public void countDown() {
//调用AQS的释放共享锁方法
sync.releaseShared(1);
}
可以看到 这几个方法都是直接 调用 sync这个类的方法,这个sync是个什么类呢
这里可以看到 Sync 是 CountDownLatch 的一个内部类,而Sync 继承了AbstractQueuedSynchronizer,上面的 acquireSharedInterruptibly 都是直接调用AbstractQueuedSynchronizer父类的acquireSharedInterruptibly方法
而最终调用的是AbstractQueuedSynchronizer的 doAcquireSharedInterruptibly
方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 添加节点至等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取node的前驱节点
final Node p = node.predecessor();
// 如果前驱节点为头结点
if (p == head) {
/** 试图在共享模式下获取对象状态,tryAcquireShared为抽象方法供子类实现,
* Sync就实现了该方法,很简单的判断一个state状态是否为0
* ,state被volatile修饰,保证多线程见的可见性,同时不适用锁提升性能
**/
int r = tryAcquireShared(arg);
// 获取成功
if (r >= 0) {
//设置队列头,并检查后来者是否在共享模式下等待,如果是,则传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
使用案例
这里案例我们基于RocketMQ的源码使用方式来模拟
这里可以看到Broker 注册到NamerServer的时候就使用了CountDownLatch,不了解RocketMq也可以简单的理解为,我一个系统需要RPC调用多个三方系统发短信,而网络IO请求相对耗时,所以我们使用多线程去请求,同时要保证所有短信发送完成我们才能执行后续代码返回发送成功
public static void main(String[] args) {
// 使用Java自带线程池,生产环境不推荐使用,这里演示demo为了简单
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 需要发送 20个短信
CountDownLatch countDownLatch = new CountDownLatch(20);
AtomicInteger count = new AtomicInteger();
for (int i = 0; i < 20; i++) {
threadPool.execute(() -> {
// 模拟发送短信
System.out.println("发送短信");
// 假装耗时2s
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 发送短信完成
System.out.println("-----------发送短信数量:" + count.incrementAndGet());
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有短信发送完成");
}
执行结果:
发送短信
发送短信
发送短信
发送短信
发送短信
-----------发送短信数量:1
-----------发送短信数量:5
-----------发送短信数量:4
-----------发送短信数量:3
发送短信
发送短信
-----------发送短信数量:2
发送短信
发送短信
发送短信
-----------发送短信数量:7
-----------发送短信数量:9
发送短信
-----------发送短信数量:10
-----------发送短信数量:8
-----------发送短信数量:6
发送短信
发送短信
发送短信
发送短信
-----------发送短信数量:13
-----------发送短信数量:12
-----------发送短信数量:14
-----------发送短信数量:11
-----------发送短信数量:15
发送短信
发送短信
发送短信
发送短信
发送短信
-----------发送短信数量:17
-----------发送短信数量:18
-----------发送短信数量:20
-----------发送短信数量:16
-----------发送短信数量:19
所有短信发送完成
Process finished with exit code -1