我们都知道创建一个线程可以继承Thread类或者实现Runnable接口,实际Thread类就是实现了Runnable接口。
到今天才明白后端线程的作用:我们可以开启线程去执行一些比较耗时的操作,类似于前台的ajax异步操作,比如说用户上传一个大的文件,我们可以获取到文件之后开启一个线程去操作该文件,但是可以提前将结果返回去,如果同步处理有可能太耗时,影响系统可用性。
1、new Thread的弊端
原生的开启线程执行异步任务的方式:
new Thread(new Runnable() { @Override
public void run() {
// TODO Auto-generated method stub
}
}).start();
弊端如下:
- 线程生命周期的开销非常高。创建线程都会需要时间,延迟处理的请求,并且需要JVM和操作系统提供一些辅助操作。
- 资源消耗。活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程数量多于可用处理器的数量,那么有些线程将会闲置。大量空闲的线程会占用许多内存,给GC带来压力,而且大量线程在竞争CPU资源时会产生其他的性能开销。
- 稳定性。在可创建线程的数量上存在一个限制,这个限制受多个因素的制约,包括JVM的启动参数、Thread构造函数中请求栈的大小以及底层操作系统的限制。如果破坏了这些限制,很可能抛出 outOfMemoryError异常。
也就是说在一定的范围内增加线程的数量可以提高系统的吞吐率,但是如果超出了这个范围,再创建更多的线程只会降低程序的执行效率甚至导致系统的崩溃。
例如:
使用线程池:可以了解线程池的用法以及线程池的正确的关闭方法:shutdown之后马上调用awaitTermination阻塞等待实现同步关闭。
package cn.qlq.thread.twenty; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; public class Demo1 {
private static ExecutorService executorService = Executors.newFixedThreadPool(20);
private static volatile AtomicInteger atomicInteger = new AtomicInteger(0); public static void main(String[] args) {
long startTime = System.currentTimeMillis();
for (int i = 0; i < 2000; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
atomicInteger.incrementAndGet();
}
});
}
executorService.shutdown();
try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() - startTime);
System.out.println(atomicInteger);
}
}
结果:
14
2000
package cn.qlq.thread.twenty; import java.util.concurrent.atomic.AtomicInteger; public class Demo2 {
private static volatile AtomicInteger atomicInteger = new AtomicInteger(0); public static void main(String[] args) {
long startTime = System.currentTimeMillis();
for (int i = 0; i < 2000; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
atomicInteger.incrementAndGet();
}
});
t.start();
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(System.currentTimeMillis() - startTime);
System.out.println(atomicInteger);
}
}
结果:
257
2000
不使用线程池话费的时间比使用线程池长了好几倍,也看出了效率问题。
2.核心类结构如下:
1、Executor是一个*接口,它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。
2、ExecutorService扩展了Executor。添加了一些用于生命周期管理的方法(同时还提供一些用于任务提交的便利方法
3、下面两个分支,AbstractExecutorService分支就是普通的线程池分支,ScheduledExecutorService是用来创建定时任务的。
3.Executor介绍
线程池简化了线程的管理工作。在Java类库中,任务执行的主要抽象不是Thread,而是Executor,如下:
public interface Executor {
void execute(Runnable command);
}
Executor是个简单的接口,它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视机制。
Executor基于"生产者-消费者"模式,提交任务的操作相当于生产者,执行任务的则相当于消费者。
生命周期:
Executor的实现通常会创建线程来执行任务。但JVM只有在所有非守护线程全部终止才会退出。因此,如果无法正确的关闭Executor,那么JVM将无法结束。ExecutorService扩展了Executor接口,添加了一些用于生命周期管理的方法(同时还提供一些用于任务提交的便利方法
package java.util.concurrent;
import java.util.List;
import java.util.Collection;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction; public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService的生命周期有三种:运行、关闭和已终止。ExecutorService在创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完毕----包括还没开始的任务,这种属于正常关闭。shutdownNow方法将执行粗暴的关闭过程:它将取消所有运行中的任务,并且不再启动队列中尚未开始的任务,这种属于强行关闭(关闭当前正在执行的任务,然后返回所有尚未启动的任务清单)。
在ExecutorService关闭后提交的任务将由"拒绝执行处理器"来处理,它会抛弃任务,或者使得execute方法抛出一个RejectedExecutionException。等所有任务执行完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过isTerminated来轮询ExecutorService是否已经终止。通常在调用shutdown之后会立即调用awaitTermination阻塞等待,从而产生同步地关闭ExecutorService的效果。
4.线程池--ThreadPoolExecutor
线程池,从字面意义上看,是指管理一组同构工作线程的资源池。线程池是与工作队列(work queue)密切相关的,其中在工作队列保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务并执行任务,然后返回线程池等待下一个任务。(线程池启动初期线程不会启动,有任务提交(调用execute或submit)才会启动,直到到达最大数量就不再创建而是进入阻塞队列)。
"在线程池中执行任务"比"为每一个任务分配一个线程"优势更多。通过重用现有的线程而不是创建新线程,可以处理多个请求时分摊在创建线程和销毁过程中产生的巨大开销。另外一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于创建线程而延迟任务的执行,从而提高了性能。
ThreadPoolExecutor为Executor提供了一些基本实现。ThreadPoolExecutor是一个灵活的、稳定的线程池,允许各种允许机制。ThreadPoolExecutor定义了很多构造函数,最常见的是下面这个:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1、corePoolSize
核心池的大小。在创建了线程池之后,默认情况下,线程池中没有任何线程,而是等待有任务到来才创建线程去执行任务。默认情况下,在创建了线程池之后,线程池钟的线程数为0,当有任务到来后就会创建一个线程去执行任务
2、maximumPoolSize
池中允许的最大线程数,这个参数表示了线程池中最多能创建的线程数量,当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,将继续创建线程以处理任务,maximumPoolSize表示的就是wordQueue满了,线程池中最多可以创建的线程数量
3、keepAliveTime
只有当线程池中的线程数大于corePoolSize时,这个参数才会起作用。当线程数大于corePoolSize时,终止前多余的空闲线程等待新任务的最长时间.
4、unit
keepAliveTime时间单位
5、workQueue
存储还没来得及执行的任务
6、threadFactory
执行程序创建新线程时使用的线程工厂
7、handler
由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序(拒绝执行处理器)
拒绝执行处理器实际上是定义了拒绝执行线程的行为:实际上也是一种饱和策略,当有界队列被填满后,饱和队列开始发挥作用。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
在类库中定义了四种实现:
1. AbortPolicy-终止策略
直接抛出一个RejectedExecutionException,也是JDK默认的拒绝策略
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
2.CallerRunsPolicy-调运者运行策略
如果线程池没有被关闭,就尝试执行任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
3.DiscardOldestPolicy-抛弃最旧的策略
如果线程池没有关闭,就移除队列中最先进入的任务,并且尝试执行任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
4. DiscardPolicy-抛弃策略
什么也不做,安静的丢弃任务
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
5.Executors
5.1ThreadFactory
在将这个之前先介绍一下ThreadFactory。每当线程池需要一个线程时,都是通过线程工厂创建的线程。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的线程信息。当然可以通过线程工厂定制线程的信息。此工厂也有好多实现:
public interface ThreadFactory { /**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
其实现类:
5.2Executors
可以通过Executors中的静态工厂方法之一创建一个线程池。Executors的静态工厂可以创建常用的四种线程池:
newFixedThreadPool(采用LinkedBlockingQueue队列--基于链表的阻塞队列)
创建一个定长线程池,每当提交一个任务时就创建一个线程,直到线程池的最大数量,这时线程池的规模将不再变化(如果由于某个线程由于发生了未预期的exception而结束,那么线程池会补充一个新的线程)。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool(使用SynchronousQueue同步队列)
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。线程池的规模不受限。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newScheduledThreadPool(使用DelayedWorkQueue延迟队列)
创建一个定长线程池,支持定时及周期性任务执行。类似于Timer。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
newSingleThreadExecutor(采用LinkedBlockingQueue队列--基于链表的阻塞队列)
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束会创建一个新的线程来替代。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newFixedThreadPool和newCachedThreadPool这两个工厂方法返回通用的ThreadPoolExecutor实例,这些实例可以直接用来构造专门用途的execotor。另外上面创建的时候都有一个可以指定线程工厂的方法:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
关于workqueue的选择: DelayQueue 可以实现有序加延迟的效果。 SynchronousQueue 同步队列,实际上它不是一个真正的队列,因为它不会维护队列中元素的存储空间,与其他队列不同的是,它维护一组线程,这些线程在等待把元素加入或移除队列。LinkedBlockingQueue 类似于LinkedList,基于链表的阻塞队列。此队列如果不指定容量大小,默认采用Integer.MAX_VALUE(可以理解为无限队列)。
关于队列的使用参考:https://www.cnblogs.com/qlqwjy/p/10175201.html
6.Java线程池的使用
下面所有的测试都是基于Myrunnale进行测试
package cn.qlq.thread.twenty; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; public class MyRunnable implements Runnable {
private static final Logger log = LoggerFactory.getLogger(MyRunnable.class); @Override
public void run() {
for (int i = 0; i < 5; i++) {
log.info("threadName -> {},i->{} ", Thread.currentThread().getName(), i);
try {
Thread.sleep(1 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1.FixedThreadPool的用法
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。在创建的时候并不会马上创建2个线程,而是在提交任务的时候才创建线程。
创建方法:
/**
* 参数是初始化线程池子的大小
*/
private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);
查看源码:(使用了阻塞队列,超过池子容量的线程会在队列中等待)
测试代码:
package cn.qlq.thread.twenty; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class Demo3 {
/**
* 参数是初始化线程池子的大小
*/
private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2); public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
batchTaskPool.execute(new MyRunnable());
}
}
}
结果:(执行完线程并没有销毁)
解释:
池子容量大小是2,所以前两个先被执行,第三个runable只是暂时的加到等待队列,前两个执行完成之后线程 pool-1-thread-1空闲之后从等待队列获取runnable进行执行。
定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
并且上面程序执行完毕之后JVM并没有结束,因此线程池创建的线程默认是非守护线程:
2.CachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
创建方法:
private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool();
查看源码:(使用了同步队列)
测试代码:
package cn.qlq.thread.twenty; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class Demo4 {
/**
* 参数是初始化线程池子的大小
*/
private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool(); public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
batchTaskPool.execute(new MyRunnable());
}
}
}
结果:
执行完成执行线程并没有结束
3.SingleThreadExecutor用法
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。类似于单线程执行的效果一样。
创建方法:
private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor();
查看源码;使用的阻塞队列
测试代码:
package cn.qlq.thread.twenty; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class Demo5 {
private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor(); public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
batchTaskPool.execute(new MyRunnable());
}
}
}
结果:
只有一个线程在执行任务:
4.ScheduledThreadPool用法------可以实现任务调度功能
创建一个定长线程池(会指定容量初始化大小),支持定时及周期性任务执行。可以实现一次性的执行延迟任务,也可以实现周期性的执行任务。
创建方法:
private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);
查看源码:(使用了延迟队列)
测试代码:
package cn.qlq.thread.twenty; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; public class Demo6 {
private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2); public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
// 第一次执行是在3s后执行(延迟任务)
batchTaskPool.schedule(new MyRunnable(), 3, TimeUnit.SECONDS);
// 第一个参数是需要执行的任务,第二个参数是第一次的延迟时间,第三个参数是两次执行的时间间隔,第四个参数是时间的单位
batchTaskPool.scheduleAtFixedRate(new MyRunnable(), 3, 7, TimeUnit.SECONDS);
// 第一个参数是需要执行的任务,第二个参数是第一次的延迟时间,第三个参数是两次执行的时间间隔,第四个参数是时间的单位
batchTaskPool.scheduleWithFixedDelay(new MyRunnable(), 3, 5, TimeUnit.SECONDS);
}
}
}
schedule是一次性的任务,可以指定延迟的时间。
scheduleAtFixedRate已固定的频率来执行某项计划(任务)
scheduleWithFixedDelay相对固定的延迟后,执行某项计划 (这个就是第一个任务执行完5s后再次执行,一般用这个方法任务调度)
如果延迟时间传入的是负数会立即执行,不会报非法参数错误。
关于二者的区别:
scheduleAtFixedRate :这个是按照固定的时间来执行,简单来说:到点执行
scheduleWithFixedDelay:这个呢,是等上一个任务结束后,在等固定的时间,然后执行。简单来说:执行完上一个任务后再执行
举例子
scheduledThreadPool.scheduleAtFixedRate(new TaskTest("执行调度任务3"),0, 1, TimeUnit.SECONDS); //这个就是每隔1秒,开启一个新线程
scheduledThreadPool.scheduleWithFixedDelay(new TaskTest("第四个"),0, 3, TimeUnit.SECONDS); //这个就是上一个任务执行完,3秒后开启一个新线程
补充:比如想要实现在某一个时钟定时晚上11点执行任务,并且每天都执行
long curDateSecneds = 0;
try {
String time = "21:00:00";
DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");
Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);
curDateSecneds = curDate.getTime();
} catch (ParseException ignored) {
// ignored
} // 单位是s
long initialDelay = (curDateSecneds - System.currentTimeMillis()) / 1000;
int periodOneDaySeconds = 1 * 24 * 60 * 60;
batchTaskPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("111222");
}
}, initialDelay, periodOneDaySeconds, TimeUnit.SECONDS);
注意: 上面的单位也就是四个参数是延迟时间和间隔的单位,也就是说第四个参数决定第二个和第三个参数。
当然实现任务调度还可以采用quartz框架来实现,更加的灵活。参考:https://www.cnblogs.com/qlqwjy/p/8723358.html
5.测试创建线程池的时候传入一个线程工厂创建的线程是守护线程且自定义线程的name
package cn.qlq.thread.twenty; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger; public class Demo7 {
private static volatile AtomicInteger atomicInteger = new AtomicInteger();
/**
* 参数是初始化线程池子的大小
*/
private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("t" + atomicInteger.incrementAndGet());
thread.setDaemon(true);// 设置为守护线程
return thread;
}
}); public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
batchTaskPool.execute(new MyRunnable());
}
// 必须休眠。否则创建的是守护线程会直接关闭进程
Thread.sleep(20 * 1000);
}
}
结果:
补充:关于线程池中的线程销毁问题
线程池中的线程如果不调用shutdown()方法线程是不会销毁的,即使是方法内部的局部变量线程池也不会销毁;调用shutdown方法之后在所有线程执行完后会线程线程池中的线程。所以在使用线程池的时候如果是方法内部使用一定要shutdown销毁线程,如果是全局使用的静态线程池可以不shutdown。
例如:不调用shutdown方法不会销毁线程。调用shutdown会销毁线程。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class TestThreadPoolDestroy {
public static void main(String[] args) {
TestPoolDestroy();
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
TestPoolDestroy();
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} private static void TestPoolDestroy() {
ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
final CountDownLatch latch = new CountDownLatch(3);// 闭锁
for (int i = 0; i < 3; i++) {
batchTaskPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "进入run");
Thread.sleep(5 * 10000);
System.out.println(Thread.currentThread().getName() + "退出run");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
try {
latch.await();// 闭锁产生同步效果
System.out.println("三个都执行完毕");
// batchTaskPool.shutdown();// 调用此方法等待执行完毕会销毁线程,如果不调用此方法即使方法退出也不会销毁线程
System.out.println(batchTaskPool.isTerminated());
System.out.println(batchTaskPool.isShutdown());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
结果:
将上面的shutdown方法的注释去掉再次测试,结果如下: 调用shutdown之后线程会销毁
补充:如果想要判断线程池中的线程是否执行完毕,或者在多个线程在线程池中执行完毕之后处理某些事情可以结合闭锁来实现,参考:https://www.cnblogs.com/qlqwjy/p/10251610.html
(1)闭锁实现 (建议使用这种)
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class TestThreadPoolDestroy {
public static void main(String[] args) {
TestPoolDestroy();
System.out.println("main end");
} private static void TestPoolDestroy() {
ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
final CountDownLatch latch = new CountDownLatch(3);// 闭锁
for (int i = 0; i < 3; i++) {
batchTaskPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "进入run");
Thread.sleep(5 * 1000);
System.out.println(Thread.currentThread().getName() + "退出run");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
try {
latch.await();// 闭锁产生同步效果
System.out.println("三个都执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果:
pool-1-thread-1进入run
pool-1-thread-2进入run
pool-1-thread-3进入run
pool-1-thread-1退出run
pool-1-thread-3退出run
pool-1-thread-2退出run
三个都执行完毕
main end
(2)线程池自身携带的方法实现: shuwdown后立即调用awaitTermination 实现
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; public class TestThreadPoolDestroy {
public static void main(String[] args) {
TestPoolDestroy();
System.out.println("main end");
} private static void TestPoolDestroy() {
ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
batchTaskPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "进入run");
Thread.sleep(5 * 1000);
System.out.println(Thread.currentThread().getName() + "退出run");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
try {
batchTaskPool.shutdown();
batchTaskPool.awaitTermination(1, TimeUnit.DAYS);
System.out.println("三个都执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果:
pool-1-thread-2进入run
pool-1-thread-3进入run
pool-1-thread-1进入run
pool-1-thread-3退出run
pool-1-thread-2退出run
pool-1-thread-1退出run
三个都执行完毕
main end
补充:例如我系统中使用的一个ExcutorService的例子:
/**
* 同步钉钉组织结构和人员的Action
*
* @author Administrator
*
*/
@Namespace("/sync")
public class SyncGroupAndUserAndBaseInfoAction extends DMSActionSupport { /**
* serialID
*/
private static final long serialVersionUID = 3526083465788431949L; private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2); private static Logger logger = LoggerFactory.getLogger(SyncGroupAndUserAndBaseInfoAction.class); @Autowired
private GroupAndUserService groupService; @Autowired
private BaseInfoService baseInfoService; /**
* 同步基本信息的数据
*
* @return
*/
@Action(value = "syncGroupAndUser")
public String syncGroupAndUser() {
long startTime = System.currentTimeMillis();
logger.info("manual sync groups and users!"); String accessToken = FetchDataUtils.getAccessToken();
if (StringUtils.isBlank(accessToken)) {
setPreJs("accessToken is null!");
return "js";
} String groupStr = FetchDataUtils.getGroupStr(accessToken);
if (StringUtils.isBlank(groupStr)) {
setPreJs("groupStr is null");
return "js";
} Set<String> dingGroupIds = FetchDataUtils.getGroupIds(groupStr);// 钉钉同步回来的组
//新开一个线程去获取钉钉用户和组织
batchDisposeDingGroupAndUser(dingGroupIds,groupStr,accessToken); Map<String,Object> response = new HashMap<String,Object>();
response.put("success", true);
response.put("message", "success sync datas!");
setPreJs(APIUtils.getJsonResultFromMap(response)); long endTime = System.currentTimeMillis();
logger.info("同步钉钉组织结构和用户完成-----用时:{}ms",(endTime-startTime));
return "js";
} private void batchDisposeDingGroupAndUser(final Set<String> dingGroupIds, final String groupStr,final String accessToken) {
Runnable run = new Runnable() {
@Override
public void run() {
groupService.batchDisposeGroups(groupStr, dingGroupIds);
groupService.fetchAndDisposeUsers(accessToken, dingGroupIds);
}
};
batchTaskPool.execute(run);
} }
注意:
batchDisposeDingGroupAndUser()方法的形参必须声明为final,否则编译错误。 补充:阿里规约有一条
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
阻塞队列默认是Integer.MAX_VALUE(可以理解为无限队列)
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
} public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
2) CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
补充:ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用
invokeAny是启动多个线程,相互独立的(无同步)去计算一个结果,当某一个线程得到结果之后,立刻终止所有线程,因为只需要一个结果就够了。
invokeAll是启动多个线程,相互独立的计算结果,当全部线程都结束之后统一返回结果,相当于阻塞执行。
例如:invokeAny的测试,也可以指定最长等待时间
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.StringUtils; public class PlainTest { private static ExecutorService es = Executors.newFixedThreadPool(5);
private static List<Callable<String>> list = new ArrayList<>(); public static void main(String[] a) throws InterruptedException, ExecutionException, TimeoutException {
for (int i = 0; i < 6; i++) {
final int nextInt = org.apache.commons.lang3.RandomUtils.nextInt(2, 4);
System.out.println(nextInt + "\t" + i); list.add(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(nextInt * 1000);
return nextInt + "";
}
});
} long start = System.currentTimeMillis();
String invokeAny = es.invokeAny(list);
long end = System.currentTimeMillis();
System.out.println("用时: " + (end - start) / 1000 + " s");
System.out.println("Result: " + invokeAny);
}
}
结果:(可以看到休眠时间最短的最先结束也就导致后面的线程结束)
3 0
3 1
2 2
3 3
3 4
2 5
用时: 2 s
Result: 2
invokeAll的测试,也可以指定最长等待时间
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.StringUtils; public class PlainTest { private static ExecutorService es = Executors.newFixedThreadPool(5);
private static List<Callable<String>> list = new ArrayList<>(); public static void main(String[] a) throws InterruptedException, ExecutionException, TimeoutException {
for (int i = 0; i < 6; i++) {
final int nextInt = org.apache.commons.lang3.RandomUtils.nextInt(2, 4);
System.out.println(nextInt + "\t" + i); list.add(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(nextInt * 1000);
return nextInt + "";
}
});
} List<String> result = new ArrayList<>(6); long start = System.currentTimeMillis();
List<Future<String>> invokeAll = es.invokeAll(list);
long end = System.currentTimeMillis();
System.out.println("用时: " + (end - start) / 1000 + " s");
for (Future<String> future : invokeAll) {
result.add(future.get());
}
System.out.println(StringUtils.join(result, ","));
}
}
结果:(可以看出,存在阻塞,用时4s)
2 0
3 1
2 2
3 3
2 4
2 5
用时: 4 s
2,3,2,3,2,2