文章目录
前言
从这一篇博客开始,开始总结线程间协作,并发流程控制的工具类,这一篇主要介绍Semaphore和Condition
Semaphore
Semaphore中文译文为信号量,操作系统中也有同样的概念。类似于生活中常见的许可证的概念。在执行指定业务逻辑之前,需要先获取相关的许可证。在限流的使用场景中也有Semaphore的影子。
在Java中,信号量的作用是维护一个"许可证"的计数,线程可以获取许可证,获取成功信号量的个数就减一,同时线程也可以释放一个许可证,释放一个信号量就加一,当信号量所拥有的许可证数量为0,那么其他想要获取许可证的线程就需要等待,直到有另外的线程释放了许可证。
在初始化Semaphore的时候,需要初始化信号量个数,同时也可以设置对应的公平策略,如果指定为公平,那么Semaphore会把之前的等待线程放入到自己维护的一个FIFO队列中,信号量的分发会根据在FIFO队列中等待的时长来进行。
通过Semaphore获取信号量的方法有多个,大体上分为以下三类
方法 | 作用 |
---|---|
acquire() acquire(int permits) |
获取信号量的时候阻塞,同时响应中断。这里的permits参数指的是想要获取信号量的个数 |
acquireUninterruptibly() acquireUninterruptibly(int permits) |
在获取信号量的时候不响应中断 |
tryAcquire() tryAcquire(int permits) tryAcquire(int permits, long timeout, TimeUnit unit) tryAcquire(long timeout, TimeUnit unit) |
尝试获取信号量,如果没有则不阻塞,可以指定获取信号量的时间 |
如果执行完逻辑,可以调用release来释放信号量
简单实例
/**
* autor:liman
* createtime:2021/11/28
* comment:多个线程等待许可证
*/
public class SemaphoreDemo01 {
static Semaphore semaphore = new Semaphore(3, true);//3个许可证,公平模式
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
executorService.submit(new Task());
}
//判断线程池是否停止
executorService.shutdown();
while(!executorService.isTerminated()){
//线程池没有执行完成任务,主线程就空转,直到线程池运行结束
}
}
static class Task implements Runnable {
@Override
public void run() {
try {
semaphore.acquire();
//获取多个信号量
//semaphore.acquire(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "拿到了许可证");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.release();
//释放多个信号量
//semaphore.release(3);
System.out.println(Thread.currentThread().getName() + "释放了许可证");
}
}
}
信号量的获取个数可以灵活指定,这在一定程度上可以根据线程耗费资源的程度进行侧重,比如有两个任务,一个是TaskA ,一个是TaskB,TaskA的执行需要耗费较多的资源,TaskB需要耗费较少的资源,我们可以定义3个信号量。执行TaskA需要获取全部信号量才能执行,而执行TaskB只需要获取一个信号量,这样TaskA和TaskB就永远不可能同时执行。
信号量需要注意的是,在获取和释放的时候,数量必须要保证一致,每次获取多少信号量就要释放多少信号量,如果某个线程获取了2个信号量,但是释放的时候,只释放了1个,随着时间的推移,总会有一个时间点,其他线程的信号量不够用了,这样会导致程序卡死。
信号量的获取和释放,可以不在同一个线程中执行,也许是线程A获取了3个信号量,线程B释放了3个信号量,这也是可行的,但要保证逻辑合理。
Condition
Condition是一个接口,其通过ReentrantLock创建,其作用非常类似于Object中的wait和notify的组合,在Conditon中变成了await和signal。关于wait和notify这两者可以参考之前的博客Thread和Object中的方法。
简单基本实例
/**
* autor:liman
* createtime:2021/11/28
* comment:普通的condition用法实例
*/
@Slf4j
public class ConditionDemo {
private ReentrantLock lock = new ReentrantLock();
//condition由锁来实例化
private Condition condition = lock.newCondition();
public void method01(){
lock.lock();
try{
System.out.println("条件不满足,进入await");
//这里
condition.await();
System.out.println("条件满足,开始继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void method02(){
lock.lock();
try{
System.out.println("准备工作完成,唤醒其他线程");
condition.signal();
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
ConditionDemo conditionDemo = new ConditionDemo();
new Thread(()->{
try {
Thread.sleep(1000);
conditionDemo.method02();//唤醒在condition上等待的线程
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//主线程调用method01先行阻塞,在1秒之后,被上面新建的线程唤醒
conditionDemo.method01();
}
}
与notify相对应的,Condition也存在signal和signalAll两个方法,其中signalAll会唤醒Condition上等待的所有线程,而signal是公平的,只会唤醒等待时间最长的线程。
利用Condition实现的生产者消费者模式
/**
* autor:liman
* createtime:2021/11/28
* comment:condition实现生产者和消费者
*/
@Slf4j
public class ConditionProducerAndCustomer {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
class Customer extends Thread{
@Override
public void run() {
consume();
}
private void consume(){
while(true){//消费者一直消费
lock.lock();
try{
while(queue.size() == 0){
System.out.println("队列为空,消费者进入等待");
try {
notEmpty.await();//用notEmpty进行等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signal();//取出数据,同志爱生产者继续生产
System.out.println("从队列取走了一个数据,队列剩余"+queue.size()+"个数据");
}finally {
lock.unlock();
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
produce();
}
private void produce(){
while(true){//生产者一直生产
lock.lock();
try{
while(queue.size() == queueSize){
System.out.println("队列已满,生产者进入等待");
try {
notFull.await();//用 notFull 进行等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(new Random().nextInt(100));
notEmpty.signalAll();//取出数据,同志爱生产者继续生产
System.out.println("生产者向消费者插入了一个数据,队列剩余"+queue.size()+"个数据");
}finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ConditionProducerAndCustomer conditionProducerAndCustomer = new ConditionProducerAndCustomer();
Producer producer = conditionProducerAndCustomer.new Producer();
Customer customer = conditionProducerAndCustomer.new Customer();
customer.start();
producer.start();
}
}
如果说Lock是用来替代synchronized关键字的,那么Condition其实可以理解为用来替代Object.wati/notify的,在用法和性质上,二者几乎没有区别。
await方法会释放lock的锁,和Object.wait一样,不需要自己手动先行释放
调用await的时候,也必须要持有lock锁,否则会出现异常(虽然编译的时候不会出现异常)
总结
简单总结了一下Semaphore和Condition,下一篇总结一下CountDownLatch和CyclicBarrier