多线程同步器之CountDownLatch

文章目录

CountDownLatch是什么

countDownLatch是在java1.5被引入,可以翻译为计数器。主要用于等待多个线程一起执行完成后继续下一步,Thread.join()有点类似

核心方法

构造方法

多线程同步器之CountDownLatch

CountDownLatch 值提供了一个构造方法,传入一个计数器的值

常用的其他三个方法

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException {
		//调用AQS的acquireSharedInterruptibly()方法
        sync.acquireSharedInterruptibly(1);
    }
// 和上面方法一样,添加了一个超时时间,到超时时间及时conut没到0调用线程也继续执行
public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
// 计数器count值减1
public void countDown() {
		//调用AQS的释放共享锁方法
        sync.releaseShared(1);
    }

可以看到 这几个方法都是直接 调用 sync这个类的方法,这个sync是个什么类呢

多线程同步器之CountDownLatch

这里可以看到 Sync 是 CountDownLatch 的一个内部类,而Sync 继承了AbstractQueuedSynchronizer,上面的 acquireSharedInterruptibly 都是直接调用AbstractQueuedSynchronizer父类的acquireSharedInterruptibly方法

而最终调用的是AbstractQueuedSynchronizer的 doAcquireSharedInterruptibly方法

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 添加节点至等待队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
            	// 获取node的前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点为头结点
                if (p == head) {
                    /** 试图在共享模式下获取对象状态,tryAcquireShared为抽象方法供子类实现,
                    *   Sync就实现了该方法,很简单的判断一个state状态是否为0
                    * ,state被volatile修饰,保证多线程见的可见性,同时不适用锁提升性能
                    **/
                    int r = tryAcquireShared(arg);
                    // 获取成功
                    if (r >= 0) {
                    //设置队列头,并检查后来者是否在共享模式下等待,如果是,则传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

使用案例

这里案例我们基于RocketMQ的源码使用方式来模拟
多线程同步器之CountDownLatch
这里可以看到Broker 注册到NamerServer的时候就使用了CountDownLatch,不了解RocketMq也可以简单的理解为,我一个系统需要RPC调用多个三方系统发短信,而网络IO请求相对耗时,所以我们使用多线程去请求,同时要保证所有短信发送完成我们才能执行后续代码返回发送成功

public static void main(String[] args) {
        // 使用Java自带线程池,生产环境不推荐使用,这里演示demo为了简单

        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        // 需要发送 20个短信
        CountDownLatch countDownLatch = new CountDownLatch(20);
        AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < 20; i++) {
            threadPool.execute(() -> {
                // 模拟发送短信
                System.out.println("发送短信");
                // 假装耗时2s
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 发送短信完成
                System.out.println("-----------发送短信数量:" + count.incrementAndGet());
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("所有短信发送完成");

    }

执行结果:

发送短信
发送短信
发送短信
发送短信
发送短信
-----------发送短信数量:1
-----------发送短信数量:5
-----------发送短信数量:4
-----------发送短信数量:3
发送短信
发送短信
-----------发送短信数量:2
发送短信
发送短信
发送短信
-----------发送短信数量:7
-----------发送短信数量:9
发送短信
-----------发送短信数量:10
-----------发送短信数量:8
-----------发送短信数量:6
发送短信
发送短信
发送短信
发送短信
-----------发送短信数量:13
-----------发送短信数量:12
-----------发送短信数量:14
-----------发送短信数量:11
-----------发送短信数量:15
发送短信
发送短信
发送短信
发送短信
发送短信
-----------发送短信数量:17
-----------发送短信数量:18
-----------发送短信数量:20
-----------发送短信数量:16
-----------发送短信数量:19
所有短信发送完成

Process finished with exit code -1


上一篇:正则表达式知识点总结


下一篇:zookeeper03-java的Api操作(1)