Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command).
ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法。
AbstractExecutorService:实现了ExecutorService和Executor接口的基本方法。
ThreadPoolExecuter:线程池
Executors:提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
ThreadFactory:负责生产线程的线程工厂。是一个接口类,而且方法只有一个。就是创建一个线程。为了统一在创建线程时设置一些参数,是否守护线程,优先级等。通过这个 TreadFactory 创建出来的线程能保证有相同的特性。
ThreadPoolExecutor 线程池参数
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
corePoolSize
线程池核心线程数,没有任务时,核心线程也会存活。设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭。
maximumPoolSize
线程池最大数。当线程数>= corePoolSize ,且任务队列已满时。线程池会创建新线程来处理任务,当线程数= maximumPoolSize ,且任务队列已满时,线程池会拒绝处理任务而抛出异常。
keepAliveTime
空闲线程存活时间,当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize。如果 allowCoreThreadTimeout=true,则会直到线程数量=0。
unit
时间单位。
workQueue
线程池所使用的缓冲队列,当核心线程数达到最大时,新任务会放在队列中排队等待执行。
handler
线程池对拒绝任务的处理策略。
自建线程池
public class ThreadPoolExecutor_Demo {
public static void main(String[]args)throws InterruptedException{
ThreadPoolExecutor executorService = (ThreadPoolExecutor)CreateThreadPoolExecutor(2,8);
for (int i=0;i<8;i++){
executorService.execute(()->{
try {
TimeUnit.SECONDS.sleep(2);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" :开跑了!");
});
}
System.out.println("活跃的线程: "+executorService.getActiveCount());
System.out.println("核心线程: "+executorService.getCorePoolSize());
System.out.println("等待者:"+executorService.getQueue().size());
System.out.println("最大线程量: "+executorService.getMaximumPoolSize());
//线程池拒接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行。
// executorService.shutdownNow();
// 线程池拒接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。
TimeUnit.SECONDS.sleep(20);
executorService.shutdown();
// 这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。
// 一般情况下会和shutdown方法组合使用。
boolean boole = executorService.awaitTermination(3,TimeUnit.SECONDS);
System.out.println("线程池是否正在关闭: "+executorService.isShutdown());
System.out.println("线程池是否终止: "+executorService.isTerminated());
System.out.println("线程池是否正在终止: "+((ThreadPoolExecutor)executorService).isTerminating());
System.out.println("线程池是否关闭: "+boole);
}
private static ExecutorService CreateThreadPoolExecutor(int corePoolSize,int maxmumPoolSize){
ExecutorService executorService = new ThreadPoolExecutor(corePoolSize,maxmumPoolSize,30,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
r -> {Thread t = new Thread(r);
return t;
},new ThreadPoolExecutor.AbortPolicy());
System.out.println("这个 ThreadPoolExecutor 创建完成");
return executorService;
}
}
四大拒绝策略
public class ExecutorServicer_Policy {
public static void main(String[]args)throws InterruptedException{
// test_AbortPolicy();
// test_DiscardPolicy();
// test_DiscardOldestPolicy();
// test_CallerRunsPolicy();
}
private static void test_AbortPolicy() throws InterruptedException{
// AbortPolicy:为java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
ExecutorService executorService = new ThreadPoolExecutor(1,2,5, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
r -> {Thread t=new Thread(r);
return t;
},new ThreadPoolExecutor.AbortPolicy());
//多开几个
for (int i=0;i<8;i++){
try {
executorService.execute(()-> {
asleep(3);
System.out.println(Thread.currentThread().getName() + "开始了");
});
}
catch (Exception e){
System.out.println("线程量多了,报异常了。");
}
}
}
private static void test_DiscardPolicy() throws InterruptedException{
// DiscardPolicy:直接抛弃,任务不执行,空方法
ExecutorService executorService = new ThreadPoolExecutor(1,2,5, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
r -> {Thread t=new Thread(r);
return t;
},new ThreadPoolExecutor.DiscardPolicy());
for (int i=0;i<8;i++){
executorService.execute(()->{
asleep(5);
System.out.println(Thread.currentThread().getName()+"开始");
});
}
}
private static void test_DiscardOldestPolicy() throws InterruptedException{
//DiscardOldestPolicy:从队列里面抛弃 head 的一个任务,并再次execute 此task。
ExecutorService executorService = new ThreadPoolExecutor(1,2,5, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
r -> {Thread t=new Thread(r);
return t;
},new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i=0;i<8;i++){
executorService.execute(()->{
asleep(5);
System.out.println(Thread.currentThread().getName()+"开始");
});
}
}
private static void test_CallerRunsPolicy() throws InterruptedException{
//CallerRunsPolicy:在调用execute的线程里面执行此command,会阻塞入口
ExecutorService executorService = new ThreadPoolExecutor(1,2,5, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
r -> {Thread t=new Thread(r);
return t;
},new ThreadPoolExecutor.CallerRunsPolicy());
for (int i=0;i<8;i++){
executorService.execute(()->{
asleep(5);
System.out.println(Thread.currentThread().getName()+"开始");
});
}
}
public static void asleep(int i){
try {
TimeUnit.SECONDS.sleep(i);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}