线程池任务调度流程
线程池构造方法一览
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
相关参数解析
/** * * 核心线程数,如果没有设置allowCoreThreadTimeOut,线程一定存活(keep alive) * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. */ private volatile int corePoolSize; /** * 最大线程数 (但实际的最大值取决于/受限于CAPACITY) * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. */ private volatile int maximumPoolSize; /** * 等待任务的闲置线程的超时时间(纳秒) * 什么情况下使用? * 1.有超过corePoolSize的线程存在时; * 2.allowCoreThreadTimeOut的值为true即允许核心线程超时 * 其他情况下,线程会永远保持等待新任务的存活状态,即keep alive * Timeout in nanoseconds for idle threads waiting for work. * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. */ private volatile long keepAliveTime; /** * keepAliveTime的时间单位 * the time unit for the {@code keepAliveTime} argument */ TimeUnit unit; /** * 用于持有任务并将任务切换给工作线程的队列 * 当workQueue未满而有新任务进来时,会先将新任务放进这个队列。 * 当workQueue已满且有新任务进来时,才会去创建大于corePoolSize而小于maximumPoolSize的这部分线程; * 当workQueue已满且线程数已达maximumPoolSize,再有新任务进来,就涉及到拒绝策略了。后面会讲到 * 看一下上面构造方法里的注释"This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method." * 它仅仅用来存放被execute方法提交的Runnable任务 * The queue used for holding tasks and handing off to worker * threads. We do not require that workQueue.poll() returning * null necessarily means that workQueue.isEmpty(), so rely * solely on isEmpty to see if the queue is empty (which we must * do for example when deciding whether to transition from * SHUTDOWN to TIDYING). This accommodates special-purpose * queues such as DelayQueues for which poll() is allowed to * return null even if it may later return non-null when delays * expire. */ private final BlockingQueue<Runnable> workQueue; /** * 创建新线程的工厂 * Factory for new threads. All threads are created using this * factory (via method addWorker). All callers must be prepared * for addWorker to fail, which may reflect a system or user‘s * policy limiting the number of threads. Even though it is not * treated as an error, failure to create threads may result in * new tasks being rejected or existing ones remaining stuck in * the queue. * * We go further and preserve pool invariants even in the face of * errors such as OutOfMemoryError, that might be thrown while * trying to create threads. Such errors are rather common due to * the need to allocate a native stack in Thread.start, and users * will want to perform clean pool shutdown to clean up. There * will likely be enough memory available for the cleanup code to * complete without encountering yet another OutOfMemoryError. */ private volatile ThreadFactory threadFactory; /** * 拒绝策略。执行中遇到饱和或关闭情况调用的处理器 * Handler called when saturated or shutdown in execute. */ private volatile RejectedExecutionHandler handler;
拒绝策略拓展
/** * 1.中止策略。拒绝任务,抛出RejectedExecutionException,这也是默认拒绝策略 * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */ public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 从这里可以看到,抛RejectedExecutionException异常 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } /** * 默认拒绝策略就是AbortPolicy(中止策略,拒绝任务) * The default rejected execution handler */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); /** * 直接在调用execute方法的线程中运行被拒绝的任务 * 除非执行器已被关闭,这种情况下任务会被丢弃。 * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. */ public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller‘s thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 从这里可以看到,如果线程池执行器未关闭,直接在调用者线程执行该任务 if (!e.isShutdown()) { r.run(); } } } /** * 丢弃最老的尚未处理的请求,然后执行新任务 * 除非执行器已被关闭,这种情况下任务会被丢弃 * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */ public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 如果线程池执行器未关闭,丢弃最老的任务,然后执行新任务 if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } } /** * 静静地丢弃任务,不做任何处理 * 注:默认的AbortPolicy会抛RejectedExecutionException异常 * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 可以看到,此处啥也没做。 } }
- AbortPolicy : 直接抛异常处理,这是默认的拒绝策略
- DiscardPolicy : 直接抛弃(任务)不处理
- DiscardOldestPolicy : 丢弃队列中最老的任务
- CallerRunsPolicy : 将任务分配给当前执行execute方法的线程来处理
参数部分的补充
关于参数allowCoreThreadTimeOut:
/** * 这个参数跟corePoolSize相关 * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. */ private volatile boolean allowCoreThreadTimeOut;
核心线程数怎么设置?有什么评估标准不?
假设机器有N个CPU,那么对于计算密集型的任务,应该设置线程数为N+1;对于IO密集型的任务,应该设置线程数为2N;对于同时有计算工作和IO工作的任务,应该考虑使用两个线程池,一个处理计算任务,一个处理IO任务,分别对两个线程池按照计算密集型和IO密集型来设置线程数。
CPU(计算)密集型举例:
我们可以把任务分为计算密集型和IO密集型。
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
线程池为何不允许使用Executors创建?
如上图,出自《Java开发手册嵩山版》
FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
CachedThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
ThreadPoolExecutor的执行流程
一个线程池中的线程异常了,那么线程池会怎么处理这个线程?
需要说明一下,文中讨论的线程池都是Executors线程池。
当一个线程池里面的线程异常后:
当执行方式是execute时,可以看到堆栈异常的输出。
当执行方式是submit时,堆栈异常没有输出。但是调用Future.get()方法时,可以捕获到异常。
不会影响线程池里面其他线程的正常执行。
线程池会把这个线程移除掉,并创建一个新的线程放到线程池中。
不要背答案,要理解,要深入,上面说完后记得在问问面试官,需要我从源码的角度讲一讲吗?这逼装的,礼貌而不失风度。