线程池相关参数、调度流程学习

线程池任务调度流程

线程池相关参数、调度流程学习


线程池相关参数、调度流程学习

线程池构造方法一览

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {}

相关参数解析

/**
 * 
 * 核心线程数,如果没有设置allowCoreThreadTimeOut,线程一定存活(keep alive)
 * Core pool size is the minimum number of workers to keep alive
 * (and not allow to time out etc) unless allowCoreThreadTimeOut
 * is set, in which case the minimum is zero.
 */
private volatile int corePoolSize;

/**
 * 最大线程数 (但实际的最大值取决于/受限于CAPACITY)
 * Maximum pool size. Note that the actual maximum is internally
 * bounded by CAPACITY.
 */
private volatile int maximumPoolSize;

/**
 * 等待任务的闲置线程的超时时间(纳秒)
 * 什么情况下使用?
 *  1.有超过corePoolSize的线程存在时;
 *  2.allowCoreThreadTimeOut的值为true即允许核心线程超时
 * 其他情况下,线程会永远保持等待新任务的存活状态,即keep alive
 * Timeout in nanoseconds for idle threads waiting for work.
 * Threads use this timeout when there are more than corePoolSize
 * present or if allowCoreThreadTimeOut. Otherwise they wait
 * forever for new work.
 */
private volatile long keepAliveTime;

/**
 * keepAliveTime的时间单位
 * the time unit for the {@code keepAliveTime} argument
 */
TimeUnit unit;

/**
 * 用于持有任务并将任务切换给工作线程的队列
 * 当workQueue未满而有新任务进来时,会先将新任务放进这个队列。
 * 当workQueue已满且有新任务进来时,才会去创建大于corePoolSize而小于maximumPoolSize的这部分线程;
 * 当workQueue已满且线程数已达maximumPoolSize,再有新任务进来,就涉及到拒绝策略了。后面会讲到
 * 看一下上面构造方法里的注释"This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method."
 * 它仅仅用来存放被execute方法提交的Runnable任务
 * The queue used for holding tasks and handing off to worker
 * threads.  We do not require that workQueue.poll() returning
 * null necessarily means that workQueue.isEmpty(), so rely
 * solely on isEmpty to see if the queue is empty (which we must
 * do for example when deciding whether to transition from
 * SHUTDOWN to TIDYING).  This accommodates special-purpose
 * queues such as DelayQueues for which poll() is allowed to
 * return null even if it may later return non-null when delays
 * expire.
 */
private final BlockingQueue<Runnable> workQueue;

/**
 * 创建新线程的工厂
 * Factory for new threads. All threads are created using this
 * factory (via method addWorker).  All callers must be prepared
 * for addWorker to fail, which may reflect a system or user‘s
 * policy limiting the number of threads.  Even though it is not
 * treated as an error, failure to create threads may result in
 * new tasks being rejected or existing ones remaining stuck in
 * the queue.
 *
 * We go further and preserve pool invariants even in the face of
 * errors such as OutOfMemoryError, that might be thrown while
 * trying to create threads.  Such errors are rather common due to
 * the need to allocate a native stack in Thread.start, and users
 * will want to perform clean pool shutdown to clean up.  There
 * will likely be enough memory available for the cleanup code to
 * complete without encountering yet another OutOfMemoryError.
 */
private volatile ThreadFactory threadFactory;

/**
 * 拒绝策略。执行中遇到饱和或关闭情况调用的处理器
 * Handler called when saturated or shutdown in execute.
 */
private volatile RejectedExecutionHandler handler;

拒绝策略拓展

一共4种拒绝策略,这里仅关注juc包下的内容。

线程池相关参数、调度流程学习

/**
 * 1.中止策略。拒绝任务,抛出RejectedExecutionException,这也是默认拒绝策略
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 从这里可以看到,抛RejectedExecutionException异常
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

/**
 * 默认拒绝策略就是AbortPolicy(中止策略,拒绝任务)
 * The default rejected execution handler
 */
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
    
/**
 * 直接在调用execute方法的线程中运行被拒绝的任务
 * 除非执行器已被关闭,这种情况下任务会被丢弃。
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller‘s thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 从这里可以看到,如果线程池执行器未关闭,直接在调用者线程执行该任务
        if (!e.isShutdown()) {
            r.run();
        }
    }
}   

/**
 * 丢弃最老的尚未处理的请求,然后执行新任务
 * 除非执行器已被关闭,这种情况下任务会被丢弃
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 如果线程池执行器未关闭,丢弃最老的任务,然后执行新任务
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
} 

/**
 * 静静地丢弃任务,不做任何处理
 * 注:默认的AbortPolicy会抛RejectedExecutionException异常
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 可以看到,此处啥也没做。
    }
}  
  • AbortPolicy : 直接抛异常处理,这是默认的拒绝策略
  • DiscardPolicy : 直接抛弃(任务)不处理
  • DiscardOldestPolicy : 丢弃队列中最老的任务
  • CallerRunsPolicy : 将任务分配给当前执行execute方法的线程来处理

参数部分的补充

关于参数allowCoreThreadTimeOut:

/**
 * 这个参数跟corePoolSize相关
 * If false (default), core threads stay alive even when idle.
 * If true, core threads use keepAliveTime to time out waiting
 * for work.
 */
private volatile boolean allowCoreThreadTimeOut;

核心线程数怎么设置?有什么评估标准不?

假设机器有N个CPU,那么对于计算密集型的任务,应该设置线程数为N+1;对于IO密集型的任务,应该设置线程数为2N;对于同时有计算工作和IO工作的任务,应该考虑使用两个线程池,一个处理计算任务,一个处理IO任务,分别对两个线程池按照计算密集型和IO密集型来设置线程数。

计算密集型:N+1
IO密集型:2*N

 

CPU(计算)密集型举例:
我们可以把任务分为计算密集型和IO密集型。
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

线程池为何不允许使用Executors创建?

线程池相关参数、调度流程学习

如上图,出自《Java开发手册嵩山版》
FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

CachedThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

ThreadPoolExecutor的执行流程

线程池相关参数、调度流程学习

一个线程池中的线程异常了,那么线程池会怎么处理这个线程?

需要说明一下,文中讨论的线程池都是Executors线程池。

当一个线程池里面的线程异常后:
当执行方式是execute时,可以看到堆栈异常的输出。
当执行方式是submit时,堆栈异常没有输出。但是调用Future.get()方法时,可以捕获到异常。
不会影响线程池里面其他线程的正常执行。
线程池会把这个线程移除掉,并创建一个新的线程放到线程池中。
不要背答案,要理解,要深入,上面说完后记得在问问面试官,需要我从源码的角度讲一讲吗?这逼装的,礼貌而不失风度。

 

参考资料

线程池相关参数、调度流程学习

上一篇:PhpWind 8.7中禁止后台管理员随意修改会员用户名功能


下一篇:Win11系统中如何取消粘滞键的操作方法