CyclicBarrier使用详解及源码解读

概述

当一组线程到达一个同步点(wait方法调用出)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被拦截的线程才会继续运行。
值得注意的是同步点有多个,当线程到达各自的同步点先会被阻塞,当都到达同步点,就会在各自的同步点处往下执行。
构造方法可以设置在所有线程都到达同步点之前执行另一个线程,wait()方法可以设置等待时间。

实例

package cyclicbarrier;

import java.util.concurrent.CyclicBarrier;

/**
 * 字面意思是可循环使用的屏障
 * 让一组线程到达时被阻塞,直到最后一个线程到达时才开门(简单的说就是人齐了就开门)
 * 每个线程调用await方法告诉CyclicBarrier我已到达并阻塞
 * 适用于多线程计算数据,最后合并计算结果的场景
 * 提供一个更加高级的构造函数用于处理复杂的业务场景
 * CyclicBarrier(int parties,Runnable barrierAction)
 * 在线程到达屏障时,优先执行barrierAction,便于执行更复杂的业务场景
 * 
 * countDownLatch调用的就一个await(),其他线程都在这个await处等待
 * cyclicbarrier各自线程都有一个await(),等待所有线程都达到await时,各自向下执行
 * @author bamboo
 *
 */
public class TestCyclicBarrier implements Runnable{
	
	public static void main(String[] args) {
		CyclicBarrier barrier = new CyclicBarrier(2,new TestCyclicBarrier());
		for(int i = 0;i < 2;i++) {     // 如果是3的话,barrier.await后的部分永远不会执行
			new ThreadA(barrier).start();
		}

		
	}

	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName() + ": 本线程始终优先于同步点后的代码执行");
	}

}

package cyclicbarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ThreadA extends Thread{

	CyclicBarrier barrier;
	
	ThreadA(CyclicBarrier barrier){
		this.barrier = barrier;
	}
	
	public void run() {
		try {
			Thread.sleep(3000);
			System.out.println(Thread.currentThread().getName() + ": 我到了,等他们到齐");
			barrier.await();
			System.out.println(Thread.currentThread().getName() + ": 到齐了,走了走了");
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

结果:

Thread-0: 我到了,等他们到齐
Thread-1: 我到了,等他们到齐
Thread-1: 本线程始终优先于同步点后的代码执行
Thread-1: 到齐了,走了走了
Thread-0: 到齐了,走了走了

源码

先来看看内部,结构还是很简单的,就一个静态内部类,还有一些其他的参数。
主要还是关注构造方法,一个传整形,另一个额外需要runnable实现类,这个可以在CountDownLatch设置的同步点通过之前先执行。
await方法可以在指定数量线程都抵达之前都阻塞,或者设置一个等待时间,达到一定时间停止阻塞,继续向下执行。
reset
CyclicBarrier使用详解及源码解读
构造函数

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

wait

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

reset
打破当前已存在的屏障,使之前的同步点失效。

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

使当前屏障这一系列线程停止并使他们清醒起来,仅当他们持有锁时生效。

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }
上一篇:Threading Barrier 代码笔记


下一篇:CyclicBarrier