浅谈ThreadPoolExecutor线程池底层源码

文章目录




一、线程池基础知识

线程池具体使用细节不再本篇文章的讨论范围,想了解用法的请自行百度,这里仅展示一个线程池小Demo

该线程池的作用为:使用线程池执行100000个线程,线程任务实现的是callable接口,每个线程打印相应的逻辑,然后返回(此处线程池并没有对返回的结果进行接收)

public class TestThreadPool {
	public static void main(String[] args) throws ExecutionException, InterruptedException {
		CopyOnWriteArrayList<Future> retList = new CopyOnWriteArrayList<>();
		List<Task> taskList = new ArrayList<>();
		for (int i = 0; i < 100000; i++) {
			taskList.add(new Task(i));
		}
		ExecutorService threadPool = new ThreadPoolExecutor(
            	1,//核心的线程数量
				3,//最大的线程数量
				10,//等待一定事件后关闭最大线程
				TimeUnit.MILLISECONDS,//等待时间的单位
				new LinkedBlockingQueue<>(10),//创建一个队列
				Executors.defaultThreadFactory(),//创建线程的线程工厂
				new ThreadPoolExecutor.CallerRunsPolicy());// 拒绝策略

		for (int i = 0; i < taskList.size(); i++) {
			threadPool.submit(taskList.get(i));
		}
	}
}

class Task implements Callable<Integer> {
	private Integer num;

	public Task(Integer num) {
		this.num = num;
	}

	@Override
	public Integer call() throws Exception {
		System.out.println("---uuu--" + num);
		return num;
	}
}



三大方法

//1.创建一个只有一个线程的线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();

//2.创建一个可伸缩的线程池
ExecutorService threadPool = Executors.newCachedThreadPool();

//3.创建一个指定最大数量的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);



七大参数

ExecutorService threadPool = new ThreadPoolExecutor(
            	1,//核心的线程数量
				3,//最大的线程数量
				10,//等待一定事件后关闭最大线程
				TimeUnit.MILLISECONDS,//等待时间的单位
				new LinkedBlockingQueue<>(10),//创建一个队列,存放任务
				Executors.defaultThreadFactory(),//创建线程的线程工厂
				new ThreadPoolExecutor.CallerRunsPolicy());// 拒绝策略



四种拒绝策略

//多出来的线程,直接抛出异常
new ThreadPoolExecutor.AbortPolicy()
    
//谁开启的这个线程,就让这个线程返回给谁执行。比如main线程开启的,那就返回给main线程执行    
new ThreadPoolExecutor.CallerRunsPolicy()
    
//如果队列线程数量满了以后,直接丢弃,不抛出异常
new ThreadPoolExecutor.DiscardPolicy()
    
//队列满了以后,尝试去和最早的线程竞争,也不会抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy





二、execute流程简单分析

1、调用execute方法,submit就是在execute的方法上套了一层RunnableFuture。(下面代码中的workQueue就是我们初始化线程池的时候传入的队列)

浅谈ThreadPoolExecutor线程池底层源码



2、addWorker方法。该方法主要看关注传入的参数firskTask,即我们传入的线程任务,也就是execute方法的参数

浅谈ThreadPoolExecutor线程池底层源码



3、它将我们传入的线程封装为一个worker对象。将线程任务firskTask赋值给firstTask属性,将worker对象赋值给thread属性。(对应的getThreadFactory方法就是调用我们初始化线程池的时候的线程工厂去创建)

浅谈ThreadPoolExecutor线程池底层源码



4、回到第二步的代码中,除了将我们的线程任务添加到workers集合中,还调用了t变量的start方法。t变量就是第三步的thread变量,即使我们的worker对象



5、调用worker对象的start方法。由于Worker实现了Runnable接口,所以start方法对应的就是调用Runnable接口的run方法

浅谈ThreadPoolExecutor线程池底层源码



6、然后调用对应的runWorker方法。在该方法中,拿到worker对象的firstTask属性,即我们前文execute方法的参数,也即是我们的线程任务。然后调用线程任务的run 方法

浅谈ThreadPoolExecutor线程池底层源码



简单理一下这个流程,我们可以得到如下结论:

  1. 当我们使用线程池执行线程任务的时候,我们执行线程的时候调用的使run方法,并非调用start方法的形式来开启多线程
  2. 线程池将我们传入的线程任务封装成一个worker对象,在这个过程中是有调用这个封装的worker对象的start方法





三、线程出现异常,后续流程源码

1、还是以execute方法为入口,然后直接跳转到上面第二小节的第六点,进入runWorker方法。run方法发生异常,会进入进入后面的processWorkerExit方法。传入的参数w是worker对象

浅谈ThreadPoolExecutor线程池底层源码



2、在该方法中,首先将worker对象从workers队列中移除(前面有说到workers这个队列中存放的是所有任务),然后添加一个firstTask为null的任务。这里又会调用addWorker方法,但是由于firstTask是null,所以就没有对应的线程任务可以执行

浅谈ThreadPoolExecutor线程池底层源码



总结:即如果使用线程池在执行多个线程任务的时候,其中一个线程发生异常。那么线程池会捕获这个发生异常的线程,然后将这个线程从线程任务队列中移除(workers)。再添加一个任务为null的线程任务,再执行这个null任务。也可以变相的理解为我们要执行的线程任务,被丢弃了。





四、拒绝策略的源码

我相信你在看了源码之后,就能够很简单的记住,它们每种拒绝策略的作用

使用拒绝策略的时候会调用这个reject方法,对应的handler在初始化线程池的时候会传入具体的实现

浅谈ThreadPoolExecutor线程池底层源码



AbortPolicy:直接抛出异常

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}



CallerRunsPolicy:交由调用它的线程执行

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}



DiscardOldestPolicy:弹出队列(poll)中的其他线程任务

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}



DiscardPolicy:什么操作都不执行

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}



自定义拒绝策略

自定义拒绝策略只需要实现RejectedExecutionHandler接口,再重写rejectedExecution方法即可。我们可以将任务写到类似mq的中间件,或者持久化到数据库。再编写一个线程去不断监控线程池的队列情况,如果发现队列中任务下降到一定的阈值,那么我们就可以读取之前任务,将他们再次存放到队列中。

上一篇:死磕并发编程第五篇


下一篇:Java 多线程的一次整理