线程池小结1

Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command).
ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法。
AbstractExecutorService:实现了ExecutorService和Executor接口的基本方法。
ThreadPoolExecuter:线程池
Executors:提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
ThreadFactory:负责生产线程的线程工厂。是一个接口类,而且方法只有一个。就是创建一个线程。为了统一在创建线程时设置一些参数,是否守护线程,优先级等。通过这个 TreadFactory 创建出来的线程能保证有相同的特性。

线程池小结1

线程池小结1

ThreadPoolExecutor 线程池参数

 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked

corePoolSize

线程池核心线程数,没有任务时,核心线程也会存活。设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭。

maximumPoolSize

线程池最大数。当线程数>= corePoolSize ,且任务队列已满时。线程池会创建新线程来处理任务,当线程数= maximumPoolSize ,且任务队列已满时,线程池会拒绝处理任务而抛出异常。

keepAliveTime

空闲线程存活时间,当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize。如果 allowCoreThreadTimeout=true,则会直到线程数量=0。

unit

时间单位。

workQueue

线程池所使用的缓冲队列,当核心线程数达到最大时,新任务会放在队列中排队等待执行。

handler

线程池对拒绝任务的处理策略。

自建线程池

public class ThreadPoolExecutor_Demo {
    public static void main(String[]args)throws InterruptedException{
        ThreadPoolExecutor executorService = (ThreadPoolExecutor)CreateThreadPoolExecutor(2,8);
        for (int i=0;i<8;i++){
            executorService.execute(()->{
                try {
                    TimeUnit.SECONDS.sleep(2);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+" :开跑了!");
            });
        }
        System.out.println("活跃的线程: "+executorService.getActiveCount());
        System.out.println("核心线程: "+executorService.getCorePoolSize());
        System.out.println("等待者:"+executorService.getQueue().size());
        System.out.println("最大线程量: "+executorService.getMaximumPoolSize());
        //线程池拒接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行。
        // executorService.shutdownNow();
        // 线程池拒接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。
        TimeUnit.SECONDS.sleep(20);
        executorService.shutdown();
        // 这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。
        // 一般情况下会和shutdown方法组合使用。
        boolean boole = executorService.awaitTermination(3,TimeUnit.SECONDS);
		System.out.println("线程池是否正在关闭: "+executorService.isShutdown());
        System.out.println("线程池是否终止: "+executorService.isTerminated());
        System.out.println("线程池是否正在终止: "+((ThreadPoolExecutor)executorService).isTerminating());
        System.out.println("线程池是否关闭: "+boole);
    }
    private static ExecutorService CreateThreadPoolExecutor(int corePoolSize,int maxmumPoolSize){
        ExecutorService executorService = new ThreadPoolExecutor(corePoolSize,maxmumPoolSize,30,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
        r -> {Thread t = new Thread(r);
        return t;
        },new ThreadPoolExecutor.AbortPolicy());
        System.out.println("这个 ThreadPoolExecutor 创建完成");
        return executorService;
    }
}

四大拒绝策略

public class ExecutorServicer_Policy {
    public static void main(String[]args)throws InterruptedException{
       //  test_AbortPolicy();
       //  test_DiscardPolicy();
       //  test_DiscardOldestPolicy();
       //  test_CallerRunsPolicy();
    }
    private static void test_AbortPolicy() throws InterruptedException{
        // AbortPolicy:为java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
        ExecutorService executorService = new ThreadPoolExecutor(1,2,5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                r -> {Thread t=new Thread(r);
                    return t;
                },new ThreadPoolExecutor.AbortPolicy());
                //多开几个
                for (int i=0;i<8;i++){
                    try {
                        executorService.execute(()-> {
                            asleep(3);
                            System.out.println(Thread.currentThread().getName() + "开始了");
                        });
                    }
                    catch (Exception e){
                        System.out.println("线程量多了,报异常了。");
                    }
                }
        }
    private static void test_DiscardPolicy() throws InterruptedException{
        // DiscardPolicy:直接抛弃,任务不执行,空方法
        ExecutorService executorService = new ThreadPoolExecutor(1,2,5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                r -> {Thread t=new Thread(r);
                    return t;
                },new ThreadPoolExecutor.DiscardPolicy());

        for (int i=0;i<8;i++){
            executorService.execute(()->{
                asleep(5);
                System.out.println(Thread.currentThread().getName()+"开始");
            });
        }
    }
    private static void test_DiscardOldestPolicy() throws InterruptedException{
        //DiscardOldestPolicy:从队列里面抛弃 head 的一个任务,并再次execute 此task。
        ExecutorService executorService = new ThreadPoolExecutor(1,2,5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                r -> {Thread t=new Thread(r);
                    return t;
                },new ThreadPoolExecutor.DiscardOldestPolicy());
        for (int i=0;i<8;i++){
            executorService.execute(()->{
                asleep(5);
                System.out.println(Thread.currentThread().getName()+"开始");
            });
        }
    }
    private static void test_CallerRunsPolicy() throws InterruptedException{
        //CallerRunsPolicy:在调用execute的线程里面执行此command,会阻塞入口
        ExecutorService executorService = new ThreadPoolExecutor(1,2,5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                r -> {Thread t=new Thread(r);
                    return t;
                },new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i=0;i<8;i++){
            executorService.execute(()->{
                asleep(5);
                System.out.println(Thread.currentThread().getName()+"开始");
            });
        }
    }
    public static void asleep(int i){
        try {
            TimeUnit.SECONDS.sleep(i);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

 

上一篇:java – 如何将CompletableFuture.supplyAsync与PriorityBlockingQueue一起使用?


下一篇:java – 无法停止使用ExecutorService启动的任务