多线程 JDK线程池详解

线程池详解

JDK类图

多线程 JDK线程池详解

Executor , ExecutorService , Executors 区别

Executor ExecutorService
Executor 是 Java 线程池的核心接口,用来并发执行提交的任务 ExecutorService 是 Executor 接口的扩展,提供了异步执行和关闭线程池的方法
提供execute()方法用来提交任务 提供submit()方法用来提交任务
execute()方法无返回值 submit()方法返回Future对象,可用来获取任务执行结果
不能取消任务 可以通过Future.cancel()取消pending中的任务
没有提供和关闭线程池有关的方法 提供了关闭线程池的方法
Executors 类提供工厂方法用来创建不同类型的线程池。比如: newSingleThreadExecutor() 创建一个只有一个线程的线程池,newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,newCachedThreadPool()可以根据需要创建新的线程,但如果已有线程是空闲的会重用已有线程

JDK线程池执行逻辑

多线程 JDK线程池详解
在实际开发中,我们最多使用的是ThreadPoolExecutor 自定义线程池,下面就其参数最多的构造方法简单说明:

public ThreadPoolExecutor(int corePoolSize,  
                              int maximumPoolSize,  
                              long keepAliveTime,  
                              TimeUnit unit,  
                              BlockingQueue<Runnable> workQueue,  
                              ThreadFactory threadFactory,  
                              RejectedExecutionHandler handler) {  
        if (corePoolSize < 0 ||  
            maximumPoolSize <= 0 ||  
            maximumPoolSize < corePoolSize ||  
            keepAliveTime < 0)  
            throw new IllegalArgumentException();  
        if (workQueue == null || threadFactory == null || handler == null)  
            throw new NullPointerException();  
        this.corePoolSize = corePoolSize;  
        this.maximumPoolSize = maximumPoolSize;  
        this.workQueue = workQueue;  
        this.keepAliveTime = unit.toNanos(keepAliveTime);  
        this.threadFactory = threadFactory;  
        this.handler = handler;  
    }  

corePoolSize 核心线程池大小
maximumPoolSize 最大线程池大小
keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
TimeUnit keepAliveTime时间单位
workQueue 阻塞任务队列
threadFactory 新建线程工厂
RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理

1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
多线程 JDK线程池详解

Executors 线程池配置方案及风险

Executors.newSingleThreadExecutor();//创建一个单核心的线程池
Executors.newScheduledThreadPool(10);//创建一个按照计划规定执行的线程池
Executors.newCachedThreadPool();//创建一个自动增长的线程池
Executors.newFixedThreadPool(4);//创建一个具有固定线程数的线程池
Executors.newWorkStealingPool();//创建一个具有抢占式操作的线程池

《阿里巴巴 Java开发手册》1.6并发处理

第3条规定:线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
第4条规定:线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式创建,这样的处理方式能让编写代码的攻城狮更加明确线程池的运行规则,规避资源耗尽(OOM)的风险

之所以会出现这样的规范,是因为jdk已经封装好的线程池存在潜在风险:

  • FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE ,会堆积大量请求OOM
  • CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量线程OOM

所以从系统安全角度出发,原则上都应该自己手动创建线程池
Executors 部分源码
1)SingleThreadExecutor线程池

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

可以看到使用的是ThreadPoolExecutor,并且将工作队列LinkedBlockingQueue,而LinkedBlockingQueue是*队列。
LinkedBlockingQueue是其实也算是一个有界队列,只是队列的长度为Integer.MAX_VALUE
因此SingleThreadExecutor线程池使用了*队列,当持续产生任务,来不及执行的时候,会不停的放入到工作队列(阻塞队列),由于*,最终会导致OOM,因此不推荐

2)ScheduledThreadPool线程池

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

其实DelayedWorkQueue采用的是数组实现,初始容量16,因此是一个有界队列。
虽然DelayedWorkQueue是有界队列,但由于最大线程数量设置成Integer.MAX_VALUE,CachedThreadPool线程池,可能因为阻塞队列满了,然后开启最大线程执行任务,而最大线程数量为Integer.MAX_VALUE,也可能会导致OOM异常。

3)CachedThreadPool线程池

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

与ScheduledThreadPool线程池一样,SynchronousQueue虽然不是*队列(SynchronousQueue内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素),但最大线程池数量设置成Integer.MAX_VALUE,可能会导致OOM异常

4)FixedThreadPool线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

与SingleThreadExecutor线程池一样,LinkedBlockingQueue是*队列,很容易出现OOM异常

JDK线程池拒绝策略

常见的几种拒绝策略

所有拒绝策略都实现了接口 RejectedExecutionHandler

public interface RejectedExecutionHandler {

    /**
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

这个接口只有一个 rejectedExecution 方法。
r 为待执行任务;executor 为线程池;方法可能会抛出拒绝异常。
1)AbortPolicy
此为线程池默认的拒绝策略,直接抛出拒绝异常(继承自RuntimeException),会中断调用者的处理过程,所以除非有明确需求,一般不推荐

    public static class AbortPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

2)CallerRunsPolicy
在调用者线程中,运行当前被丢弃的任务。
只会用调用者所在线程来运行任务,也就是说任务不会进入线程池。
如果线程池已经被关闭,则直接丢弃该任务。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

3)DiscardOledestPolicy
丢弃队列中最老的,然后再次尝试提交新任务

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

这里 e.getQueue() 是获得待执行的任务队列,也就是前面提到的待业队列。
因为是队列,所以先进先出,一个poll()方法就能直接把队列中最老的抛弃掉,再次尝试执行execute®。

4)DiscardPolicy
丢弃无法加载的任务

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

自定义拒绝策略

场景:如果我们在执行异步任务时,当线程池队列已满且达到最大线程数时,就会触发线程池的拒绝策略(AbortPolicy);此时被拒绝的任务就会无法被执行,影响业务结果;此时就需要自定义线程池拒绝策略,如果将任务冲入加入队列,待线程池有空闲资源时可再执行
demo:

public class CustomThreadPoolExecutor {

	private ThreadPoolExecutor pool = null;

	/**
	 * 线程池初始化方法
	 * 
	 * corePoolSize 核心线程池大小----1 maximumPoolSize 最大线程池大小----3 keepAliveTime
	 * 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit TimeUnit
	 * keepAliveTime时间单位----TimeUnit.MINUTES workQueue 阻塞队列----new
	 * ArrayBlockingQueue<Runnable>(5)====5容量的阻塞队列 threadFactory 新建线程工厂----new
	 * CustomThreadFactory()====定制的线程工厂 rejectedExecutionHandler
	 * 当提交任务数超过maxmumPoolSize+workQueue之和时,
	 * 即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)), 任务会交给RejectedExecutionHandler来处理
	 */

	public void init() {
		pool = new ThreadPoolExecutor(10, 30, 30, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(10),
				new CustomThreadFactory(), new CustomRejectedExecutionHandler());
	}

	public void destory() {
		if (pool != null) {
			pool.shutdownNow();
		}
	}

	public ExecutorService getCustomThreadPoolExecutor() {
		return this.pool;
	}

	private class CustomThreadFactory implements ThreadFactory {

		private AtomicInteger count = new AtomicInteger(0);

		@Override
		public Thread newThread(Runnable r) {
			Thread t = new Thread(r);
			String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
			System.out.println(threadName);
			t.setName(threadName);
			return t;
		}
	}

	private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

		@Override
		public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
			// 记录异常
			// 报警处理等
//			try {
//				executor.getQueue().put(r);
//			} catch (InterruptedException e) {
//				e.printStackTrace();
//			}
			System.out.println("error.............");
		}
	}

	// 测试构造的线程池
	public static void main(String[] args) {
		CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
		// 1.初始化
		exec.init();

		ExecutorService pool = exec.getCustomThreadPoolExecutor();
		for (int i = 1; i < 100; i++) {
			System.out.println("提交第" + i + "个任务!");
			pool.execute(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(3000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("running=====");
				}
			});
		}

		// 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
		// exec.destory();

		try {
			Thread.sleep(10000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

注意:41以后提交的任务就不能正常处理了,因为,execute中提交到任务队列是用的offer方法,如上面代码,这个方法是非阻塞的,所以就会交给CustomRejectedExecutionHandler
来处理,所以对于大数据量的任务来说,这种线程池,如果不设置队列长度会OOM,设置队列长度,会有任务得不到处理,接下来我们构建一个阻塞的自定义线程池

查看ThreadPoolExecutor 源码execute 方法实现

所以解决此问题,可以自定义拒绝策略
多线程 JDK线程池详解

当提交任务被拒绝时,进入拒绝机制,我们实现拒绝方法,把任务重新用阻塞提交方法put提交,实现阻塞提交任务功能,防止队列过大,OOM,提交被拒绝方法在下面
总结:
1、用ThreadPoolExecutor自定义线程池,看线程是的用途,如果任务量不大,可以用*队列,如果任务量非常大,要用有界队列,防止OOM
2、如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务
3、最大线程数一般设为2N+1最好,N是CPU核数
4、核心线程数,看应用,如果是任务,一天跑一次,设置为0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数
5、如果要获取任务执行结果,用CompletionService,但是注意,获取任务的结果的要重新开一个线程获取,如果在主线程获取,就要等任务都提交后才获取,就会阻塞大量任务结果,队列过大OOM,所以最好异步开个线程获取结果

上一篇:Java线程池实现原理及其在美团业务中的实践


下一篇:ThreadPoolExecutor线程池详解