JUC 一 CyclicBarrier 与 Semaphore

java.util.concurrent

CyclicBarrier简介

CyclicBarrier:可重用屏障/栅栏

  1. 类似于 CountDownLatch(倒计数闭锁),它能阻塞一组线程直到某个事件的发生。
  2. 与闭锁的关键区别在于,所有的线程必须同时到达屏障位置,才能继续执行。
  3. CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以使用 reset() 方法重置
  4. CountDownLatch 采用减计数方式,CyclicBarrier 采用加计数方式

CyclicBarrier源码

构造函数:

    //线程到达屏障时,优先执行 barrierAction
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }


await(),await(long timeout, TimeUnit unit) :
    方法的线程告诉 CyclicBarrier 自己已经到达屏障,然后当前线程被阻塞,直到:

        1,最后一个线程到达
        2,其他线程中断了当前线程.
        3,其它线程中断了其它等待的线程.
        4,在barrier上面等待的线程发生超时.
        5,其它线程调用了barrier上面的reset方法.

reset():
    将barrier状态重置。如果此时有线程在barrier处等待,它们会抛出BrokenBarrierException并返回。

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

CyclicBarrier示例

public class CyclicBarrierTest {

    // 自定义工作线程
    private static class Worker extends Thread {
        private CyclicBarrier cyclicBarrier;
        
        public Worker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        
        @Override
        public void run() {
            try {
                //线程创建后等待,直到有三个线程才执行后续操作
                cyclicBarrier.await();

                // 工作线程开始处理,这里用Thread.sleep()来模拟业务处理
                System.out.println(Thread.currentThread().getName() + "执行任务!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args) {
        //三个线程出现才执行
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        
        for (int i = 0; i < 3; i++) {
            Worker worker = new Worker(cyclicBarrier);
            worker.start();
        }
    }
}

Semaphore简介

限制可以访问某些资源的线程数量,例如通过 Semaphore 限流。

主要方法:

Semaphore(int permits):
    构造方法,创建具有给定许可数的计数信号量并设置为非公平信号量。

Semaphore(int permits,boolean fair):
    构造方法,当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。

void acquire():
    从此信号量获取一个许可前线程将一直阻塞。相当于一辆车占了一个车位。

void acquire(int n):
    从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。比如n=2,就相当于一辆车占了两个车位。

void release():
    释放一个许可,将其返回给信号量。就如同车开走返回一个车位。

void release(int n):
    释放n个许可。

int availablePermits():
    当前可用的许可数

Semaphore示例(可以实现单例模式)

public class SemaphoreDemo {
    private static final Semaphore semaphore=new Semaphore(3);
    private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
    
    private static class ThreadDemo extends Thread {

        public void run() {
            try {
                //占位(如果许可数达到最大活动数,那么调用acquire()之后,便进入等待队列,等待已获得许可的线程释放许可)
                semaphore.acquire();
                //执行任务
                Thread.sleep(1000);
                //归还
                semaphore.release();
            }
            catch(InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for(int i=0;i<7;i++) {
            Thread t1=new ThreadDemo();
            threadPool.execute(t1);
        }
    }
}
上一篇:Symantec NetBackup 8.1安装部署


下一篇:c – sem_wait和信号处理程序