CyclicBarrier源码详解

CyclicBarrier简介

CyclicBarrier也是一种线程同步工具,用于多个线程之间的同步,也是适用于一个线程等待多个线程。和CountDownLatch相比,CyclicBarrier有多个改进:
1、CyclicBarrier可以循环利用
2、CyclicBarrier中的线程的同步更加严谨。CountDownLatch中的线程在countDown()后就会执行代码,而CyclicBarrier中的线程会一直阻塞,直到被同步的方法调用,所以说CyclicBarrier中的线程的同步程度更高。

CyclicBarrier示例

package juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class TestCyclicBarrier {

    public static void main(String[] args) {

        final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {
                System.out.println("线程集合集合完毕");
            }
        });

        new Thread(){
            @Override
            public void run() {

                while (true){

                    try {

                        System.out.println("线程1开始执行");
                        Thread.sleep(5000);
                        System.out.println("线程1执行完毕");
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }

            }
        }.start();

        new Thread(){
            @Override
            public void run() {

                while (true){

                    try {

                        System.out.println("线程2开始执行");
                        Thread.sleep(5000);
                        System.out.println("线程2执行完毕");
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }

            }
        }.start();

    }
}

CyclicBarrier源码详解

如上述例子所述,CyclicBarrier中分为两种线程:
1、同步执行的n个线程
2、等待n个线程完成后才开始执行的线程,我们叫他最后线程吧

CyclicBarrier源码详解

CyclicBarrier源码详解
从上述结构图看,CyclicBarrier主要是利用ReentrantLock来保证同步的。

1、初始化

final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
          @Override
          public void run() {
              System.out.println("线程集合集合完毕");
          }
      });

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    //对应的线程个数
    this.parties = parties;
    //当前未阻塞的线程个数
    this.count = parties;
    //执行的线程
    this.barrierCommand = barrierAction;
}

private static class Generation {
    boolean broken = false;
}

//独占锁
private final ReentrantLock lock = new ReentrantLock();
//因为CyclicBarrier是重复可利用的,一轮代表一代
private Generation generation = new Generation();
//当调用await后,会被阻塞。
private final Condition trip = lock.newCondition();

初始化代码调用的构造方法需要传入一个count和一个Runnable对象。
count对应着线程数量,Runnable对应着最后待同步的线程!

await()

在阅读原码前,朋友们先明白CyclicBarrier的一个机制:CyclicBarrier是可循环利用的,每个循环对应着一个年代,对应着CyclicBarrier就内置了一个Generation类,用来描述当前年代的状态。

private static class Generation {
    //n+1个线程中只要有一个被中断或者出现超时,就将broken设置为true
    boolean broken = false;
}

带着这个概念去阅读源码,思路就十分清晰了

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        
        final Generation g = generation;

        //有线程超时了,或者是被中断了
        if (g.broken)
            throw new BrokenBarrierException();

        //如果线程被中断了,将所有线程唤醒,可中断
        if (Thread.interrupted()) {
            //唤醒所有等待的线程,重置count,并将g.broken=true
            breakBarrier();
            //抛出异常
            throw new InterruptedException();
        }

        //await后调用后,count--
        int index = --count;
        if (index == 0) {  //count==0,说明所有线程都调用await()了,可以唤醒那n-1个线程了。
            //是否执行最后线程了
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    //开始执行
                    command.run();
                //执行最后线程了
                ranAction = true;
                //释放所有等待在trip中condition queue中的线程
                nextGeneration();
                return 0;
            } finally {
                //如果执行ranAction失败,说明出现异常
                if (!ranAction)
                    //上述代码出现异常,执行breakBarrier
                    breakBarrier();
            }
        }


        //运行到这里的代码,都是n个线程中的n-1个线程
        for (;;) {
            try {
                //阻塞式调用
                if (!timed)
                    //阻塞将线程挂起到condition queue
                    trip.await();
                //调用的是await(timeout),设置的有超时时间    
                else if (nanos > 0L)
                   //最多阻塞nanos时间将线程挂起到condition queue
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                //在等待的过程中,如果线程被中断,如果其他线程还未调用breakBarrier,那么当前线程就调用
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    //如果其他线程已经breakBarrier,就直接自我中断就ok了
                    Thread.currentThread().interrupt();
                }
            }

            //如果正常流程被打断,抛出BrokenBarrierException
            if (g.broken)
                throw new BrokenBarrierException();

            //如果正常的更新换代的话,返回index
            if (g != generation)
                return index;

            //如果await超时的话,也会breakBarrier,抛出TimeoutException
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

private void breakBarrier() {
    //n+1个线程当中有线程被中断,或者await请求超时了,就将当前这个代打断,唤醒所有线程,各个线程自己单独处理这个异常
    
    //设置broken为true
    generation.broken = true;
    //重置count
    count = parties;
    // 唤醒等待在trip的condition queue中的全部线程
    trip.signalAll();
}

private void nextGeneration() {
	//当前这轮线程全部执行成功,重置年代,开始下一轮同步
    // 唤醒等待在trip的condition queue中的全部线程
    trip.signalAll();
    // count 重置为 parties
    count = parties;
    //generation也重新初始化
    generation = new Generation();
}

总结

1、CyclicBarrier的底层是通过一个ReentrantLock、一个int变量count、一个Condition、一个Generation来实现的。

流程:
1、首先初始化一个ReentranLock对象、将count变量赋值给parties,初始化一个Condition、初始化一个Generation,Generation中只有一个broken布尔两对应着当前的同步状态是否被破坏,将Runnable对象保存给barrierCommand。

2、当线程调用await()的时候。会先抢夺lock锁,然后将count--。如果count == 0,说明线程都已经执行完毕,可以开始执行barrierCommand了。并且通过SignalAll()来唤醒所有等待的线程,并且将Generation重新初始化,将count重新赋值为parties,开始下一轮的同步。

3、如果count!=0,说明还有其他线程未完成,就调用condition.await()方法,将线程挂起。

Tip

Generation.broken
当有线程被中断或者线程被挂起超时的时候,将generation的broken设置为true,代表同步被破坏了,会立即唤醒所有之前挂起的线程。
新来的线程在操作count--之前,会判断broken,如果broken=true,就会直接抛出异常!

上一篇:C. Yet Another Broken Keyboard--------思维


下一篇:Python提示[Errno 32]Broken pipe导致线程crash错误解决方法