JUC并发工具包之CyclicBarrier

1、简介

CyclicBarrier是一个同步器,允许多个线程等待彼此直到达一个执行点(barrier)。

CyclicBarrier都是在多个线程必须等到彼此都到达同一个执行点后才执行一段逻辑时才被使用。

barrier被叫做cyclic是因为阻塞线程恢复后可以重复使用barrier

2、使用

CyclicBarrier的构造器很简单,传入一个整形表示线程数再调用barrier实例的await()方法表示所有线程要到达这一共同的执行点。

public CyclicBarrier(int parties)

所有需要同步执行的线程也被叫做parties,调用await()方法可以注册一个线程到达过执行点(barrier point)。

这个调用是同步的并且当前线程会挂起直至其他线程都调用到barrier的await()方法,该场景中既定数量的线程都调用了await()方法叫做tripping the barrier(源码中这种描述出现多次)

或者,我们也可以传第二个参数给构造方法,这是一个Runnable的实例,这将作为最后一个线程trips the barrier

public CyclicBarrier(int parties, Runnable barrierAction)

3、实现

为了实际使用一下CyclicBarrier,我们来考虑下面一个场景:

固定数量的线程执行一段操作并把对应的结果存到列表中,当所有的线程执行完操作,其中一个(也就是最后一个trips the barrier的线程)开始处理其他线程获取到的结果。

我们开始实现一下这个类:

public class CyclicBarrierDemo {
 
    private CyclicBarrier cyclicBarrier;
    private List<List<Integer>> partialResults
     = Collections.synchronizedList(new ArrayList<>());
    private Random random = new Random();
    private int NUM_PARTIAL_RESULTS;
    private int NUM_WORKERS;
 
    // ...
}

这个类已经很直接了当了,NUM_WORKERS是线程数,NUM_PARTIAL_RESULTS是每个线程即将要输出结果的数量

最后,我们定义一个partialResults来存储所有线程存储的结果,请注意这个列表是一个SynchronizedList,因为多个线程会同时向它写入数据(普通ArrayList的add方法不是线程安全的)

现在我们实现一下每个工作线程的逻辑:

public class CyclicBarrierDemo {
 
    // ...
 
    class NumberCruncherThread implements Runnable {
 
        @Override
        public void run() {
            String thisThreadName = Thread.currentThread().getName();
            List<Integer> partialResult = new ArrayList<>();
 
            // Crunch some numbers and store the partial result
            for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {    
                Integer num = random.nextInt(10);
                System.out.println(thisThreadName
                  + ": Crunching some numbers! Final result - " + num);
                partialResult.add(num);
            }
 
            partialResults.add(partialResult);
            try {
                System.out.println(thisThreadName 
                  + " waiting for others to reach barrier.");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                // ...
            } catch (BrokenBarrierException e) {
                // ...
            }
        }
    }
}

我们现在实现所有现在都到执行点后的逻辑,为了把逻辑写简单点,我们就把partialResults列表中的值全部相加。

public class CyclicBarrierDemo {
 
    // ...
     
    class AggregatorThread implements Runnable {
 
        @Override
        public void run() {
 
            String thisThreadName = Thread.currentThread().getName();
 
            System.out.println(
              thisThreadName + ": Computing sum of " + NUM_WORKERS 
              + " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
            int sum = 0;
 
            for (List<Integer> threadResult : partialResults) {
                System.out.print("Adding ");
                for (Integer partialResult : threadResult) {
                    System.out.print(partialResult+" ");
                    sum += partialResult;
                }
                System.out.println();
            }
            System.out.println(thisThreadName + ": Final result = " + sum);
        }
    }
}

那现在最后一步就是构建CyclicBarrier并把所有的逻辑在main()方法中跑通:

public class CyclicBarrierDemo {
 
    // Previous code
  
    public void runSimulation(int numWorkers, int numberOfPartialResults) {
        NUM_PARTIAL_RESULTS = numberOfPartialResults;
        NUM_WORKERS = numWorkers;
 
        cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());
 
        System.out.println("Spawning " + NUM_WORKERS
          + " worker threads to compute "
          + NUM_PARTIAL_RESULTS + " partial results each");
  
        for (int i = 0; i < NUM_WORKERS; i++) {
            Thread worker = new Thread(new NumberCruncherThread());
            worker.setName("Thread " + i);
            worker.start();
        }
    }
 
    public static void main(String[] args) {
        CyclicBarrierDemo demo = new CyclicBarrierDemo();
        demo.runSimulation(5, 3);
    }
}

以上代码中,我们在CyclicBarrier中初始化了5个线程,每个线程会输出3个整形并存储到partialResults中。

4、输出结果

以下就是上述程序运行一次的输出结果,每次执行可能都会有不同的结果因为线程会以不同的顺序执行。

Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2 
Adding 2 0 5 
Adding 6 4 0 
Adding 1 1 0 
Adding 9 3 5 
Thread 4: Final result = 46

5、总结

  • 能看到CyclicBarrier的适用场景
  • 实现了一段逻辑:固定线程都到达一个执行点后再执行其他逻辑

最后这些例子的实现都可以在Github上找到

原文地址:https://www.baeldung.com/java-cyclic-barrier

6、思考

前文说过

barrier被叫做cyclic是因为阻塞线程恢复后可以重复使用barrier

那这里的重复使用怎么理解呢?

最后可以看下王者荣耀的例子,十分帮忙理解CyclicBarrier。

上一篇:数组中嵌套对象,根据对象某个属性对数组进行去重


下一篇:js数组方法forEach,map,filter,every,some实现