Java并发编程系列之四:自定义线程池拒绝策略

引言

ThreadPoolExcutorJDK自带的线程池,也是我们在创建线程池时经常用到的创建方法。对线程池稍微有了解的同学都知道,线程池是一种典型的池化缓存设计。JDK自带了四种任务拒绝策略,但是有时候是不能满足我们实际的业务需求的,所以此时我们需要自定义拒绝策略,来处理被线程池拒绝的任务。

  • 自带线程池拒绝策略介绍
  • 如何自定义拒绝策略

一、自带线程池拒绝策略介绍

JDK自带的线程池拒绝策略有如下四种:

1、 DiscardPolicy: 默默丢弃无法处理的任务,不予任何处理;

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
}

2、DiscardOldestPolicy: 丢弃队列中最老的任务, 尝试再次提交当前任务;

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    
}

3、 AbortPolicy: 直接抛异常,阻止系统正常工作;

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

}

4、CallerRunsPolicy: 将任务分给调用线程来执行,运行当前被丢弃的任务,这样做不会真的丢弃任务,但是提交的线程性能有可能急剧下降

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
}

我们可以看得出来,前三种策略都是会丢弃原有的任务。但是在某些业务场景下,我们不能够粗暴的丢弃任务。第四种拒绝策略,是通过启动线程池的线程来处理丢弃的任务,但是问题是即便是线程池空闲,它也不会执行丢弃的任务,而是等待调用线程池的主线程来执行任务,直到任务结束。

二、自带线程池拒绝策略介绍

在线程池的定义中我们可以看到拒绝策略有个统一的实现接口,如下:

public interface RejectedExecutionHandler {
     void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

我们可以根据自己的业务需求来定义符合自己业务场景的处理策略。我们可以看下一些主流框架是如何自定义自己的处理策略的。

1、Netty 中的线程池拒绝策略

 private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
        NewThreadRunsPolicy() {
            super();
        }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                final Thread t = new Thread(r, "Temporary task executor");
                t.start();
            } catch (Throwable e) {
                throw new RejectedExecutionException(
                        "Failed to start a new thread", e);
            }
        }
    }

从上面的源码可以看出,Netty的处理方式就是不丢弃任务,这个思想和CallerRunsPolicy优点类似。只是在Netty框架中的自定义拒绝策略中,是通过新建工作线程来完成被丢弃的任务的,但是我们看一看得出它在创建线程时,没有进行条件约束,只要资源允许就不断创建新的线程来进行处理。


2、Dubbo 中的线程池拒绝策略

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                        " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                        " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {
       //省略实现
    }
}

Dubbo中的自定义拒绝策略中,打印了日志,输出当前线程的堆栈信息以及执行JDK的默认拒绝策略。

上一篇:MySQL添加字段和修改字段的方法


下一篇:Java多线程的调度策略