Spring使用@scheduled定时执行任务的时候是在一个单线程中,如果有多个任务,其中一个任务执行时间过长,则有可能会导致其他后续任务被阻塞直到前任务执行完成。也就是会造成一些任务无法定时执行的错觉。
解决方案:
通过自定义线程池,并对提交的线程任务做判断,如果上次提交的线程任务还没有执行,那么此次提交的线程任务就会被放弃,防止某一个定时任务执行速度慢,导致线程池中积累大量的同一个缓慢任务。
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;
/**
* @className: ScheduledTaskPool
* @copyright: HTD
* @description: 自定义线程池,用于定时任务单线程导致的一些定时任务不执行问题。
* 线程池通过线程名,查询队列中是否包含同名的线程任务,如果任务在线程池中,就会放弃
* 当前添加的线程任务。
* @author: hubin
* @date: 2019/4/26
* @version: <version>
*/
public class ScheduledTaskPool extends ThreadPoolExecutor {
private static volatile ScheduledTaskPool scheduledTaskPool;
/* *初始化自定义线程池
*核心线程数3
* 最大线程数3
* 空闲线程回收时间10分钟
* 链表队列容量40
* 线程池拒绝策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
* @description:
* @author hubin
* @Param []
* @return [返回类型说明]
* @exception [违例类型] [违例说明]
* @version [版本号, ]
*/
private ScheduledTaskPool(){
super(3,3,10,TimeUnit.MINUTES,new LinkedBlockingDeque<Runnable>(40),new ThreadPoolExecutor.DiscardPolicy());
}
/* *
* @description: 单例模式获取线程池对象
* @author hubin
* @Param []
* @return webjob.pool.ScheduledTaskPool [返回类型说明]
* @exception [违例类型] [违例说明]
* @version [版本号, 2019/4/28]
*/
private static ScheduledTaskPool getScheduledTaskPool() {
if(scheduledTaskPool==null){
synchronized (ScheduledTaskPool.class){
if(scheduledTaskPool==null){
scheduledTaskPool=new ScheduledTaskPool();
}
}
}
return scheduledTaskPool;
}
/* *
* @description:利用线程池执行线程
* @author hubin
* @Param [thread 将被执行的线程, threadName 线程的名字不能为空]
* @return void [返回类型说明]
* @exception [违例类型] [违例说明]
* @version [版本号, 2019/4/28]
*/
public static void excuteThread(Thread thread,String threadName){
if(threadName==null){
throw new NullPointerException();
}
thread.setName(threadName);
BlockingDeque queue= (BlockingDeque) ScheduledTaskPool.getScheduledTaskPool().getQueue();
//线程池中不包含该该线程
if(!isContains(thread.getName(),queue)){
ScheduledTaskPool.getScheduledTaskPool().submit(thread);
}else {
System.out.println("线程池中已经含有该线程:"+threadName);
}
}
/* *
*根据线程名查询线程中是否包含该线程,包含返回true不包含返回false
* @description:
* @author hubin
* @Param [threadName, queue]
* @return boolean [返回类型说明]
* @exception [违例类型] [违例说明]
* @version [版本号, 2019/4/28]
*/
private static boolean isContains(String threadName,BlockingDeque<Runnable> queue){
boolean flag=false;
Iterator<Runnable> itr=queue.iterator();
try {
while (itr.hasNext()){
Runnable runnable=itr.next();
Class clazz = runnable.getClass();
Field callableField = clazz.getDeclaredField("callable");
callableField.setAccessible(true);
Object callable = callableField.get(runnable);
Class clazz2 = callable.getClass();
Field taskField = clazz2.getDeclaredField("task");
taskField.setAccessible(true);
Object task = taskField.get(callable);
//判断包含同名的线程任务
if(((Thread)task).getName().equalsIgnoreCase(threadName)){
flag=true;
break;
}
}
} catch (Exception e) {
}
return flag;
}
public static void main(String[] args) {
for(int i=0;i<100;i++){
Thread taskThread=new Thread(){
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("线程:"+this.getName()+"被执行");
}
};
String threadName="test"+(int)(i*Math.random());
ScheduledTaskPool.excuteThread(taskThread,threadName);
}
}
}