CountDownLatch
countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。 是通过一个计数器来实现的,计数器的初始值是线程的数量。
CountDownLatch的用法
CountDownLatch典型用法:1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
CountDownLatch典型用法:2、实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。
CountDownLatch的不足
CountDownLatch是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。
public class TestCountDownLatch {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CountDownLatch countDownLatch = new CountDownLatch(2);
executorService.submit(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
System.out.println("第一个");}
);
executorService.submit(()->{
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
System.out.println("第二个");}
);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("已经完成了");
}
}
CyclicBarrier
一个可循环利用的屏障。
它的作用就是会让所有线程都等待完成后才会继续下一步行动。
构造方法
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
解析:
parties 是参与线程的个数
第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
重要方法
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
解析:
线程调用 await() 表示自己已经到达栅栏
BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public class TestCyclicBarrier {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("等待 3");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("broken");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("等待 2");
cyclicBarrier.await();
System.out.println("1");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("等待 3");
cyclicBarrier.await();
System.out.println("2");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
}
Semaphore
信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
方法说明
acquire()
获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。
acquire(int permits)
获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
acquireUninterruptibly()
获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。
tryAcquire()
尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
tryAcquire(long timeout, TimeUnit unit)
尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
release()
释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
hasQueuedThreads()
等待队列里是否还存在等待线程。
getQueueLength()
获取等待队列里阻塞的线程数。
drainPermits()
清空令牌把可用令牌数置为0,返回清空令牌的数量。
availablePermits()
返回可用的令牌数量。
public class TestSemaphore {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Condition
用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。
Condition是个接口,基本的方法就是await()和signal()方法;
Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
就是可以分组的同步控制
CountDownLatch、CyclicBarrier、Semaphore分析
区别与作用分析
CountdownLatch利用继承AQS的共享锁来进行线程的通知,利用CAS来进行,作用域是一次操作,一个线程可以多次,控制更加灵活。
CyclicBarrier则利用ReentrantLock的Condition和AQS来阻塞和通知线程,可以重复使用,作用域是线程,一个线程只能生效一次await()
Semaphore是控制并发量的,如同一个管道口一样,只有固定的直径,控制通过直径的物体切面。避免挤爆管道口。比如:一台机器有10个资源,A功能需要2个资源,资源只能5个线程访问A功能;不止可以可以横向控制资源,也能纵向控制,A->B ,B功能1个资源,到B资源时信号量就应该改成1,整体维持到10。有公平和非公平两种
原理分析
都是通过 内部类实现AQS抽象方法,来实现其线程控制的功能。
部分源码:
其中CyclicBarrier的
AQS 三要素
1、 state 状态
2、线程队列
3、获取和释放的方法
AQS底层代码逻辑复杂,有很细节的处理逻辑,通过提高的三要素也可以实现一个多线程控制;
自定义门栓:
类型join一样的门栓,state等于1时门栓开启,可以执行线程,state等于o是等待线程,实现n线程等待1线程
public class MyLatch {
public MyLatch() {
sync = new Sync(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
@Override
protected int tryAcquireShared(int arg) {
if(getState() == 1){
return -1;
}else{
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
setState(0);
return true;
}
}
private final Sync sync;
/**
* @Param []
* @return void
* @Description 等待
* @author alex
* @date 2021/9/27 20:09
**/
public void await() {
sync.acquireShared(0);
}
/**
* @Param []
* @return void
* @Description 释放
* @author alex
* @date 2021/9/27 20:09
**/
public void signal(){
sync.releaseShared(1);
}
}
使用自定义门栓
public class MyLatchTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
MyLatch myLatch = new MyLatch();
for(int i=0;i<10;i++){
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("准备就绪!");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
myLatch.await();
System.out.println("i run");
}
});
}
Thread.sleep(1000);
System.out.println("全部跑起来!");
myLatch.signal();
for(int i=0;i<10;i++){
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("准备就绪!");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
myLatch.await();
System.out.println("i run");
}
});
}
}
}