CountDownLatch 基本工作原理和使用案例

定义:一种多功能的同步工具,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止。字面可以翻译为“倒计时锁存器”。

功能:使用给定的计数初始化CountDownLatch。由于countCount方法的调用,直到当前计数达到零为止,await方法将阻塞,此后所有释放的线程将被释放,并且任何随后的await调用将立即返回。这是一种一次性现象-无法重置计数。如果需要用于重置计数的版本,请考虑使用CyclicBarrier。
CountDownLatch是一种多功能的同步工具。

  1. CountDownLatch初始化为一个简单的开/关锁存器或门:所有调用线程等待在门处等待,直到被调用countDown的线程打开为止。
  2. 初始化为N的CountDownLatch可用于使一个线程等待,直到N个线程完成某个动作或某个动作已完成N次。

 

核心方法:

public void countDown();

减少锁存器的计数,如果计数达到零,则释放所有等待线程。
如果当前计数大于零,则将其递减。 如果新计数为零,则将重新启用所有等待线程以进行线程调度。
如果当前计数等于零,那么什么也不会发生。

public void await() throws InterruptedException;

导致当前线程等待,直到锁存器递减计数到零为止,除非该线程被中断。
如果当前计数为零,则此方法立即返回。
如果当前计数大于零,则出于线程调度目的,当前线程将被禁用,并且在发生以下两种情况之一之前,它处于休眠状态:

  • 由于countDown方法的调用,计数达到零。 
  • 或者,其他一些线程中断当前线程。

如果当前线程:

  • 在进入此方法时已设置其中断状态;
  • 或者,在等待期间被打断,

那么,将抛出 InterruptedException 并清除当前线程的中断状态。

抛出:
InterruptedException-如果当前线程在等待时被中断

应用场景1:把一个任务分解为N个子任务去处理。将问题分为N个部分,用Runnable描述每个部分,该Runnable执行该部分并在锁存器上递减计数,然后将所有Runnable排队给执行程序。 当所有子部分都完成时,协调线程将能够通过等待。 (当线程必须以此方式反复递减计数时,请使用CyclicBarrier。)

官方示例代码:

class Driver2 { // ...
   void main() throws InterruptedException {
     CountDownLatch doneSignal = new CountDownLatch(N);
     Executor e = ...

     for (int i = 0; i < N; ++i) // create and start threads
       e.execute(new WorkerRunnable(doneSignal, i));

     doneSignal.await();           // wait for all to finish
   }
 }

 class WorkerRunnable implements Runnable {
   private final CountDownLatch doneSignal;
   private final int i;
   WorkerRunnable(CountDownLatch doneSignal, int i) {
     this.doneSignal = doneSignal;
     this.i = i;
   }
   public void run() {
     try {
       doWork(i);
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

应用场景2:这是一对类,其中一组工作线程使用两个倒数锁存器:

  • 第一个是启动信号,可防止任何工人继续前进,直到驾驶员为他们做好准备为止。
  • 第二个是完成信号,驾驶员可以等待所有工人完成操作。

官方示例代码:

 class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet
     startSignal.countDown();      // let all threads proceed
     doSomethingElse();
     doneSignal.await();           // wait for all to finish
   }
 }

 class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
   }
   public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

 

下面谈谈场景1的案例代码实现思路:

  • 先初始化一个共享的CountDownLatch,计数器值初始化为N;
  • 在启动所有的工作线程之后,调用await()方法等待;
  • 当每个线程完成一个任务的时候,在任务线程内部调用countDown()方法将计数器的值N减1;
  • 当计数器N被减至0时,表示所有的子线程的任务已经完成,不再继续等待。

示例代码:

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {

    private static class Worker implements Runnable {
        private CountDownLatch latch;
        List<String> classIds;
        List<String> result;

        Worker(CountDownLatch latch, List<String> classIds, List<String> result) {
            this.latch = latch;
            this.classIds = classIds;
            this.result = result;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " execute task. ");
                this.result.addAll(classIds);
                System.out.println(Thread.currentThread().getName() + " finished task. ");
            } finally {
                latch.countDown();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> ids = getTestList();

        CountDownLatch latch = new CountDownLatch(5);
        List<String> result = Collections.synchronizedList(new ArrayList<>(100));

        for (int i = 0; i < 5; i++) {
            Worker worker = new Worker(latch, ids, result);
            Thread thread = new Thread(worker, "Worker-Thread-" + i);
            thread.start();
        }

        System.out.println("Main-Thread await. ");
        latch.await();
        System.out.println("Main-Thread finishes await. ");
        System.out.println(result);
    }

    private static List<String> getTestList() {
        int SIZE = 20;
        List<String> ids = Lists.newArrayListWithCapacity(SIZE);
        for (int i = 0; i < SIZE; i++) {
            ids.add(i + "");
        }
        return ids;
    }

}

 

CountDownLatch 基本工作原理和使用案例CountDownLatch 基本工作原理和使用案例 小方编程 发布了8 篇原创文章 · 获赞 3 · 访问量 2391 私信 关注
上一篇:Java代码模拟并发测试工具——用代码的方式模拟并发


下一篇:sqlAchemy分页实现