Java的一大优势是能完成多线程任务,对线程的封装和调度非常好,那么它又是如何实现的呢?
jdk的包下和线程相关类的类图。
从上面可以看出Java的线程池主的实现类主要有两个类ThreadPoolExecutor
和ForkJoinPool
。
ForkJoinPool
是Fork/Join
框架下使用的一个线程池,一般情况下,我们使用的比较多的就是ThreadPoolExecutor
。我们大多数时候创建线程池是通过Executors
类的几个方法实现的:
- newFixedThreadPool():创建一个固定线程数的线程池,可控制线程最大并发数,适用需要限制线程池数量的应用场景。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- newSingleThreadExecutor():创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO)执行适用于那种需要按照线程数量执行的场景。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- newCachedThreadPool():创建一个可以根据需要创建新线程的线程池,它是没有线程数量限制的,适用于短期异步任务的操作,或者是负载比较轻的服务器。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- newScheduledThreadPool():创建一个固定线程数的线程池,支持定时及周期性执行后台任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
这里看到基本上这几个方法都是返回了ThreadPoolExecutor
这个对象。以下是ThreadPoolExecutor
的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
几个参数的含义:
- corePoolSize(线程池基本大小):当提交一个新任务到线程池,线程会创建一个新的线程来执行任务,无论其他的基本线程是否是空闲的状态。这种情况会持续到当需要执行的任务数量大于线程池基本线程数量大小时就不再创建了。
- maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数量,如果队列满了,并且已经创建的线程数小于最大线程数量,这个线程池会再创建新的线程执行任务。
- KeepAliveTime(线程保持活动的时间):线程空闲之后保持存活的时间。
- TimeUnit(线程保持活动时间的单位):可以使用TimeUnit时间单位来设置。
- runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列,可以选择以下几个:
- ArrayBlockingQueue:基于数组的阻塞队列,按照FIFO原则进行排序
- LinkedBlockingQueue:基于链表的阻塞队列,按照FIFO原则对元素进行排序,吞吐量高于ArrayBlockingQueue。Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue:一个不储存元素的阻塞队列,每一个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处于阻塞状态。吞吐量高于LinkedBlockingQueue,Executors.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列
- RejectedExecutionHandler(饱和策略):这个本身是Java的一个接口,当队列和线程池都满了,需要一种策略处理新的任务,在这个类的最下部提供了四种内置的实现类:
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在的线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前的任务。
- DiscardPolicy:不处理,直接丢弃。
- 自定义策略:实现RejectedExecutionHandler接口,自定义策略。
- ThreadFactory(线程工厂):用于设置创建新的线程的工厂。可以使用guava的ThreadFactoryBuilder来创建一个ThreadFactory。
线程池用的最多的是execute()
,它的执行实际上分了三步:
- 当少量的线程在运行,线程的数量还没有达到
corePoolSize
,那么启用新的线程来执行新任务。 - 如果线程数量已经达到了
corePoolSize
,那么尝试把任务缓存起来,然后二次检查线程池的状态,看这个时候是否能添加一个额外的线程,来执行这个任务。如果这个检查到线程池关闭了,就拒绝任务。 - 如果我们没法缓存这个任务,那么我们就尝试去添加线程去执行这个任务,如果失败,可能任务已被取消或者任务队列已经饱和,就拒绝掉这个任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
线程池把每个线程都封装成一个对象Worker
,以上有一个关键的函数addWorker()
:
addWorker(Runnable firstTask, boolean core)
firstTask
代表这个线程池首先要执行的任务,core
代表是否使用corePoolSize
来做为线程池线程的最大标记。
以上就是对线程池的一个基本解析。