很多开发者谈到Java多线程开发,仅仅停留在new Thread(...).start()或直接使用Executor框架这个层面,对于线程的管理和控制却不够深入,通过读《Java并发编程实践》了解到了很多不为我知但又非常重要的细节,今日整理如下。
不应用线程池的缺点
有些开发者图省事,遇到需要多线程处理的地方,直接new Thread(...).start(),对于一般场景是没问题的,但如果是在并发请求很高的情况下,就会有些隐患:
· 新建线程的开销。线程虽然比进程要轻量许多,但对于JVM来说,新建一个线程的代价还是挺大的,决不同于新建一个对象
· 资源消耗量。没有一个池来限制线程的数量,会导致线程的数量直接取决于应用的并发量,这样有潜在的线程数据巨大的可能,那么资源消耗量将是巨大的
· 稳定性。当线程数量超过系统资源所能承受的程度,稳定性就会成问题
制定执行策略
在每个需要多线程处理的地方,不管并发量有多大,需要考虑线程的执行策略
· 任务以什么顺序执行
· 可以有多少个任何并发执行
· 可以有多少个任务进入等待执行队列
· 系统过载的时候,应该放弃哪些任务?如何通知到应用程序?
· 一个任务的执行前后应该做什么处理
线程池的类型
不管是通过Executors创建线程池,还是通过Spring来管理,都得清楚知道有哪几种线程池:
· FixedThreadPool:定长线程池,提交任务时创建线程,直到池的最大容量,如果有线程非预期结束,会补充新线程
· CachedThreadPool:可变线程池,它犹如一个弹簧,如果没有任务需求时,它回收空闲线程,如果需求增加,则按需增加线程,不对池的大小做限制
· SingleThreadExecutor:单线程。处理不过来的任务会进入FIFO队列等待执行
· SecheduledThreadPool:周期性线程池。支持执行周期性线程任务
其实,这些不同类型的线程池都是通过构建一个ThreadPoolExecutor来完成的,所不同的是 corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory这 么几个参数。具体可以参见JDK DOC。
线程池饱和策略
由以上线程池类型可知,除了CachedThreadPool其他线程池都有饱和的可能,当饱和以后就需要相应的策略处理请求线程的任 务,ThreadPoolExecutor采取的方式通过队列来存储这些任务,当然会根据池类型不同选择不同的队列,比如FixedThreadPool 和SingleThreadExecutor默认采用的是无限长度的LinkedBlockingQueue。但从系统可控性讲,最好的做法是使用定长的 ArrayBlockingQueue或有限的LinkedBlockingQueue,并且当达到上限时通过 ThreadPoolExecutor.setRejectedExecutionHandler方法设置一个拒绝任务的策略,JDK提供了 AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy几种策略,具体差异可 见JDK DOC
线程无依赖性
多线程任务设计上尽量使得各任务是独立无依赖的,所谓依赖性可两个方面:
· 线程之间的依赖性。如果线程有依赖可能会造成死锁或饥饿
· 调用者与线程的依赖性。调用者得监视线程的完成情况,影响可并发量
当然,在有些业务里确实需要一定的依赖性,比如调用者需要得到线程完成后结果,传统的Thread是不便完成的,因为run方法无返回值,只能通过 一些共享的变量来传递结果,但在Executor框架里可以通过Future和Callable实现需要有返回值的任务,当然线程的异步性导致需要有相应 机制来保证调用者能等待任务完成,关于Future和Callable的用法见下面的实例就一目了然了:
1. public class FutureRenderer {
2. private final ExecutorService executor = ...;
3. void renderPage(CharSequence source) {
4. final List<ImageInfo> imageInfos = scanForImageInfo(source);
5. Callable<List<ImageData>> task =
6. new Callable<List<ImageData>>() {
7. public List<ImageData> call() {
8. List<ImageData> result
9. = new ArrayList<ImageData>();
10. for (ImageInfo imageInfo : imageInfos)
11. result.add(imageInfo.downloadImage());
12. return result;
13. }
14. };
15. Future<List<ImageData>> future = executor.submit(task);
16. renderText(source);
17. try {
18. List<ImageData> imageData = future.get();
19. for (ImageData data : imageData)
20. renderImage(data);
21. } catch (InterruptedException e) {
22. // Re-assert the thread's interrupted status
23. Thread.currentThread().interrupt();
24. // We don't need the result, so cancel the task too
25. future.cancel(true);
26. } catch (ExecutionException e) {
27. throw launderThrowable(e.getCause());
28. }
29. }
30. }
以上代码关键在于List<ImageData> imageData = future.get();如果Callable类型的任务没有执行完时,调用者会阻塞等待。不过这样的方式还是得谨慎使用,很容易造成不良设计。另外对于这种需要等待的场景,就需要设置一个最大容忍时间timeout,设置方法可以在 future.get()加上timeout参数,或是再调用ExecutorService.invokeAll 加上timeout参数
线程的取消与关闭
一般的情况下是让线程运行完成后自行关闭,但有些时候也会中途取消或关闭线程,比如以下情况:
· 调用者强制取消。比如一个长时间运行的任务,用户点击"cancel"按钮强行取消
· 限时任务
· 发生不可处理的任务
· 整个应用程序或服务的关闭
因此需要有相应的取消或关闭的方法和策略来控制线程,一般有以下方法:
1)通过变量标识来控制
这种方式比较老土,但使用得非常广泛,主要缺点是对有阻塞的操作控制不好,代码示例如下所示:
1. public class PrimeGenerator implements Runnable {
2. @GuardedBy("this")
3. private final List<BigInteger> primes
4. = new ArrayList<BigInteger>();
5. private volatile boolean cancelled;
6. public void run() {
7. BigInteger p = BigInteger.ONE;
8. while (!cancelled ) {
9. p = p.nextProbablePrime();
10. synchronized (this) {
11. primes.add(p);
12. }
13. }
14. }
15. public void cancel() { cancelled = true; }
16. public synchronized List<BigInteger> get() {
17. return new ArrayList<BigInteger>(primes);
18. }
19. }
2)中断
中断通常是实现取消最明智的选择,但线程自身需要支持中断处理,并且要处理好中断策略,一般响应中断的方式有两种:
· 处理完中断清理后继续传递中断异常(InterruptedException)
· 调用interrupt方法,使得上层能感知到中断异常
3) 取消不可中断阻塞
存在一些不可中断的阻塞,比如:
· java.io和java.nio中同步读写IO
· Selector的异步IO
· 获取锁
对于这些线程的取消,则需要特定情况特定对待,比如对于socket阻塞,如果要安全取消,则需要调用socket.close()
4)JVM的关闭
如果有任务需要在JVM关闭之前做一些清理工作,而不是被JVM强硬关闭掉,可以使用 JVM的钩子技术,其实JVM钩子也只是个很普通的技术,也就是用个map把一些需要JVM关闭前启动的任务保存下来,在JVM关闭过程中的某个环节来并 发启动这些任务线程。具体使用示例如下:
1. public void start() {
2. Runtime.getRuntime().addShutdownHook(new Thread() {
3. public void run() {
4. try { LogService.this.stop(); }
5. catch (InterruptedException ignored) {}
6. }
7. });