目标
-
线程的等待和通知
-
生产者消费者模式
-
线程池
线程的等待和通知
Object类中的方法
-
wait() 让当前线程进入等待状态,直到被通知为止
-
wait(long) 让当前线程进入等待状态,同时设置时间;直到被通知为止或时间结束
-
notify() 随机通知一个等待线程
-
notifyAll() 通知所有的等待线程
注意:等待和通知方法必须是锁对象,否则会抛出IllegalMonitorStateException
/**
* 通过锁对象将线程等待,经过5秒通知该线程来执行
*/
public class WaitDemo {
public synchronized void print() throws InterruptedException {
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + "--->" +i);
if(i == 50){
//让当前线程等待
this.wait();
}
}
}
public synchronized void notifyTest(){
//让等待的线程执行
this.notifyAll();
}
public static void main(String[] args) {
WaitDemo waitDemo = new WaitDemo();
new Thread(()->{
try {
waitDemo.print();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
waitDemo.notifyTest();
}
}
wait()和sleep()的区别
1.wait()方法时object类的方法 sleep()是Thread类的方法
2.sleep()让线程暂停一段时间,时间一到自动恢复执行,不设计线程间的通信 调用sleep()方法不会释放锁。
Wait() 调用后线程会释放占用的锁,用于线程间的通信,只有其他线程调用notify()方法或者notifyall()才醒来
3.使用域不同 wait()方法必须放在同步代码块和同步控制方法中使用,sleep()方法则可以放在任何地方使用
4.sleep()方法必须捕获异常 而wait() notify() notifyall() 不需要捕获异常
在sleep过程中 可能被其他对象调用它的interrupt() 产生interruptedException 由于sleep不会释放锁标志 容易导致死锁问题的发生 因此一般情况下 推荐使用wait() 方法.
生产者消费者模式
在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式。结构图如下:
实现过程:
-
通过添加缓冲区,设置上限
-
生产者生产数据,向缓冲区存放,如果满了,生产者进入等待,直到缓冲区有空的位置通知生产者生产;
-
消费者从缓冲区取数据进行消费,如果空了,消费者进入等待,直到缓冲区有数据再通知消费者消费。
解决问题:
-
解耦
-
提高并发性能
-
解决忙闲不均
案例实现
包子铺卖包子
import java.util.ArrayList;
import java.util.List;
/**
* 包子铺
*/
public class BaoziShop {
/**
* 包子
*/
class Baozi{
private int id;
public Baozi(int id) {
this.id = id;
}
@Override
public String toString() {
return "包子--" + id;
}
}
//上限
public static final int MAX_COUNT = 100;
//缓冲区 存放数据
private List<Baozi> baozis = new ArrayList<>();
/**
* 做包子
*/
public synchronized void makeBaozi() throws InterruptedException {
//判断缓冲区是否满了
if(baozis.size() == MAX_COUNT){
System.out.printf("缓冲区满了,%s等待%n",Thread.currentThread().getName());
//让生产者线程等待
this.wait();
}else{
//通知生产者线程生产
this.notifyAll();
}
//创建包子
Baozi baozi = new Baozi(baozis.size() + 1);
System.out.println(Thread.currentThread().getName()+"做了"+baozi);
//保存到缓冲区
baozis.add(baozi);
}
/**
* 拿包子
*/
public synchronized void takeBaozi() throws InterruptedException {
//判断缓冲区是否空了
if(baozis.size() == 0){
System.out.printf("缓冲区空了,%s等待%n", Thread.currentThread().getName());
//让消费者等待
this.wait();
}else{
//通知消费者消费
this.notifyAll();
}
//获得第一个包子,并删除
if(baozis.size() > 0){
Baozi baozi = baozis.remove(0);
System.out.println(Thread.currentThread().getName()+"吃了"+baozi);
}
}
public static void main(String[] args) {
BaoziShop baoziShop = new BaoziShop();
//一个生产者
Thread productor = new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
baoziShop.makeBaozi();
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
productor.start();
//10个消费者吃10个包子
for (int i = 0; i < 10; i++) {
Thread consumer = new Thread(() ->{
try {
for (int j = 0; j < 10; j++) {
baoziShop.takeBaozi();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
consumer.start();
}
}
}
可应用场景:
-
12306售票
-
消息队列
-
线程池
-
等等。。。。
阻塞队列
应用了生产者消费者模式的集合,能够根据数据满或空的情况,自动对线程执行等待和通知
BlockingQueue 接口
-
put 添加数据,达到上限会自动让线程等待
-
take 取并删除数据,数据空了会自动让线程等待
实现类
ArrayBlockingQueue 类 数据结构为数组
LinkedBlockingQueue类 链表结构
/**
* 包子铺
*/
public class BaoziShop2 {
/**
* 包子
*/
static class Baozi{
private int id;
public Baozi(int id) {
this.id = id;
}
@Override
public String toString() {
return "包子--" + id;
}
}
public static void main(String[] args) {
//阻塞队列
BlockingQueue<Baozi> baozis = new ArrayBlockingQueue<>(100);
//生产者线程
new Thread(() -> {
for (int i = 0; i < 200; i++) {
//创建包子,添加到阻塞队列,满了就自动阻塞线程
Baozi baozi = new Baozi(baozis.size() + 1);
try {
baozis.put(baozi);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"生产了"+baozi);
}
}).start();
//消费者线程
for(int i = 0;i < 5;i++){
new Thread(() -> {
//取包子,空了会自动阻塞
for (int j = 0; j < 40; j++) {
try {
Baozi baozi = baozis.take();
System.out.println(Thread.currentThread().getName()+"消费了"+baozi);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
线程池
作用:回收利用线程资源
线程是一种宝贵的系统资源,执行完任务后会死亡,如果有大量任务需要处理,需要频繁的创建和销毁线程,造成系统性能降低。
线程池会保存一定量的线程,线程执行完任务后,会回到线程池中,等待下一个任务,节省系统资源,提升性能。
线程池的使用
顶层接口:Executor
-
execute(Runnable) 启动线程执行一个任务
ExecutorService
继承 Executor
添加线程池管理方法,如:shutdown()、shutdownNow()
Executors
用于创建线程池的工具类
主要的方法
方法名 | 说明 |
---|---|
newCachedThreadPool() | 创建长度不限的线程池 |
newFixedThreadPool(int ) | 创建固定长度的线程池 |
newSingleThreadExecutor() | 创建单一个数的线程池 |
newScheduledThreadPool(int) | 创建可以调度的线程池 |
import java.time.LocalDateTime;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void useCachedThreadPool(){
//创建长度不限的线程池
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
int n = i;
//使用线程池启动线程
executorService.execute(()->{
System.out.println(Thread.currentThread().getName()+"执行了任务" + n);
});
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
executorService.shutdown();
}
public static void useFixedThreadPool(){
//创建长度固定的线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 100; i++) {
int n = i;
//使用线程池启动线程
executorService.execute(()->{
System.out.println(Thread.currentThread().getName()+"执行了任务"+n);
});
}
executorService.shutdown();
}
public static void useSingleThreadPool(){
//创建单一长度的线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
int n = i;
//使用线程池启动线程
executorService.execute(()->{
System.out.println(Thread.currentThread().getName()+"执行了任务"+n);
});
}
executorService.shutdown();
}
public static void useScheduledThreadPool(){
//获得可调度的线程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
//执行可调度任务
System.out.println("------------");
scheduledExecutorService.scheduleAtFixedRate(()->{
System.out.println(Thread.currentThread().getName()+"---->"+ LocalDateTime.now());
},1,3, TimeUnit.SECONDS);
}
public static void main(String[] args) {
// useCachedThreadPool();
// useFixedThreadPool();
// useSingleThreadPool();
useScheduledThreadPool();
}
}
线程池的优化配置
线程池的实现类
ThreadPoolExecutor
线程池的构造方法参数:
-
corePoolSize 核心线程数,创建线程池后自带线程,不会进行销毁
-
maximumPoolSize 最大线程数
-
keepAliveTime 存活时间,非核心线程能够闲置的时间,超过后被销毁
-
timeUnit 时间单位
-
blockingQueue 阻塞队列 存放任务(Runnable)的集合
优化配置
-
核心线程数 应该和CPU内核数量相关 CPU内核数 * N (N和任务执行需要时间和并发量相关)
Runtime.getRuntime().availableProcessors()
-
最大线程数可以和核心线程数一样,避免频繁创建和销毁线程
-
如果存在非核心线程,设置大一点,避免频繁创建和销毁线程
-
阻塞队列使用LinkedBlockingQueue,插入和删除任务效率更高
线程池的实现原理
问题:线程池是如何对线程进行回收利用的?
所有线程保存在HashSet中
/** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<Worker>();
总结
-
线程的等待和通知(是干什么,谁来调用)
-
wait和sleep的区别
-
生产者和消费者模式(是什么,解决什么问题,如何实现的,项目中有哪些应用)
-
阻塞队列(了解)
-
线程池(作用,有哪几种线程池,线程池的创建有哪些参数,如何优化,实现原理(看源码、画图、归纳))