从线程池拒绝策略中我们可以学到什么?

一、背景

很多人都知道或者用过线程池,线程池构造方法的参数中有一个参数为拒绝策略。

那么,通过拒绝策略我们可以学到哪些思想?

下面简单讲讲自己的理解。

二、可以学到什么?

2.1 为什么提供多种拒绝策略?

不知道你有没有思考过,为什么会有那么多编程语言?为什么会有那么多算法?为什么会有那么多设计模式?为什么会有那么多存储方式?为什么会有那么多线程池拒绝策略?

如果你稍微思考,就会发现不同的编程语言、算法、设计模式、存储方式、线程池拒绝策略等适用不同的场景。
从线程池拒绝策略中我们可以学到什么?
(图片来源:美团技术

启发:没有最好的选择,只有最适合的选择
我们要做的是在某个特定场景下选择最适合的技术,而不是最新、最热的技术。

2.2 如果我来设计拒绝策略,我能想到几个?

如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务。

如果由你来设计线程池经典的处理策略你会提供怎样的策略?

如果我们从来没学过这一块,可能就想不出这么多经典的处理策略。

JDK 源码提供的拒绝策略应该是经过深思熟虑,能够覆盖到常见业务场景。

启发:当我们面临大数据量处理时,也可以参考这些策略根据其适用的场景去灵活处理。

2.3 为什么默认策略是 AbortPolicy?

不知道你是否思考过,为什么默认的策略是 AbortPolicy?

如果默认是 DiscardPolicy 或者 DiscardOldestPolicy,真正触发拒绝策略时,使用者不容易感知。
使用者对源码理解不深入很容易出现不符合预期的运行效果。

显然超出“承载能力” 丢弃任务并抛出异常是一个更适合的选择。不提供服务,没啥好说的,已经超过能力了,没法提供服务。抛出异常为了让上游感知到并做出相应处理。

另外我们可以看下 Redis 缓存淘汰策略:

1.noeviction(默认策略):对于写请求不再提供服务,直接返回错误(DEL请求和部分特殊请求除外)
2.allkeys-lru:从所有key中使用LRU算法进行淘汰
3.volatile-lru:从设置了过期时间的key中使用LRU算法进行淘汰
4.allkeys-random:从所有key中随机淘汰数据
5.volatile-random:从设置了过期时间的key中随机淘汰
6.volatile-ttl:在设置了过期时间的key中,淘汰过期时间剩余最短的

我们会发现两者也有“惊人”的相似性,都是不提供服务,返回错误。

启发1:当我们遇到超过承载量的场景时,需要提供不同的选择,可以借鉴上述思想,并且默认需要考虑报错,保护自身的同事,让上游感知到。

启发2:可以通过对比经典技术对相似问题的处理来得到比较靠谱的方案 or 思考方向。

2.4 如果提供的策略无法满足需求怎么办?

作为一个严谨的设计者,就应该预料到自己提供的策略不可能涵盖所有场景。
因此需要支持用户自定义才行。

比如 Dubbo 中提供了 AbortPolicyWithReport 拒绝策略,在触发拒绝策略时可以报告一些信息。

public class AbortPolicyWithReport extends AbortPolicy {
    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
    private final String threadName;
    private final URL url;
    private static volatile long lastPrintTime = 0L;
    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", this.threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), this.url.getProtocol(), this.url.getIp(), this.url.getPort());
        logger.warn(msg);
        this.dumpJStack();
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {
        long now = System.currentTimeMillis();
        if (now - lastPrintTime >= 600000L) {
            if (guard.tryAcquire()) {
                Executors.newSingleThreadExecutor().execute(new Runnable() {
                    public void run() {
                        String dumpPath = AbortPolicyWithReport.this.url.getParameter("dump.directory", System.getProperty("user.home"));
                        String OS = System.getProperty("os.name").toLowerCase();
                        SimpleDateFormat sdf;
                        if (OS.contains("win")) {
                            sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
                        } else {
                            sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
                        }

                        String dateStr = sdf.format(new Date());
                        FileOutputStream jstackStream = null;

                        try {
                            jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log." + dateStr));
                            JVMUtil.jstack(jstackStream);
                        } catch (Throwable var15) {
                            AbortPolicyWithReport.logger.error("dump jstack error", var15);
                        } finally {
                            AbortPolicyWithReport.guard.release();
                            if (jstackStream != null) {
                                try {
                                    jstackStream.flush();
                                    jstackStream.close();
                                } catch (IOException var14) {
                                }
                            }

                        }

                        AbortPolicyWithReport.lastPrintTime = System.currentTimeMillis();
                    }
                });
            }
        }
    }
}

PS: 这也是我们学习和理解 volatile、Semaphore 的一个机会。

再比如 Netty 中就提供了通过新建线程执行的策略:NewThreadRunsPolicy

    private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                final Thread t = new Thread(r, "Temporary task executor");
                t.start();
            } catch (Throwable e) {
                throw new RejectedExecutionException(
                        "Failed to start a new thread", e);
            }
        }
    }

这些开源项目的拒绝策略都为我们开了不少脑洞。

我们也可以根据具体业务,扔到消息队列里再消费等方式处理。

启发1:面向未来编程。
当我们设计一些通用工具时,也要留一些拓展性给别人。

启发1: 带着问题学技术或者把技术放在特定场景里学习,更有兴趣,也更容易掌握。

三、总结

本文简单谈下自己从线程池拒绝策略中学到的一点知识,希望能够对大家有启发。

希望大家在读源码时能多一些思考,多思考为什么,而不是记忆结论。

多问几个问题,如:

  • 这是为了解决什么问题?
  • 作者为什么这么设计? 这么设计的优点是什么?
  • 这么设计的核心原理是什么(作者是怎么设计的) ?
  • 如果是我会怎么设计?
  • 别人是如何设计的?
  • 我能从中学到什么,得到什么对未来技术成长有帮助的启发?

如果你觉得本文对你有帮助,欢迎点赞、收藏加关注三连!!


推荐:如果想深入了解如何更高效地阅读源码,可以看我的 GitChat 文章:《你真的知道该怎么读源码吗?》

从线程池拒绝策略中我们可以学到什么?

上一篇:TensorFlow图像分类教程


下一篇:ffmpeg一些filter用法、以及一些功能命令