CountDownLatch用法与原理

主要方法:

构造函数 CountDownLatch(int n)
阻塞方法 latch.await() 阻塞当前线程 直到latch状态为已完成
计数减一 latch.countDown() 次数减1 直到为0 状态变更

 

场景举例:

1.长跑比赛,一共有2个人一起赛跑, 每个人准备好之后在原地等待, 发令员每看到一个准备好就记录一次,未准备人数减1, 直到为准备人数为0就发令起跑

public static void main(String[] args) throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(2);

    new Thread(()->{
        try {
            System.out.println("准备完毕 原地等待"+Thread.currentThread().getName());
            latch.await();
            System.out.println("听到发令 开始赛跑"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "t1").start();

    //记录未准备人数减一
    Thread.sleep(1000);
    latch.countDown();


    new Thread(()->{
        try {
            System.out.println("准备完毕 原地等待"+Thread.currentThread().getName());
            latch.await();
            System.out.println("听到发令 开始赛跑"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "t2").start();

    //记录未准备人数减一
    Thread.sleep(1000);
    latch.countDown();

}

执行结果:

第一次

准备完毕 原地等待t1
准备完毕 原地等待t2
听到发令 开始赛跑t1
听到发令 开始赛跑t2

  第二次

准备完毕 原地等待t1
准备完毕 原地等待t2
听到发令 开始赛跑t2
听到发令 开始赛跑t1

 

 

第二次latch.countDown()时得知全部准备完毕,则表示发令起跑,此时两位选手开始起跑, 谁先跑完则是由各自能力决定

小结: 赛跑用法常用于软件压测, 同时开启若干测试线程,并进入等待状态,当所有线程准备完毕,同时执行业务,达到高并发测试

 

2.同样是长跑比赛,既然有比赛自然也有名次,那么只有在最后一名选手跑完才能宣布比赛结束

public static void main(String[] args) throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(2);

    new Thread(()->{

        try {
            System.out.println("努力奔跑"+Thread.currentThread().getName());
            //跑完全程要花2秒
            Thread.sleep(2000L);
            System.out.println("奔跑结束 到达终点"+Thread.currentThread().getName());
            latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }, "t1").start();

    new Thread(()->{

        try {

            System.out.println("努力奔跑"+Thread.currentThread().getName());
            //跑完全程要花5秒
            Thread.sleep(5000L);
            System.out.println("奔跑结束 到达终点"+Thread.currentThread().getName());
            latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }, "t2").start();
    //0.1秒睡眠只是防止打印串位 无实际作用
    Thread.sleep(100);
    System.out.println("等待最后一名选手跑完");
    latch.await();
    System.out.println("比赛结束所有人都已跑完");

}

运行结果:
努力奔跑t1
努力奔跑t2
等待最后一名选手跑完
奔跑结束 到达终点t1
奔跑结束 到达终点t2
比赛结束所有人都已跑完

latch.await()会阻塞主线程, 直到最后一个人跑完,才宣布比赛结束

小结:此示例可用于多个线程同时执行, 但是在最后一个线程执行完成后报告结果, 避免轮询查询是否所有线程都已执行完成, 当然另外也有很多种实现方式,比如CompletableFuture就常用来解决此类问题,不在此文过多解释

 

方法源码分析:

1.构造函数CountDownLatch(int count), 对AQS有了解的人很容易看懂下面代码其实就是对state的设置 

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
//AQS 设置state的值为count
Sync(int count) {
    setState(count);
}
protected final void setState(int newState) {
    state = newState;
}

 

2.阻塞方法 latch.await()

利用AQS共享锁的原理, 如果AQS state属性非0, 则将当前线程节点放入AQS等待队列, 并将等待模式设置为共享模式(具备先传播在解锁功能达到共享), 然后阻塞当前执行线程

类 CountDownLatch 
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

类 CountDownLatch.Sync
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //尝试获取锁 根据下面一段代码可知state初始为count
    //若state未改变的情况下(countDown()会讲如何改变的)必然要执行doAcquireSharedInterruptibly(arg)
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
//尝试获取同步锁
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
//获取同步锁
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //新增等待节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //设置头部节点并且节点等待模式为共享模式(共享模式概念可以在AQS文章中找到)
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

 

3.latch.countDown() 计数方法

利用AQS共享锁释放的原理, 当计数state=0的时候, 将等待队列中的线程以链式调用(传播)方法唤醒

类 CountDownLatch 
public void countDown() {
        sync.releaseShared(1);
    }

类 CountDownLatch.Sync
//释放共享锁(不一定释放成功 只是尝试)
public final boolean releaseShared(int arg) {
    //尝试释放共享锁 当计数等于0 tryReleaseShared(arg)等于true
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
//尝试释放共享锁
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        //计数减一
        int nextc = c-1;
        //原子操作 state=nextc  然后返回原子操作后nextc也即state是否等于0
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
//释放共享锁 这一段就不详细讲了 做的事情就是共享锁的释放
//如果当前节点的waitStatus=-3则 继续传播给下一个节点 
//遇到非waitStatus=-3跳过 继续传播给下一个节点, 那么就达到唤醒所有节点上的线程
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

 

总结:

CountDownLatch底层用的是AQS原理, 如果掌握了AQS的原理CountDownLatch是很容易理解的

上一篇:jdk11源码--CountDownLatch源码分析


下一篇:java并发编程------线程状态(interrupt(),isInterrupted()和interrupted())