CyclicBarrier(同步屏障)
应用场景:一个等多个,当一组线程到达一个屏障,调用awaite,告诉CyclicBarrier我已经到了,然后当前线程会被阻塞,当最后一个线程到达时就会开始执行。下面demo,模拟三个人到达5道门的情景,每次都必须所有人都到一道门才能去下一道门。
使用demo
package com.w.juc;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo implements Runnable{
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
@Override
public void run() {
try {
Thread.sleep((long)Math.random()*10000);
System.out.println(Thread.currentThread()+"我到了,我在等待其它人");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
new Thread(new CyclicBarrierDemo(),"一号").start();
new Thread(new CyclicBarrierDemo(),"二号").start();
new Thread(new CyclicBarrierDemo(),"三号").start();
Thread.sleep(2000);
System.out.println("所有人都到了第"+i+"道门");
}
new Thread(new CyclicBarrierDemo(),"四号").start();
new Thread(new CyclicBarrierDemo(),"五号").start();
Thread.sleep(2000);
System.out.println("缺一个人");
}
}
除了隐式的重置计数器,还可以在出现问题后调用reset()方法重置。
结构
-
构造参数
public CyclicBarrier(int parties) { this(parties, null); } //当最后一个线程到达屏障,优先执行,barrierAction public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
-
成员变量
//因为用到的是条件队列,所以需要lock private final ReentrantLock lock = new ReentrantLock(); //通过lock获取条件队列,除最后一个到达的线程,其余线程都会阻塞到这个队列 private final Condition trip = lock.newCondition(); //需要等待的线程数量,通过构造参数初始化 private final int parties; // 当所有线程到达时要执行的任务(构造时可选,即可为null) private final Runnable barrierCommand; //Generation 实例,在此处进行初始化,表示代的概念 private Generation generation = new Generation(); //当前代还有多少个线程未到位 private int count;
方法
-
awaite() 必用方法,等待,直到所有各方都已调用此屏障。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //主要方法 //如果任何线程在等待时被中断,那么所有其他正在等待的线程将抛出BrokenBarrierException,并将barrier置于中断状态 //其他线程调用该屏障上的重置,如果任何一方正在等待屏障,他们将返回一个BrokenBarrierException private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();//使用condition的线程必须获取到锁 try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { //线程被中断,唤醒trip条件队列中所有等待的线程,并将broken置为true, //其它调用awaite的线程会抛出BrokenBarrierException() breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped //index为0,代表当前代最后一个线程已经到达 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null)//如果构造方法时指定了最后一个线程到达时要执行的任务,在这里调用 command.run(); ranAction = true; //这里新new一个代,重置count,唤醒等待队列中的线程 nextGeneration(); return 0;//已经是最后一个线程了就直接返回0 } finally { if (!ranAction) //打破当前一代,唤醒其它线程 breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out //循环直到触发、中断、中断或超时,只有非最后一个线程会执行到这里 for (;;) { try { //没有设置超时,直接就阻塞到等待队列 if (!timed) trip.await();//调用awaite会释放锁 else if (nanos > 0L) //阻塞到超时等待队列 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } //这下面的方法,都只有当前线程被唤醒后才会执行 // 如果任何线程在等待时被中断,那么所有其他正在等待的线程将抛出BrokenBarrierException,并将barrier置于中断状态 if (g.broken) throw new BrokenBarrierException(); //一旦!=成立,代表最后一个线程到达,唤醒了其它所有线程。正常情况下 //有线程唤醒此线程,代表着肯定执行了nextGeneration()方法,也就新new了一代。 if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
与countDownLatch比较
-
相同点:
都是控制线程协做的,一个构造函数传入预期值,当有多少个线程调用时,统计然后到达一个点,让所有线程正常执行。
-
不同点:
countDownLatch,只有一个构造函数。而且需要手动调用countDown去更改计数器值,直到减为0,依次让所有在同步队列的线程执行,并不是等待(条件)队列。
CyclicBarrier则是自动的减,只需要线程调用awaite()。当一组线程的最后一个线程到达一个代时,唤醒阻塞在等待队列中的所有线程。而且计数器的值可以被重置使用,可以显示的调用reset或者当到达一代之后直接调用awaite进入新的一代。另外,是有提供两个构造方法,其中一个支持当最后一个线程到达屏障时要制作的任务。
countDownLatch底层实现的是AQS,使用的是共享锁模式,操作的是AQS的state
CyclicBarrier则是使用的是ReetrantLock的Condition,使用的是互斥锁模式。