Spring中使用@scheduled定时执行任务需要注意的坑

Spring使用@scheduled定时执行任务的时候是在一个单线程中,如果有多个任务,其中一个任务执行时间过长,则有可能会导致其他后续任务被阻塞直到前任务执行完成。也就是会造成一些任务无法定时执行的错觉。

解决方案:

通过自定义线程池,并对提交的线程任务做判断,如果上次提交的线程任务还没有执行,那么此次提交的线程任务就会被放弃,防止某一个定时任务执行速度慢,导致线程池中积累大量的同一个缓慢任务。

import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;

/**
 * @className: ScheduledTaskPool
 * @copyright: HTD
 * @description: 自定义线程池,用于定时任务单线程导致的一些定时任务不执行问题。
 * 线程池通过线程名,查询队列中是否包含同名的线程任务,如果任务在线程池中,就会放弃
 * 当前添加的线程任务。
 * @author: hubin
 * @date: 2019/4/26
 * @version: <version>
 */
public class ScheduledTaskPool extends ThreadPoolExecutor {
    private static  volatile  ScheduledTaskPool scheduledTaskPool;

 /* *初始化自定义线程池
 *核心线程数3
 * 最大线程数3
 * 空闲线程回收时间10分钟
 * 链表队列容量40
 * 线程池拒绝策略:
 * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
 * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
 * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
 * ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
 * @description:
 * @author hubin
 * @Param  []
 * @return  [返回类型说明]
 * @exception  [违例类型] [违例说明]
 * @version  [版本号, ]
 */
    private ScheduledTaskPool(){
        super(3,3,10,TimeUnit.MINUTES,new LinkedBlockingDeque<Runnable>(40),new ThreadPoolExecutor.DiscardPolicy());
    }

    /* *
     * @description:  单例模式获取线程池对象
     * @author hubin
     * @Param  []
     * @return webjob.pool.ScheduledTaskPool [返回类型说明]
     * @exception  [违例类型] [违例说明]
     * @version  [版本号, 2019/4/28]
     */
    private static ScheduledTaskPool getScheduledTaskPool() {
        if(scheduledTaskPool==null){
            synchronized (ScheduledTaskPool.class){
                if(scheduledTaskPool==null){
                    scheduledTaskPool=new ScheduledTaskPool();
                }
            }
        }
        return scheduledTaskPool;
    }

    /* *
     * @description:利用线程池执行线程
     * @author hubin
     * @Param  [thread 将被执行的线程, threadName 线程的名字不能为空]
     * @return void [返回类型说明]
     * @exception  [违例类型] [违例说明]
     * @version  [版本号, 2019/4/28]
     */
    public static void excuteThread(Thread thread,String threadName){
        if(threadName==null){
            throw new NullPointerException();
        }
        thread.setName(threadName);
        BlockingDeque queue= (BlockingDeque) ScheduledTaskPool.getScheduledTaskPool().getQueue();
        //线程池中不包含该该线程
        if(!isContains(thread.getName(),queue)){
            ScheduledTaskPool.getScheduledTaskPool().submit(thread);
        }else {
            System.out.println("线程池中已经含有该线程:"+threadName);
        }
    }

    /* *
     *根据线程名查询线程中是否包含该线程,包含返回true不包含返回false
     * @description:
     * @author hubin
     * @Param  [threadName, queue]
     * @return boolean [返回类型说明]
     * @exception  [违例类型] [违例说明]
     * @version  [版本号, 2019/4/28]
     */
    private static   boolean isContains(String threadName,BlockingDeque<Runnable> queue){
        boolean flag=false;
        Iterator<Runnable> itr=queue.iterator();
        try {
            while (itr.hasNext()){
                Runnable runnable=itr.next();
                Class clazz = runnable.getClass();
                Field callableField = clazz.getDeclaredField("callable");
                callableField.setAccessible(true);
                Object callable = callableField.get(runnable);
                Class clazz2 = callable.getClass();
                Field taskField = clazz2.getDeclaredField("task");
                taskField.setAccessible(true);
                Object task = taskField.get(callable);
                //判断包含同名的线程任务
                if(((Thread)task).getName().equalsIgnoreCase(threadName)){
                    flag=true;
                    break;
                }
            }
        } catch (Exception e) {
        }
        return flag;

    }



    public static void main(String[] args) {
        for(int i=0;i<100;i++){
            Thread taskThread=new Thread(){
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                    System.out.println("线程:"+this.getName()+"被执行");
                }
            };
            String threadName="test"+(int)(i*Math.random());
            ScheduledTaskPool.excuteThread(taskThread,threadName);
        }
    }

}

 

上一篇:target tile component.js load logic


下一篇:基于OGG 实现Oracle到Kafka增量数据实时同步