Java线程池ThreadPoolExecutor源码浅析

一、UML图

Java线程池ThreadPoolExecutor源码浅析

二、创建线程池

2.1、Executors工厂方法

在ThreadPoolExecutor类的文档注释中有这么一句话:An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.

也就是说,已经提供了通过Executors获取线程池的工厂方法,也就是常说的五种线程池:

Java线程池ThreadPoolExecutor源码浅析

Executors中的方法:

/**
 * @since 1.5
 * @author Doug Lea
 */
public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }
    
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1));
    }
}

可以看到,这些方法最终都是实例化一个ThreadPoolExecutor对象。

2.2、ThreadPoolExecutor构造器

ThreadPoolExecutor类中一共提供了四个构造方法:

Java线程池ThreadPoolExecutor源码浅析

这几个方法只有入参不同而已,也就是说,我们可以根据所需要的自定义线程池配置,选择合适的方法;而前面三个最终都是调用第四个方法实例化ThreadPoolExecutor的对象。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

三、线程池构造参数

通过构造方法public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 我们可以知晓线程池可自定义的参数。

通过API可知,各参数意义如下:

3.1、corePoolSize—核心池容量

用于指定线程池中处于就绪状态的线程最小数,也就是说,当没有任务提交时,线程池中保持多少个工作线程。如果设置了工作线程的超时时间,则此值可能为0。注:当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize。

/**
 * Core pool size is the minimum number of workers to keep alive
 * (and not allow to time out etc) unless allowCoreThreadTimeOut
 * is set, in which case the minimum is zero.
 */
private volatile int corePoolSize;

3.2、maximumPoolSize—最大池容量

用于指定线程池可容纳的最大工作线程数,线程池根据corePoolSize和maximumPoolSize来自动调节容量大小:

A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by corePoolSize and maximumPoolSize.

/**
 * Maximum pool size. Note that the actual maximum is internally
 * bounded by CAPACITY.
 */
private volatile int maximumPoolSize;

3.3、keepAliveTime—存活时间

指定空闲线程等待的超时时间(默认是纳秒)。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用。如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

/**
 * Timeout in nanoseconds for idle threads waiting for work.
 * Threads use this timeout when there are more than corePoolSize
 * present or if allowCoreThreadTimeOut. Otherwise they wait
 * forever for new work.
 */
private volatile long keepAliveTime;

3.4、TimeUnit—时间单位

指定keepAliveTime的单位,默认使用纳秒。TimeUnit是一个枚举,定义的值如下:

TimeUnit.NANOSECONDS;       //纳秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.SECONDS;           //秒
TimeUnit.MINUTES;           //分钟
TimeUnit.HOURS;             //小时
TimeUnit.DAYS;              //天

public enum TimeUnit {
    /**
     * Time unit representing one thousandth of a microsecond
     */
    NANOSECONDS {
        public long toNanos(long d)   { return d; }
        public long toMicros(long d)  { return d/(C1/C0); }
        public long toMillis(long d)  { return d/(C2/C0); }
        public long toSeconds(long d) { return d/(C3/C0); }
        public long toMinutes(long d) { return d/(C4/C0); }
        public long toHours(long d)   { return d/(C5/C0); }
        public long toDays(long d)    { return d/(C6/C0); }
        public long convert(long d, TimeUnit u) { return u.toNanos(d); }
        int excessNanos(long d, long m) { return (int)(d - (m*C2)); }
    },
    ...
}

3.5、workQueue—工作队列

用于存储等待中的任务的队列。

/**
 * The queue used for holding tasks and handing off to worker
 * threads.  We do not require that workQueue.poll() returning
 * null necessarily means that workQueue.isEmpty(), so rely
 * solely on isEmpty to see if the queue is empty (which we must
 * do for example when deciding whether to transition from
 * SHUTDOWN to TIDYING).  This accommodates special-purpose
 * queues such as DelayQueues for which poll() is allowed to
 * return null even if it may later return non-null when delays
 * expire.
 */
private final BlockingQueue<Runnable> workQueue;

而BlockingQueue的实现类如下:

Java线程池ThreadPoolExecutor源码浅析

而Executors工厂方法中使用的是LinkedBlockingQueueDelayedWorkQueueSynchronousQueue

3.6、ThreadFactory—线程工厂

用于创建线程。

/**
 * Factory for new threads. All threads are created using this
 * factory (via method addWorker).  All callers must be prepared
 * for addWorker to fail, which may reflect a system or user's
 * policy limiting the number of threads.  Even though it is not
 * treated as an error, failure to create threads may result in
 * new tasks being rejected or existing ones remaining stuck in
 * the queue.
 *
 * We go further and preserve pool invariants even in the face of
 * errors such as OutOfMemoryError, that might be thrown while
 * trying to create threads.  Such errors are rather common due to
 * the need to allocate a native stack in Thread.start, and users
 * will want to perform clean pool shutdown to clean up.  There
 * will likely be enough memory available for the cleanup code to
 * complete without encountering yet another OutOfMemoryError.
 */
private volatile ThreadFactory threadFactory;

3.7、RejectedExecutionHandler—饱和策略

用于指定当线程池和工作队列都处于饱和状态时,新任务的处理策略。

/**
 * Handler called when saturated or shutdown in execute.
 */
private volatile RejectedExecutionHandler handler;
一共有四种策略:

Java线程池ThreadPoolExecutor源码浅析

  • AbortPolicy:终止策略

    通过抛出一个RejectedExecutionException异常而拒绝此任务。

**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {...}
  • CallerRunsPolicy:调用者执行策略

    直接让调用线程执行此任务,若调用线程已经停止,则丢弃此任务。

/**
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {...}
  • DiscardOldestPolicy:丢弃最旧任务策略

    丢弃处于等待队列头部的任务,并尝试执行此任务。

/**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {...}
  • DiscardPolicy:丢弃策略

    静默拒绝任务,不抛出异常。

/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {...}

四、提交任务

向线程池提交任务主要有两种方式

  1. execute(Runnable command)
  2. submit(Callable<T> task)submit(Runnable task, T result)submit(Runnable task)

4.1、execute方法

传入一个Runnable对象,无返回值。无法知晓任务是否执行成功。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        ...
    }
}

4.2、submit方法

传入一个Callable或Runnable对象,返回一个Future对象。通过返回的Future对象可判断让任务是否执行成功。可通过future的get方法来获取任务执行结果,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

public interface ExecutorService extends Executor {
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    ...
}

五、关闭线程池

ThreadPoolExecutor提供了两个关闭方法:shutdown()shutdownNow()

shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //检查终止权限
            checkShutdownAccess();
            //设置线程池状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中断线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //检查终止权限
            checkShutdownAccess();
            //设置线程池状态为STOP
            advanceRunState(STOP);
            //中断线程
            interruptWorkers();
            //获取待执行任务的列表
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
}
上一篇:Python 线程池


下一篇:Java线程池相关知识