从线程池拒绝策略中我们可以学到什么?

一、背景

很多人都知道或者用过线程池,线程池构造方法的参数中有一个参数为拒绝策略。


那么,通过拒绝策略我们可以学到哪些思想?


下面简单讲讲自己的理解。


二、可以学到什么?

2.1 为什么提供多种拒绝策略?

不知道你有没有思考过,为什么会有那么多编程语言?为什么会有那么多算法?为什么会有那么多设计模式?为什么会有那么多存储方式?为什么会有那么多线程池拒绝策略?


如果你稍微思考,就会发现不同的编程语言、算法、设计模式、存储方式、线程池拒绝策略等适用不同的场景。


(图片来源:美团技术)


启发:没有最好的选择,只有最适合的选择

我们要做的是在某个特定场景下选择最适合的技术,而不是最新、最热的技术。


2.2 如果我来设计拒绝策略,我能想到几个?

如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务。


如果由你来设计线程池经典的处理策略你会提供怎样的策略?


如果我们从来没学过这一块,可能就想不出这么多经典的处理策略。


JDK 源码提供的拒绝策略应该是经过深思熟虑,能够覆盖到常见业务场景。


启发:当我们面临大数据量处理时,也可以参考这些策略根据其适用的场景去灵活处理。


2.3 为什么默认策略是 AbortPolicy?

不知道你是否思考过,为什么默认的策略是 AbortPolicy?


如果默认是 DiscardPolicy 或者 DiscardOldestPolicy,真正触发拒绝策略时,使用者不容易感知。

使用者对源码理解不深入很容易出现不符合预期的运行效果。


显然超出“承载能力” 丢弃任务并抛出异常是一个更适合的选择。不提供服务,没啥好说的,已经超过能力了,没法提供服务。抛出异常为了让上游感知到并做出相应处理。


另外我们可以看下 Redis 缓存淘汰策略:


1.noeviction(默认策略):对于写请求不再提供服务,直接返回错误(DEL请求和部分特殊请求除外)

2.allkeys-lru:从所有key中使用LRU算法进行淘汰

3.volatile-lru:从设置了过期时间的key中使用LRU算法进行淘汰

4.allkeys-random:从所有key中随机淘汰数据

5.volatile-random:从设置了过期时间的key中随机淘汰

6.volatile-ttl:在设置了过期时间的key中,淘汰过期时间剩余最短的


我们会发现两者也有“惊人”的相似性,都是不提供服务,返回错误。


启发1:当我们遇到超过承载量的场景时,需要提供不同的选择,可以借鉴上述思想,并且默认需要考虑报错,保护自身的同事,让上游感知到。


启发2:可以通过对比经典技术对相似问题的处理来得到比较靠谱的方案 or 思考方向。


2.4 如果提供的策略无法满足需求怎么办?

作为一个严谨的设计者,就应该预料到自己提供的策略不可能涵盖所有场景。

因此需要支持用户自定义才行。


比如 Dubbo 中提供了 AbortPolicyWithReport 拒绝策略,在触发拒绝策略时可以报告一些信息。

public class AbortPolicyWithReport extends AbortPolicy {
    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
    private final String threadName;
    private final URL url;
    private static volatile long lastPrintTime = 0L;
    private static Semaphore guard = new Semaphore(1);
    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", this.threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), this.url.getProtocol(), this.url.getIp(), this.url.getPort());
        logger.warn(msg);
        this.dumpJStack();
        throw new RejectedExecutionException(msg);
    }
    private void dumpJStack() {
        long now = System.currentTimeMillis();
        if (now - lastPrintTime >= 600000L) {
            if (guard.tryAcquire()) {
                Executors.newSingleThreadExecutor().execute(new Runnable() {
                    public void run() {
                        String dumpPath = AbortPolicyWithReport.this.url.getParameter("dump.directory", System.getProperty("user.home"));
                        String OS = System.getProperty("os.name").toLowerCase();
                        SimpleDateFormat sdf;
                        if (OS.contains("win")) {
                            sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
                        } else {
                            sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
                        }
                        String dateStr = sdf.format(new Date());
                        FileOutputStream jstackStream = null;
                        try {
                            jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log." + dateStr));
                            JVMUtil.jstack(jstackStream);
                        } catch (Throwable var15) {
                            AbortPolicyWithReport.logger.error("dump jstack error", var15);
                        } finally {
                            AbortPolicyWithReport.guard.release();
                            if (jstackStream != null) {
                                try {
                                    jstackStream.flush();
                                    jstackStream.close();
                                } catch (IOException var14) {
                                }
                            }
                        }
                        AbortPolicyWithReport.lastPrintTime = System.currentTimeMillis();
                    }
                });
            }
        }
    }
}

PS: 这也是我们学习和理解 volatile、Semaphore 的一个机会。


再比如 Netty 中就提供了通过新建线程执行的策略:NewThreadRunsPolicy


   private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {

       public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

           try {

               final Thread t = new Thread(r, "Temporary task executor");

               t.start();

           } catch (Throwable e) {

               throw new RejectedExecutionException(

                       "Failed to start a new thread", e);

           }

       }

   }


这些开源项目的拒绝策略都为我们开了不少脑洞。


我们也可以根据具体业务,扔到消息队列里再消费等方式处理。


启发1:面向未来编程。

当我们设计一些通用工具时,也要留一些拓展性给别人。


启发1: 带着问题学技术或者把技术放在特定场景里学习,更有兴趣,也更容易掌握。


三、总结

本文简单谈下自己从线程池拒绝策略中学到的一点知识,希望能够对大家有启发。


希望大家在读源码时能多一些思考,多思考为什么,而不是记忆结论。


多问几个问题,如:


这是为了解决什么问题?

作者为什么这么设计? 这么设计的优点是什么?

这么设计的核心原理是什么(作者是怎么设计的) ?

如果是我会怎么设计?

别人是如何设计的?

我能从中学到什么,得到什么对未来技术成长有帮助的启发?

如果你觉得本文对你有帮助,欢迎点赞、收藏加关注三连!!


推荐:如果想深入了解如何更高效地阅读源码,可以看我的 GitChat 文章:《你真的知道该怎么读源码吗?》

上一篇:PL/SQL 集合的初始化与赋值


下一篇:Bootstrap<基础十三> 按钮组