【Java多线程】线程池学习

Java线程池学习

众所周知,Java不仅提供了线程,也提供了线程池库给我们使用,那么今天来学学线程池的具体使用以及线程池基本实现原理分析。


ThreadPoolExecutor

ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

参数解释如下:

  • corePoolSize : 核心池大小,即线程的数量
  • maximumPoolSize : 线程池最大数量
  • keepAliveTime : 线程池中的线程的保活时间,超时后线程会退出直到线程池中的线程数量等于corePoolSize,如果allowCoreThreadTimeout为true,线程数量会降为0
  • workQueue:线程池采用的缓冲队列,常用的有:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue
  • handler:当线程池中线程数量达到maximumPoolSize时,仍有任务需要创建线程来完成,则handler采取相应的策略,默认的ThreadPoolExecutor.AbortPolicy(),即会抛弃该任务。

线程池中可能的几种状态:

  • 当线程数量小于corePoolSize时,任务来时会创建新的线程来处理,并把该线程加入线程队列中(实际上是一个hashset)(此步骤需要获取全局锁,ReentryLock)

  • 如果当前线程数量达到了corePoolSize,任务来时将任务加入BlockingQueue

  • 如果任务列队满了无法加入新的任务时,会创建新的线程(同样需要获取全局锁)

  • 如果线程池数量达到maximumPoolSize,并且任务队列已满,新的任务将被拒绝

注意:获取全局锁是一个非常影响性能的因素,所以线程池会尽量执行第二步,因为此步骤不需要获取全局锁。

各种任务队列(BlockingQueue)的区别

  • ArrayBlockingQueue: 基于数组的有界阻塞队列,FIFO

  • LinkedBlockingQueue:基于链表的*阻塞队列,FIFO,吞吐量高于ArrayBlockingQueue,Executors.newFixedThreadPool()使用了该队列

  • SynchronousQueue:一个不存储元素的队列,一进一出吞吐量高于LinkedBlockingQueue,Executors.newCachedThreadPool()使用了该队列

  • PriorityBlockingQueue:阻塞优先队列

RejectedExecutionHandler饱和拒绝策略

  • AbortPolicy:直接抛出异常
  • CallerRunsPolicy:只用调用者所在线程来运行任务
  • DiscardOldestPolicy:丢弃队列里最近的任务,并执行当前任务
  • DiscardPolicy:不处理,不丢弃

提交一个任务

两种方式:

threadPool.submit(new Runnable() {
@Override
public void run() {
//
}
});

上面是第一种,通过submit方法提交,该方法有返回值,返回类型是Future<?>类型,

threadPool.execute(new Runnable() {
@Override
public void run() {
//
}
});

上面是第二种,通过execute执行,无返回值。

关闭线程池

shutdown()和showdownNow()两种方法,都是调用了interruptIdleWorkers()方法去遍历线程池中的工作线程,然后去打断它们,代码如下:

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
//中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

不同的是shutdown是把线程的状态置为SHUTDOWN而shutdownNow是STOP。

调优

  • 尽量使用有界队列
  • 线程池中的线程数量应该根据场景来合理配置,例如CPU密集型的任务就应该配置尽量少的线程数量

Executors工具类

主要介绍工具类Executors创建的三种线程池:

ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService1 = Executors.newFixedThreadPool(10);
ExecutorService executorService2 = Executors.newCachedThreadPool();

FixedThreadPool

源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

可以看出core和max数量是一样的,因为使用的LinkedBlockingQueue是*阻塞队列,因此达到coreSize后不会继续创建线程而是阻塞在队列那了。

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

只创建一个线程的线程池。

CachedThreadPool

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

使用了SynchronousQueue这个没有容量的阻塞队列,线程池的coreSize为0,maxSize为无限大,所以当生产者提供任务的速度大于消费者处理任务的时候,可能会无线创建线程,从而导致系统资源(CPU,内存等)耗竭。

上一篇:DocumentEvent事件的应用


下一篇:nginx配置文件内容详情及基本属性配置