所有的线程必须同时到达栅栏位置,才能继续执行。栅栏用于等待其他线程。
CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier的另一个构造函数CyclicBarrier(int parties, Runnable barrierAction),用于线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
以上内容参考: https://blog.csdn.net/qq_38293564/article/details/80558157
demo:
public class CycleBarrierTest { public static void main(String[] args) { CyclicBarrier cb = new CyclicBarrier(3); for (int i = 0; i < 3; i++) { System.out.println("创建工作线程" + i); Thread t = new Thread(new Worker(cb)); t.start(); } } static class Worker implements Runnable { private CyclicBarrier cb; public Worker(CyclicBarrier cb){ this.cb = cb; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "开始等待其他线程..."); cb.await(); System.out.println(Thread.currentThread().getName() + "执行完成..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
源码分析:
CyclicBarrier的await()方法
/** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { // 当前代 final Generation g = generation; // 如果这个代损坏了,就抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果线程中断,则唤醒所有线程, 重置代和count if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; // 当index ==0 的时候,则为最后一个线程调用该方法。 if (index == 0) { // tripped // 损坏状态标识符 boolean ranAction = false; try { final Runnable command = barrierCommand; // 执行构造函数为CyclicBarrier(int parties, Runnable barrierAction)的barrierAction的run方法 if (command != null) command.run(); ranAction = true; // 唤醒所有线程, 重置代和count nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 如果没有时间限制,就等待,直到被唤醒 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 如果代损坏了抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果不是当前代,就返回下标 // 因为一个线程可以使用多个栅栏,所以需要代来区分。 if (g != generation) return index; // 如果有时间限制且小于0,则唤醒所有线程, 重置代和count并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
在CyclicBarrier中,同一批线程属于同一代。当有parties个线程到达barrier之后,generation就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于中断状态。
CountDownLatch一般用于某个线程A等待若干其他线程完成后,它才执行; 而CyclicBarrier一般用于一组线程互相等待至某个状态,然后一组线程再同时执行。
CountDownLatch强调一个线程等多个线程完成某件事情;CyclicBarrier是多个线程互等,等大家完成,再携手共进。