主要方法:
构造函数 CountDownLatch(int n)
阻塞方法 latch.await() 阻塞当前线程 直到latch状态为已完成
计数减一 latch.countDown() 次数减1 直到为0 状态变更
场景举例:
1.长跑比赛,一共有2个人一起赛跑, 每个人准备好之后在原地等待, 发令员每看到一个准备好就记录一次,未准备人数减1, 直到为准备人数为0就发令起跑
public static void main(String[] args) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(2); new Thread(()->{ try { System.out.println("准备完毕 原地等待"+Thread.currentThread().getName()); latch.await(); System.out.println("听到发令 开始赛跑"+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start(); //记录未准备人数减一 Thread.sleep(1000); latch.countDown(); new Thread(()->{ try { System.out.println("准备完毕 原地等待"+Thread.currentThread().getName()); latch.await(); System.out.println("听到发令 开始赛跑"+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2").start(); //记录未准备人数减一 Thread.sleep(1000); latch.countDown(); }
执行结果:
第一次
准备完毕 原地等待t1
准备完毕 原地等待t2
听到发令 开始赛跑t1
听到发令 开始赛跑t2
第二次
准备完毕 原地等待t1
准备完毕 原地等待t2
听到发令 开始赛跑t2
听到发令 开始赛跑t1
第二次latch.countDown()时得知全部准备完毕,则表示发令起跑,此时两位选手开始起跑, 谁先跑完则是由各自能力决定
小结: 赛跑用法常用于软件压测, 同时开启若干测试线程,并进入等待状态,当所有线程准备完毕,同时执行业务,达到高并发测试
2.同样是长跑比赛,既然有比赛自然也有名次,那么只有在最后一名选手跑完才能宣布比赛结束
public static void main(String[] args) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(2); new Thread(()->{ try { System.out.println("努力奔跑"+Thread.currentThread().getName()); //跑完全程要花2秒 Thread.sleep(2000L); System.out.println("奔跑结束 到达终点"+Thread.currentThread().getName()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start(); new Thread(()->{ try { System.out.println("努力奔跑"+Thread.currentThread().getName()); //跑完全程要花5秒 Thread.sleep(5000L); System.out.println("奔跑结束 到达终点"+Thread.currentThread().getName()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2").start(); //0.1秒睡眠只是防止打印串位 无实际作用 Thread.sleep(100); System.out.println("等待最后一名选手跑完"); latch.await(); System.out.println("比赛结束所有人都已跑完"); } 运行结果: 努力奔跑t1 努力奔跑t2 等待最后一名选手跑完 奔跑结束 到达终点t1 奔跑结束 到达终点t2 比赛结束所有人都已跑完
latch.await()会阻塞主线程, 直到最后一个人跑完,才宣布比赛结束
小结:此示例可用于多个线程同时执行, 但是在最后一个线程执行完成后报告结果, 避免轮询查询是否所有线程都已执行完成, 当然另外也有很多种实现方式,比如CompletableFuture就常用来解决此类问题,不在此文过多解释
方法源码分析:
1.构造函数CountDownLatch(int count), 对AQS有了解的人很容易看懂下面代码其实就是对state的设置
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } //AQS 设置state的值为count Sync(int count) { setState(count); } protected final void setState(int newState) { state = newState; }
2.阻塞方法 latch.await()
利用AQS共享锁的原理, 如果AQS state属性非0, 则将当前线程节点放入AQS等待队列, 并将等待模式设置为共享模式(具备先传播在解锁功能达到共享), 然后阻塞当前执行线程
类 CountDownLatch public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 类 CountDownLatch.Sync public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //尝试获取锁 根据下面一段代码可知state初始为count //若state未改变的情况下(countDown()会讲如何改变的)必然要执行doAcquireSharedInterruptibly(arg) if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //尝试获取同步锁 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //获取同步锁 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //新增等待节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //设置头部节点并且节点等待模式为共享模式(共享模式概念可以在AQS文章中找到) setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
3.latch.countDown() 计数方法
利用AQS共享锁释放的原理, 当计数state=0的时候, 将等待队列中的线程以链式调用(传播)方法唤醒
类 CountDownLatch public void countDown() { sync.releaseShared(1); } 类 CountDownLatch.Sync //释放共享锁(不一定释放成功 只是尝试) public final boolean releaseShared(int arg) { //尝试释放共享锁 当计数等于0 tryReleaseShared(arg)等于true if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } //尝试释放共享锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; //计数减一 int nextc = c-1; //原子操作 state=nextc 然后返回原子操作后nextc也即state是否等于0 if (compareAndSetState(c, nextc)) return nextc == 0; } } //释放共享锁 这一段就不详细讲了 做的事情就是共享锁的释放 //如果当前节点的waitStatus=-3则 继续传播给下一个节点 //遇到非waitStatus=-3跳过 继续传播给下一个节点, 那么就达到唤醒所有节点上的线程 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
总结:
CountDownLatch底层用的是AQS原理, 如果掌握了AQS的原理CountDownLatch是很容易理解的