首先配置ThreadPoolTaskScheduler线程池:
package cn.demo.support.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * Spring configuration that registers the {@link ThreadPoolTaskScheduler} bean.
 */
@Configuration
public class ScheduleConfig {

    /**
     * Builds the scheduler bean.
     *
     * <p>The bean is registered with {@code destroyMethod = "shutdown"}, uses a pool size of 20
     * and names its threads with the {@code taskExecutor-} prefix.
     *
     * @return the configured scheduler
     */
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolTaskScheduler taskExecutor() {
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();

        // Sizing and thread naming.
        scheduler.setThreadNamePrefix("taskExecutor-");
        scheduler.setPoolSize(20);

        // Shutdown policy: wait for tasks to complete, with a 60 second termination timeout.
        scheduler.setAwaitTerminationSeconds(60);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);

        return scheduler;
    }
}
配置定时任务,业务类中注入HiveClusterSyncScheduler即可调用:
package cn.jsfund.ngdp.support.batchSchedule;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import cn.jsfund.ngdp.support.config.BasicConfig;
import cn.jsfund.ngdp.support.exception.ServiceException;
import cn.jsfund.ngdp.support.model.bigdata.TaskDef;
import cn.jsfund.ngdp.support.web.service.bigdata.HiveClusterSyncService;

/**
 * Registers, replaces and cancels cron-driven runs of Hive cluster sync tasks on the injected
 * {@link ThreadPoolTaskScheduler}.
 *
 * <p>Every scheduled task is tracked in a map keyed by {@code MAPTASKKEY + taskId}, so it can
 * later be cancelled or replaced. All scheduling methods except {@link #cancelTask(Object...)}
 * do nothing unless {@code basicConfig.getBackupEnabled()} equals {@code "true"}.
 */
@Component
public class HiveClusterSyncScheduler {

    private static final Logger logger = LoggerFactory.getLogger(HiveClusterSyncScheduler.class);

    /** Prefix prepended to a task id to build its key in {@link #scheduledFutureMap}. */
    private static final String MAPTASKKEY = "map_task_key";

    /** Handles of the currently scheduled tasks, keyed by {@code MAPTASKKEY + taskId}. */
    private static final Map<String, ScheduledFuture<?>> scheduledFutureMap =
            new ConcurrentHashMap<String, ScheduledFuture<?>>();

    // Not referenced anywhere in this class; kept so the injected dependencies stay unchanged.
    @Resource
    private JdbcTemplate dmJdbcTemplate;

    @Resource
    private BasicConfig basicConfig;

    /** Business service: supplies the task definitions and executes a task. */
    @Resource
    HiveClusterSyncService hiveClusterSyncService;

    @Autowired
    ThreadPoolTaskScheduler threadPoolTaskScheduler;

    /** Loads the schedule once after dependency injection. */
    @PostConstruct
    public void init() {
        refreshTasks();
    }

    /**
     * Rebuilds the whole schedule from the task definitions returned by
     * {@code hiveClusterSyncService.queryTaskDefs(...)}.
     *
     * <p>The definitions are queried <em>before</em> anything is cancelled: if the query fails,
     * the current schedule is left untouched instead of being wiped. Definitions with a blank or
     * invalid cron expression are skipped.
     *
     * <p>NOTE(review): the literal {@code "1"} passed to the query presumably selects tasks in
     * the "started" state (the log text suggests so) - confirm against the service.
     */
    // @Scheduled(cron = "${bigData.hive.backup.schduler.refresh.crontab}")
    // The periodic trigger above is disabled; within this class only init() calls this method.
    public void refreshTasks() {
        if (!isBackupEnabled()) {
            logger.info("*************hive集群同步开关未开启,结束计划任务刷新.*************");
            return;
        }

        logger.info("*************开始扫描数据库,刷新定时任务*************");

        List<TaskDef> list;
        try {
            list = hiveClusterSyncService.queryTaskDefs(null, null, null, null, null, "1", null)
                    .getContent();
        } catch (Exception e) {
            // Keep the existing schedule: a failed lookup must not cancel running schedules.
            logger.error("查询数据库异常,代码执行结束,异常信息:", e);
            return;
        }

        cancelAll();

        if (ObjectUtils.isEmpty(list)) {
            logger.info("查询启动状态的任务记录为空,代码执行结束。");
            return;
        }

        for (TaskDef taskDef : list) {
            String taskId = taskDef.getId();
            String crontab = taskDef.getCrontab();
            if (StringUtils.isBlank(crontab)) {
                continue;
            }
            if (!CronSequenceGenerator.isValidExpression(crontab)) {
                logger.error("当前cron表达式无效,taskId={},cronExpression={}", taskId, crontab);
                continue;
            }
            scheduleTask(taskId, crontab);
        }
        logger.info("*************刷新定时任务完成*************");
    }

    /**
     * Schedules a task. A schedule already registered under the same id is cancelled first, so
     * no untracked duplicate is left running.
     *
     * @param taskId  id of the task to run
     * @param crontab cron expression that drives the task
     * @throws IllegalArgumentException if {@code crontab} is rejected by {@link CronTrigger}
     */
    public void addTask(String taskId, String crontab) {
        if (!isBackupEnabled()) {
            return;
        }
        scheduleTask(taskId, crontab);
    }

    /**
     * Cancels the schedules of the given tasks and forgets them. Ids without a registered
     * schedule are ignored. A run already in progress is not interrupted.
     *
     * @param taskId ids of the tasks to cancel; a {@code null} array is treated as empty
     */
    public void cancelTask(Object... taskId) {
        if (taskId == null) {
            return;
        }
        for (Object id : taskId) {
            // remove() hands back the handle atomically, so it is cancelled exactly once.
            cancelQuietly(scheduledFutureMap.remove(MAPTASKKEY + id));
        }
    }

    /**
     * Replaces the schedule of a task: the old one is cancelled, then the new one is registered.
     * The cron expression is validated before the old schedule is touched, so an invalid
     * expression leaves the existing schedule in place.
     *
     * @param taskId  id of the task to reschedule
     * @param crontab new cron expression
     * @throws ServiceException         declared for callers; not thrown by this implementation
     * @throws IllegalArgumentException if {@code crontab} is rejected by {@link CronTrigger}
     */
    public void updateScheduleTask(String taskId, String crontab) throws ServiceException {
        if (!isBackupEnabled()) {
            return;
        }
        scheduleTask(taskId, crontab);
    }

    /** Returns whether the sync switch in {@link BasicConfig} is set to {@code "true"}. */
    private boolean isBackupEnabled() {
        return "true".equals(basicConfig.getBackupEnabled());
    }

    /** Cancels every tracked schedule and empties the map. */
    private void cancelAll() {
        for (String key : scheduledFutureMap.keySet()) {
            cancelQuietly(scheduledFutureMap.remove(key));
        }
    }

    /** Cancels a handle without interrupting a run in progress; {@code null} is ignored. */
    private static void cancelQuietly(ScheduledFuture<?> future) {
        if (future != null) {
            future.cancel(false);
        }
    }

    /**
     * Registers {@code taskId} under {@code crontab}, replacing any schedule already tracked
     * for that id.
     */
    private void scheduleTask(String taskId, String crontab) {
        // Build the trigger first: an invalid expression throws before the old schedule is lost.
        CronTrigger trigger = new CronTrigger(crontab);
        String key = MAPTASKKEY + taskId;

        cancelQuietly(scheduledFutureMap.remove(key));

        ScheduledFuture<?> future =
                threadPoolTaskScheduler.schedule(new TaskThread(taskId, crontab), trigger);
        if (future == null) {
            // schedule() returns null when the trigger never fires; ConcurrentHashMap
            // rejects null values, so there is nothing to track.
            logger.warn("cron trigger never fires, task not scheduled: taskId={}, cronExpression={}",
                    taskId, crontab);
            return;
        }
        scheduledFutureMap.put(key, future);
    }

    /**
     * Unit of work handed to the scheduler: starts one task through the business service.
     * It is a plain {@link Runnable}; the scheduler's own pool threads execute it.
     */
    class TaskThread implements Runnable {

        private final String taskId;

        private final String crontab;

        public TaskThread(String taskId, String crontab) {
            this.taskId = taskId;
            this.crontab = crontab;
        }

        @Override
        public void run() {
            try {
                hiveClusterSyncService.bootTask(taskId, crontab);
            } catch (Exception e) {
                // Trailing throwable argument makes SLF4J log the stack trace as well.
                logger.error("任务(taskId={})结束初始化,异常信息:{}", taskId, e.getMessage(), e);
            }
        }
    }

}
package cn.jsfund.ngdp.support.batchSchedule;
import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.Map;import java.util.Map.Entry;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ScheduledFuture;
import javax.annotation.PostConstruct;import javax.annotation.Resource;
import org.apache.commons.lang.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.scheduling.Trigger;import org.springframework.scheduling.TriggerContext;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import org.springframework.scheduling.support.CronSequenceGenerator;import org.springframework.scheduling.support.CronTrigger;import org.springframework.stereotype.Component;import org.springframework.util.ObjectUtils;
import cn.jsfund.ngdp.support.config.BasicConfig;import cn.jsfund.ngdp.support.exception.ServiceException;import cn.jsfund.ngdp.support.model.bigdata.TaskDef;import cn.jsfund.ngdp.support.web.service.bigdata.HiveClusterSyncService;
@SuppressWarnings("rawtypes")@Componentpublic class HiveClusterSyncScheduler {
@Resource private JdbcTemplate dmJdbcTemplate;
@Resource private BasicConfig basicConfig;
@Resource HiveClusterSyncService hiveClusterSyncService;
@Autowired ThreadPoolTaskScheduler threadPoolTaskScheduler;
private static final Logger logger = LoggerFactory.getLogger(HiveClusterSyncScheduler.class);
private static final String MAPTASKKEY = "map_task_key";
private static Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<String, ScheduledFuture>();
@PostConstruct public void init() { refreshTasks(); }
// @Scheduled(cron = "${bigData.hive.backup.schduler.refresh.crontab}") public void refreshTasks() {
if (!"true".equals(basicConfig.getBackupEnabled())) { logger.info("*************hive集群同步开关未开启,结束计划任务刷新.*************"); return; } for (Entry<String, ScheduledFuture> entry : scheduledFutureMap.entrySet()) { ScheduledFuture sf = entry.getValue(); if (sf != null) { sf.cancel(false); } } scheduledFutureMap.clear();
logger.info("*************开始扫描数据库,刷新定时任务*************");
List<TaskDef> list = new ArrayList<>();
try { list = hiveClusterSyncService.queryTaskDefs(null, null, null, null, null, "1", null) .getContent(); } catch (Exception e) { logger.info("查询数据库异常,代码执行结束,异常信息:", e); } if (ObjectUtils.isEmpty(list)) { logger.info("查询启动状态的任务记录为空,代码执行结束。"); return; } for (TaskDef taskDef : list) { String taskId = taskDef.getId(); String crontab = taskDef.getCrontab(); if (StringUtils.isBlank(crontab)) { continue; }
TaskThread taskThread = new TaskThread(taskId, crontab); boolean isValidExp = CronSequenceGenerator.isValidExpression(crontab); if (!isValidExp) { logger.error("当前cron表达式无效,taskId={},cronExpression={}", taskId, crontab); continue; } ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, new Trigger() {
@Override public Date nextExecutionTime(TriggerContext triggerContext) {
return new CronTrigger(crontab).nextExecutionTime(triggerContext); } });
scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); } logger.info("*************刷新定时任务完成*************"); }
//添加计划 public void addTask(String taskId, String crontab) { if (!"true".equals(basicConfig.getBackupEnabled())) { return; } TaskThread taskThread = new TaskThread(taskId, crontab); ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, new Trigger() { @Override public Date nextExecutionTime(TriggerContext triggerContext) { return new CronTrigger(crontab).nextExecutionTime(triggerContext); } }); scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); }
//取消计划 public void cancelTask(Object... taskId) { for (int i = 0; i < taskId.length; i++) { ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId[i]); if (sf != null) { sf.cancel(false); scheduledFutureMap.remove(MAPTASKKEY + taskId[i]); } } }
//更新计划:先取消再添加 public void updateScheduleTask(String taskId, String crontab) throws ServiceException { if (!"true".equals(basicConfig.getBackupEnabled())) { return; } ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId); if (sf != null) { sf.cancel(false); scheduledFutureMap.remove(MAPTASKKEY + taskId); } TaskThread taskThread = new TaskThread(taskId, crontab); ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, new Trigger() { @Override public Date nextExecutionTime(TriggerContext triggerContext) { return new CronTrigger(crontab).nextExecutionTime(triggerContext); } }); scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); }
class TaskThread extends Thread {
private String taskId;
private String crontab;
public TaskThread(String taskId, String crontab) { this.taskId = taskId; this.crontab = crontab; }
public void run() { try { hiveClusterSyncService.bootTask(taskId, crontab); } catch (Exception e) { logger.info("任务(taskId={})结束初始化,异常信息:{}", taskId, e.getMessage()); } } }
}