CountDownLatch 的使用与源码解析

使用

CountDownLatch 类似于 Thread 的 join 方法,使用时,先构造 CountDownLatch 对象,构造函数传线程数据数 n,表示等待这 n 个线程都完成后再执行主线程代码。

主线程使用 await 方法阻塞等待 n 个线程执行完成;n 个线程执行完成后调用 countDown() 方法,表示完成了一个线程,当 n 个线程都调用了一次 countDown() 方法后,await 阻塞的主线程就会被唤醒继续执行。

Java8 的 CompletableFuture 也可以用来代替 CountDownLatch。

代码示例:

@Test
public void test() throws InterruptedException {
    // 定义一个线程池
    ExecutorService executor = Executors.newFixedThreadPool(2);

    // CountDownLatch 用来作为一个开关,控制主线程阻塞,直到几个异步线程都完成后,主线程再接着执行
    // CountDownLatch 的构造函数传参是一个 int 型数据,等待几个异步线程,就传异步线程的数量即可
    // 比如,当前线程需要等待 3 个异步线程先执行完成后,再执行,那么传参为 3
    int count = 3;
    CountDownLatch countDownLatch = new CountDownLatch(count);

    IntStream.range(0, count).forEach(i -> {
        executor.submit(() -> {
            System.out.println("条件" + (i + 1) + "正在执行");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("条件" + (i + 1) + "执行完成");

            // 线程执行完成后,需要调用 countDown() 方法,告诉主线程当前有一个线程完成了
            countDownLatch.countDown();
        });
    });

    System.out.println("主线程阻塞等待条件完成");
    // 主线程等待条件完成,当所有线程都调用了 countDown() 后阻塞的主线程就会被唤醒,继续往下执行
    countDownLatch.await();
    System.out.println("条件都已完成,继续执行主线程逻辑");

    executor.shutdown();

    /**
         * 执行结果:
         *
         * 主线程阻塞等待条件完成
         * 条件2正在执行
         * 条件1正在执行
         * 条件1执行完成
         * 条件3正在执行
         * 条件2执行完成
         * 条件3执行完成
         * 条件都已完成,继续执行主线程逻辑
         */
}

源码

属性

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // 共享线程数
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    // 尝试获取共享锁
    protected int tryAcquireShared(int acquires) {
        // state == 0 时返回 1,否则返回 -1
        return (getState() == 0) ? 1 : -1;
    }

    // 尝试释放共享锁
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

构造函数

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

await

AQS 共享锁参考 ReentrantLock

// CountDownLatch 构造方法,将传的参数 count 赋值给 state
// 每次调用 countDown 方法时,会释放一个锁,即 state 减一
// 如果主线程调用了 await 时会去判断,如果 state > 0,那么主线程就会被阻塞
// 等待 count 个线程都调用了 countDown 方法后,才会唤醒主线程
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

countDown

// 线程释放一个共享锁,state 减一
public void countDown() {
    // 会去唤醒阻塞的主线程
    // 主线程被唤醒后,如果条件不符合(state > 0)主线程还是会继续阻塞,直到最后一个线程调用了 countDown
    // 此时 state == 0,主线程被唤醒后不会再被阻塞
    sync.releaseShared(1);
}
上一篇:CountDownLatch是干啥的?


下一篇:JS-模拟京东倒计时效果