Java中的线程池
一、线程池的底层原理
1.1 先来了解一下原理
先放一张图片上来,看看线程池工作的核心步骤
1:)当任务提交的时候,先判断核心线程池是否已经满了,如果没有满,则直接创建线程执行任务
2:)如果核心线程池满了,则会把多余的任务放在我们定义的阻塞队列中,判断阻塞队列是否已经满了
3:)如果阻塞队列也已经满了,此时会启用线程池中所有的线程(即最大线程都会被用完),判断是否都满了
4:)如果最大线程池也已经满了,那么下面多余的任务就会启动我们定义的四大拒绝策略中的一个。
1.2 底层的源码分析
Java底层线程池的源码如下:
public ThreadPoolExecutor(int corePoolSize, // 核心池线程数大小 (常用)
int maximumPoolSize, // 最大的线程数大小 (常用)
long keepAliveTime, // 超时等待时间 (常用)
TimeUnit unit, // 时间单位 (常用)
BlockingQueue<Runnable> workQueue, // 阻塞队列(常用)
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略(常用)) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1.3 七大参数的分析
其实在线程池中主要的就是,三大方法,七大核心参数,四大拒绝策略,以及最大线程池的设置方法。**有一个ThreadPoolExecutor的构造函数,主要有七个核心的参数:
corePoolSize ------- 核心池线程数大小 (常用)
maximumPoolSize ---- 最大的线程数大小 (常用)
keepAliveTime ------ 超时等待时间 (常用)
unit --------------- 时间单位 (常用)
workQueue --------- 阻塞队列(常用)
threadFactory ------ 线程工厂
handler ------------ 拒绝策略(常用))
七大核心参数中,首先来看看最大线程池的问题。我们怎么设置最大线程池的数量呢?
答:这个就要根据我们的实际业务来了,主要有如下三种情况吧
第一种:如果是CPU密集型的话,我们可以把最大线程池大小设置为N+1(N为处理器的数量),
在java中可以通过这么一行代码来获取
System.out.println(Runtime.getRuntime().availableProcessors());
第二种:如果是I/O密集型的话,可能占用CPU的时间不是很多,所以我们可以多设置一下线程,让CPU忙起来,充分提高效率。可以设置为2N+1
第三种:;两种混合在一起,即是CPU密集型,也是I/O密集型。这样的话,可以分别处理,将任务分为CPU
密集型和I/O密集型,然后用不同的线程池去处理。
1.4 阻塞队列的分析
至于这个阻塞队列(会把他单独作为一个模块来说后面....)推荐看一下
1.5 内置的四大拒绝策略
1)AbortPolicy: 这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。
2)DisCardPolicy: 丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且是静默丢弃。使用此策略,可能会使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。例如,本人的博客网站统计阅读量就是采用的这种拒绝策略。
3)DisCardOldestPolicy: 丢弃队列最前面的任务,然后重新提交被拒绝的任务。 此拒绝策略,是一种喜新厌旧的拒绝策略。是否要采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量。
4)CallerRunPolicy: 由调用线程处理该任务 。 如果任务被拒绝了,则由调用线程(提交任务的线程)直接执行此任务 。
二:创建线程池的方法
Java中创建线程池的方式一般有两种:
1:通过Executors工厂创建
2:通过new ThreadPoolExecutor(.......)自定义创建
讲解之前,我们应该先来看看 ExecutorService 这个接口。那就先来说说我们自定义创建线程池方法 ExecutorService是Java中对线程池定义的一个接口,它java.util.concurrent
包中,在这个接口中定义了和后台任务执行相关的方法:
2.1 自定义创建线程池方法(重点)
package com.test;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定义线程池
* @author huxd
* @createTime 2020-04-04 21:22:10
**/
public class ThreadPoolDemo {
public static void main(String[] args) {
/** 阻塞队列,一般指定默认的值,如果不指定的话,默认会被看成是一个*队列,这个可能会造成OOM */
LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(10);
System.out.println(Runtime.getRuntime().availableProcessors());
ThreadFactory threadFactory = new ThreadFactory() {
/** int i = 1,用于并发安全的包装类 */
AtomicInteger atomicInteger = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
/** 创建线程,把任务传进来 */
Thread thread = new Thread(r);
/** 给线程起个名字 */
thread.setName("MyThread"+atomicInteger.getAndIncrement());
return thread;
}
};
ExecutorService threadPool = new ThreadPoolExecutor(
5,
9,
3L,
TimeUnit.SECONDS,
blockingQueue,
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
try {
method();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
private static void method() throws InterruptedException {
System.out.println("ThreadName:"+Thread.currentThread().getName()+"进来了");
TimeUnit.SECONDS.sleep(5);
System.out.println("ThreadName:"+Thread.currentThread().getName()+"出去了");
}
}
通过new ThreadPoolExecutor并设置好七大参数,并提交任务,最后用完一定要关闭线程池。
execute(Runnable)
这个方法接收一个Runnable实例,并且异步的执行,请看下面的实例:
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown();
这个方法有个缺陷,就是没有返回值的结果,不知道任务是不是真的已经完成了。
submit(Runnable)
submit(Runnable)
和execute(Runnable)
区别是前者可以返回一个Future对象,通过返回的Future对象,我们可以检查提交的任务是否执行完毕,请看下面执行的例子:
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
future.get(); //returns null if the task has finished correctly.
如果任务执行完成,future.get()
方法会返回一个null。注意,future.get()方法会产生阻塞。
submit(Callable)
submit(Callable)和submit(Runnable)类似,也会返回一个Future对象,但是除此之外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值。请看下面实例:
Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});
System.out.println("future.get() = " + future.get());
如果任务执行完成,future.get()方法会返回Callable任务的执行结果。注意,future.get()方法会产生阻塞。
在深挖一下execute和submit源码发现其中的区别。
先来说说execute:(这个有点负责,先按下不表)
其次来聊聊submit,看看源码底层
源码底层提供了submit三种重载方式,他们的返回值都是Future
<T> Future<T> submit(Callable<T> task); // 其底层实现如下
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
我们可以看到穿入了一个Callable的对象,我们都知道,Callable是创建线程的三大方式之一,而且它的一个call方法可以有返回值,这是其余两种创建线程的方式锁不具备的。
具体的判断流程是:第一步是,如果闯入的线程对象为空,直接抛出空异常;其次会调用newTaskFor方法,返回一个RunnableFuture对象,至于这个RunnableFuture,在我的另一篇博客,Future详解中有介绍,这里先按下不表,不过我们要到newTaskFor里面看看,他到底做了什么操作。
原来它是返回了一个FutureTask的构造函数,有些人可能就会很迷惑,为什么返回FutureTask,它与RunnableFuture有什么关系吗?有关系,而且有很紧密的关系,当然这个在我的关系在我的另一篇博客,Future中有详细的解释,可以移步去看看。
到这里就是FutureTask的一个构造函数了,返回的是RunnableFuture的子类FutureTask,并且他们的最终父类是Future。他们三者关系时:Future<------(extends)-----RunnableFuture<-------(implements)--------FutureTask。
那么这样的话,我们就知道调用submit,最终返回的是Future的实现类FutureTask的构造函数,并且在FutureTask中传入了Callable作为参数,那么我们就可以拿得到其call方法返回值的内容了,通过Future类中的get()方法可以得到线程的计算的值了。这个才是我们最终需要的。
示例:
package com.future;
import java.util.concurrent.*;
/**
* future的一个例子
* @author Huxudong
* @createTime 2020-04-06 17:15:41
**/
public class FutureDemo {
private static ExecutorService pool = new ThreadPoolExecutor(
5,
9,
3L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
/** 使用Future来实现 */
FutureTest();
}
private static void FutureTest() {
try{
Future<String> future = getFutureKid();
String s = future.get();
System.out.println("拿到的值是-----"+s+"-----哈哈哈");
System.out.println("你去B系统取ID,我先去做自己的其他的事情了");
System.out.println("我的其他的事情都已经做好了呀,就差你了,快点啊。拿到了吗?"+future.isDone());
long counter = 0;
String temp = "";
/** 判断是否已经完成 */
while(!future.isDone()) {
counter++;
}
temp = future.get();
System.out.println("我的天,你终于拿到了呀,ID="+temp);
System.out.println("你知道我等了多久吗?"+counter);
} catch(Exception e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
}
public static Future<String> getFutureKid() {
return pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
/** 用睡眠等待模拟具体实际的业务,获取一个什么东西 */
TimeUnit.SECONDS.sleep(5);
return "String";
}
});
}
}
我们再来看看传入的是Runnable的对象,底层源码发生了什么?
好像没啥太大区别,同样是需要返回一个RunnableFuture的对象,只不过newTaskFor传入了两个参数,调用了FutureTask的另外一个构造函数,看看里面做了什么东西
会发现返回的callbble对象时调用Eexcutors里面的方法,继续往里面看
发现在这里new RunnableAdapter,这个就是设计模式中的是适配器模式,在这里的作用就是将我们传入的Runnble对象转换为Callable对象
到此结束,原来根本就是,把我们传入的Runnable对象转换成了Callable对象。
同理:如果此时使用这个构造函数传入两个参数的话,会返回第二个参数的值。
示例:
package com.future;
import java.util.concurrent.*;
/**
* future的一个例子
* @author Huxudong
* @createTime 2020-04-06 17:15:41
**/
public class FutureDemo {
private static ExecutorService pool = new ThreadPoolExecutor(
5,
9,
3L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
/** 使用Future来实现 */
FutureTest();
}
private static void FutureTest() {
try{
Future<String> future = getFutureKid();
String s = future.get();
System.out.println("拿到的返回值是----"+s+"-----哈哈哈");
System.out.println("你去B系统取ID,我先去做自己的其他的事情了");
System.out.println("我的其他的事情都已经做好了呀,就差你了,快点啊。拿到了吗?"+future.isDone());
long counter = 0;
String temp = "";
/** 判断是否已经完成 */
while(!future.isDone()) {
counter++;
}
temp = future.get();
System.out.println("我的天,你终于拿到了呀,ID="+temp);
System.out.println("你知道我等了多久吗?"+counter);
} catch(Exception e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
}
public static Future<String> getFutureKid() {
return pool.submit(()->{
System.out.println("这里就是好啊");
},"hello");
}
}
2.2 Executors创建线程池(未完待续......)
参考文章:
https://blog.csdn.net/suifeng3051/article/details/49443835
https://blog.csdn.net/wei_lei/article/details/74262818
https://www.cnblogs.com/shamo89/p/8847028.html