线程池

什么是线程池?


线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

为什么要用线程池?

Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来 3个好处。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整 T1,T3 时间的技术,从而提高服务器程序性能的。它把 T1,T3 分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有 T1,T3 的开销了。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

假设一个服务器一天要处理 50000 个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为 50000。

一般线程池大小是远小于 50000。所以利用线程池的服务器程序不会为了创建50000 而在处理请求时浪费时间,从而提高效率。 

ThreadPoolExecutor 的类关系

Executor 是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来。

ExecutorService 接口继承了 Executor,在其上做了一些 shutdown()、submit()的扩展,可以说是真正的线程池接口;

AbstractExecutorService 抽象类实现了 ExecutorService 接口中的大部分方法:

    • ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
    • ScheduledExecutorService 接口继承了 ExecutorService 接口,提供了带"周期执行"功能 ExecutorService;
    • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。
    • ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。

线程池的创建各个参数含义

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,longkeepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory
threadFactory,RejectedExecutionHandler handler)

corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize;

如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;

如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize

keepAliveTime

线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于 corePoolSize 时才有用

TimeUnit

keepAliveTime 的时间单位

workQueue

workQueue 必须是 BlockingQueue 阻塞队列。当线程池中的线程数超过它的corePoolSize 的时候,线程会进入阻塞队列进行阻塞等待。通过 workQueue,线程池实现了阻塞功能

workQueue

用于保存等待执行的任务的阻塞队列,一般来说,我们应该尽量使用有界队列,因为使用***队列作为工作队列会对线程池带来如下影响。

    • 1)当线程池中的线程数达到 corePoolSize 后,新任务将在***队列中等待,因此线程池中的线程数不会超过 corePoolSize。
    • 2)由于 1,使用***队列时 maximumPoolSize 将是一个无效参数。
    • 3)由于 1 和 2,使用***队列时 keepAliveTime 将是一个无效参数。
    • 4)更重要的,使用*** queue 可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。

所以我们一般会使用,ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue。

threadFactory

创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加*的对线程做更多的设置,比如设置所有的线程为守护线程。

Executors 静态工厂里默认的 threadFactory,线程的命名规则是“pool-数字-thread-数字”。

参见以下代码:

import cn.enjoyedu.tools.SleepTools;import java.util.Random;import java.util.concurrent.*;/**
 *类说明:自定义线程池中线程的创建方式 */public class ThreadPoolAdv {    static class Worker implements Runnable
    {        private String taskName;        private Random r = new Random();        public Worker(String taskName){            this.taskName = taskName;
        }        public String getName() {            return taskName;
        }

        @Override        public void run(){
            System.out.println(Thread.currentThread().getName()                    +" process the task : " + taskName);            try {
                TimeUnit.MILLISECONDS.sleep(r.nextInt(100)*5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = new ThreadPoolExecutor(2, 4, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue(10),                //TODO
                new ThreadPoolExecutor.DiscardOldestPolicy());        for (int i = 0; i <= 6; i++) {
            Worker worker = new Worker("worker " + i);
            System.out.println("A new task has been added : " + worker.getName());
            threadPool.execute(worker);
        }

    }
}

RejectedExecutionHandler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:

(1)AbortPolicy:直接抛出异常,默认策略;

(2)CallerRunsPolicy:用调用者所在的线程来执行任务;

(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

(4)DiscardPolicy:直接丢弃任务;

当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

如何正确的定义线程池的大小?

定义线程池的大小目前没有特别规范的......

但是可以从任务特性方面考虑,进而合理的分配线程池的大小

    • 若CPU密集时间和IO密集型时间相差不大---推荐拆分
    • 若CPU密集时间(1s),IO密集型时间(10min),相差太大---不推荐拆分---未知---未测---不敢妄言
    • 涉及到磁盘、网络等
    • 处理计算或者逻辑封装数据
    • CPU的密集型:不超过(机器CPU核心数+1)
    • IO密集型:(机器CPU核心数*2)(推荐)  netty的线程池的就是(机器CPU核心数*2)    当然,若是(机器CPU核心数*2)时,还很空闲,可以继续(机器CPU核心数*2,*3,*4等)
    • 混合型(既有CPU的密集型,又有IO密集型)

  CPU核心数=Runtime.getRuntime().availableProcessors();

线程池使用步骤及代码示例

使用步骤

    • ①创建线程池对象
    • ②创建Runable或Callable实现类实例
    • ③提交Runable或Callable实现类实例
    • ④关闭线程池

代码示例

public ListpackageStrings(Listlist) throws InterruptedException, ExecutionException{        // 开始时间
        long start = System.currentTimeMillis();        // 创建一个大小等于CPU核心数的线程池
        ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());        // 定义FutureTask任务集合
        try {
            List<Future> futures = new ArrayList<Future>();            for (final String str : list) {
                Futurefuture = exec.submit(new Callable() {
                    @Override                    public String call() throws Exception {                        // 封装处理数据
                        return packageString(str);
                    }
                });
                futures.add(future);
            }
            list.clear();            for (Future future : futures) {
                list.add(future.get());
            } 
        } finally {
            exec.shutdown();//关闭空闲线程            // exec.shutdownNow();//关闭所有线程        }        //System.out.println(list.toString());
        System.err.println("\n执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");        return list;
   }

线程池的一些常用方法:

 1.newCachedThreadPool

public static ExecutorService newCachedThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中;

2.newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads):创建一个可重用的、具有固定线程数的线程池;

import java.util.Date;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;
public class Test {    public static void main(String[] args) {        //ExecutorService executorService = Executors.newSingleThreadExecutor();//线程池中只有一个线程
        ExecutorService executorService = Executors.newFixedThreadPool(2);//若该线程池中有两个线程
        executorService.submit(new TimeThread());
        executorService.submit(new CounterThread());
        executorService.shutdown();
    }
}
class CounterThread extends Thread{
    @Override    public void run() {        for(int i=1;i<4;i++) {
            System.out.println(i);            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }    
}class TimeThread extends Thread{
    @Override    public void run() {        for(int i=1;i<4;i++) {
            System.out.println(new Date());            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }    
}

相当于马厩里有两批马,此时刚好需要两匹马(只有两个线程处于就绪状态),两匹马一起被用,故线程交替输出

3.newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor():创建一个只有单线程的线程池,相当于调用newFixedThreadPool(int nThreads)方法时传入参数为1。

相当于马厩里有一批马,但是此时需要两匹马(有两个线程处于就绪状态),只有一个线程能用马,谁的优先权高谁获得CPU的资源执行程序,等到这个线程把所有的程序都执行完了才把马还回来,马厩里有空马,另一个线程才能用这匹马

4.newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在指定延迟后执行线程任务。 corePoolSize指池中所保存的线程数,即使线程时空闲的也被保存在线程池内。

Ps:此方法创建的线程池会吞异常:若是在线程内不try-catry则不会抛异常

5.newSingleThreadScheduledExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。

Ps:此方法创建的线程池会吞异常:若是在线程内不try-catry则不会抛异常

6.newWorkStealingPool

public static ExecutorService newWorkStealingPool(int parallelism):创建持有足够的线程的线程池来支持给定的并行级别,该方法还会使用多个队列来减少竞争。

7.newWorkStealingPool

public static ExecutorService newWorkStealingPool():该方法是前一个方法的简化版本。

关于Executor及线程池图文逻辑

线程池的工作原理

线程池

Executor框架

线程池

Executor框架基本使用流程

 线程池

附:CompletionService 先到先得 谁先执行完,我就可以先拿到谁的执行结果

上一篇:Java线程池


下一篇:使用ExecutorService实现线程池