创建线程的四种方式
- 实现Runnable接口,重写run()方法(避免多继承局限)
- 继承Thread类,重写run()方法(本质:Thread类也是实现Runnable接口)
- 实现Callable接口,重写call()方法,有返回值
- 使用线程池(使用原因:不推荐手动创建线程,不方便管理,易造成较大开销或浪费)
初识线程池
在Java中,我们可以利用多线程来最大化地压榨CPU多核计算的能力。但是,线程本身是把双刃剑,我们需要知道它的利弊,才能在实际系统中游刃有余地运用。
线程池,本质上是一种对象池,用于管理线程资源。 在任务执行前,需要从线程池中拿出线程来执行。在任务执行完成之后,需要把线程放回线程池。通过线程的这种反复利用机制,可以有效地避免直接创建线程所带来的坏处。
不使用线程池的坏处:
- 频繁的线程创建和销毁会占用更多的CPU和内存;
- 频繁的线程创建和销毁会对GC产生比较大的压力;
- 线程太多,线程切换带来的开销将不可忽视;
- 线程太少,多核CPU得不到充分利用,是一种浪费。
线程池的好处:
- 降低资源的消耗。线程本身是一种资源,创建和销毁线程会有CPU开销;创建的线程也会占用一定的内存;
- 提高任务执行的响应速度。任务执行时,可以不必等到线程创建完之后再执行;
- 提高线程的可管理性。线程不能无限制地创建,需要进行统一的分配、调优和监控。
因此,我们有必要对线程池进行比较完整地说明,以便能对线程池进行正确地治理。
线程池实现原理
线程池主要处理流程
通过上图,我们看到了线程池的主要处理流程。我们的关注点在于,任务提交之后是怎么执行的。大致如下:
- 判断核心线程池是否已满,如果不是,则创建线程执行任务;
- 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中;
- 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务;
- 如果线程池也满了,则按照拒绝策略对任务进行处理。
在jdk里面,我们可以将处理流程描述得更清楚一点。来看看ThreadPoolExecutor
的处理流程。
ThreadPoolExecutor的处理流程:
-
corePool
-> 核心线程池 -
maximumPool
-> 线程池 -
BlockQueue
-> 队列 -
RejectedExecutionHandler
-> 拒绝策略
代码示例
Executors(不推荐使用)
Executors类创建线程池的方法归根结底都是调用ThreadPoolExecutor类,只不过对每个方法赋值不同的参数去构造ThreadPoolExecutor对象。
- newCachedThreadPool:创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- newFixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
- newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
注意:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式创建。
原因:上述四个方法的创建的队列大小默认都是Integer.MAX_VALUE,堆积过多的任务请求会可能导致OOM。
public class ThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 定时调度,每个调度任务会至少等待`period`的时间,
// 如果任务执行的时间超过`period`,则等待的时间为任务执行的时间
executor.scheduleAtFixedRate(() -> {
try {
Thread.sleep(10000);
System.out.println(System.currentTimeMillis() / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, 2, TimeUnit.SECONDS);
// 定时调度,延迟`delay`后执行,且只执行一次
executor.schedule(() -> System.out.println("5 秒之后执行 schedule"), 5, TimeUnit.SECONDS);
}
}
ThreadPoolExecutor
- corePoolSize: 常驻核心线程数,如果大于0,即使本地任务执行完也不会被销毁
- maximumPoolSize: 线程池能够容纳可同时执行的最大线程数
- keepAliveTime: 线程池中线程空闲的时间,当空闲时间达到该值时,线程会被销毁, 只剩下 corePoolSize 个线程数量。
- unit: 空闲时间的单位。一般以TimeUnit类定义时分秒。
-
workQueue: 当请求的线程数大于 corePoolSize 时,线程进入该阻塞队列。
- LinkedBlockingQueue:*队列,当不指定队列大小时,将会默认为Integer.MAX_VALUE大小的队列,因此大量的任务将会堆积在队列中,最终可能触发OOM。
- ArrayBlockingQueue:有界队列,基于数组的先进先出队列,此队列创建时必须指定大小。
- PriorityBlockingQueue:有界队列,基于优先级任务的,它是通过Comparator决定的。
- SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
- threadFactory: 线程工厂,用来生产一组相同任务的线程,同时也可以通过它增加前缀名,虚拟机栈分析时更清晰
-
handler: 执行拒绝策略,当 workQueue 已满,且超过maximumPoolSize 最大值,就要通过这个来处理,比如拒绝,丢弃等,这是一种限流的保护措施。
- AbortPolicy:默认的拒绝策略,抛RejectedExecutionException异常
- DiscardPolicy:相当大胆的策略,直接丢弃任务,没有任何异常抛出
- DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列
- CallerRunsPolicy:提交任务的线程自己去执行该任务
-
线程池关闭
- shutdown() : 不会立刻终止线程,等所有缓存队列中的任务都执行完毕后才会终止。
- shutdownNow() : 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
-
线程池监控
- long getTaskCount():获取已经执行或正在执行的任务数
- long getCompletedTaskCount():获取已经执行的任务数
- int getLargestPoolSize():获取线程池曾经创建过的最大线程数,根据这个参数,我们可以知道线程池是否满过
- int getPoolSize():获取线程池线程数
- int getActiveCount():获取活跃线程数(正在执行任务的线程数)
public class ThreadPool {
public static void main( String[] args ){
// maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<3;i++) {
pool.execute(new ThreadTask());
}
}
}
class ThreadTask implements Runnable{
public ThreadTask() {
}
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
ThreadFactory(阿里Java开发手册推荐使用)
// 使用工厂类可以设置线程名字
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
注意需要引入guava包,否则ThreadFactoryBuilder会报错
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.1-jre</version>
</dependency>