文章目录
背景
quartz 可用于管理调度定时任务,有集群模式和单机模式,quartz 的单机模式部署,所有任务执行信息都在内存中保存,存在单点故障,quartz 的集群模式具备高可用,自动负载均衡等特点,可保障定时任务的执行。
1.1 SpringBoot + Mysql + Quartz 集群模式搭建
注: 集群模式依赖实例所在机器之间的时间同步,请自行部署 ntp 服务进行时间同步。
1.1 Quartz 相关表建立
- 去官网下载 quartz,下载地址,需要下载2.2.3或者更低版本
- 解压后,执行 docs/dbTables/tables_mysql_innodb.sql 脚本建表
- 检查 db 中是否存在以下 11 个表
+--------------------------+
| QRTZ_BLOB_TRIGGERS |
| QRTZ_CALENDARS |
| QRTZ_CRON_TRIGGERS |
| QRTZ_FIRED_TRIGGERS |
| QRTZ_JOB_DETAILS |
| QRTZ_LOCKS |
| QRTZ_PAUSED_TRIGGER_GRPS |
| QRTZ_SCHEDULER_STATE |
| QRTZ_SIMPLE_TRIGGERS |
| QRTZ_SIMPROP_TRIGGERS |
| QRTZ_TRIGGERS |
+--------------------------+
1.2 maven 中引入 Quartz 相关包
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.2.1</version>
</dependency>
1.3 创建quartz配置文件
#默认或是自己改名字都行
org.quartz.scheduler.instanceName=DefaultQuartzScheduler
#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.dataSource=qzDS
# 开启集群模式
org.quartz.jobStore.isClustered=true
# 集群实例检测时间间隔 ms
org.quartz.jobStore.clusterCheckinInterval=5000
# misfire 任务的超时阈值 ms
org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.rmi.export=false
org.quartz.scheduler.rmi.proxy=false
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
# 工作线程的线程池设置
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=5
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
#============================================================================
# Configure Datasources
#============================================================================
#配置数据源
org.quartz.dataSource.qzDS.driver=com.mysql.cj.jdbc.Driver
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/dbName?characterEncoding=utf8&useSSL=true
org.quartz.dataSource.qzDS.user=xxx
org.quartz.dataSource.qzDS.password=xxx
org.quartz.dataSource.qzDS.validationQuery=select 0 from dual
特别解释一下这个参数 org.quartz.jobStore.misfireThreshold = 60000
, misfire 任务为错过调度触发时间的任务,而 misfireThreshold 为判定触发任务为 misfire 的判定条件,比如规定 11:30 要执行一次 Job, 如果因为实例挂掉或者线程池忙导致 11:33 才触发调度,超时了 3 分钟,超时时间 > 60000ms, 因此判定为 misfire。
判定为 misfire 的处理规则在后面的原理介绍相关文章会提及。
1.4 创建job 实例工厂,解决spring注入问题,如果使用默认会导致spring的@Autowired 无法注入问题(很重要)
@Component
public class MyJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
beanFactory.autowireBean(job);
return job;
}
}
1.5 quartz的初始化配置,生成 ScheduleFactory Bean
@Configuration
public class SchedulerConfiguration {
@Autowired
private MyJobFactory myJobFactory;
@Bean(name = "schedulerFactoryBean")
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
//获取配置属性
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("quartz.properties"));
//在quartz.properties中的属性被读取并注入后再初始化对象
propertiesFactoryBean.afterPropertiesSet();
//创建SchedulerFactoryBean
SchedulerFactoryBean factory = new SchedulerFactoryBean();
Properties pro = propertiesFactoryBean.getObject();
factory.setOverwriteExistingJobs(true);
factory.setAutoStartup(true);
factory.setQuartzProperties(pro);
factory.setJobFactory(myJobFactory);
return factory;
}
}
1.6 任务管理实现类
package com.tencent.oa.fm.digital.ops.intelligent.alarm.server.common.schedules;
import com.alibaba.fastjson.JSONObject;
import com.tencent.oa.fm.digital.ops.intelligent.alarm.contract.SysScheduleTaskDTO;
import com.tencent.oa.fm.digital.ops.intelligent.alarm.server.common.util.LogUtils;
import lombok.extern.log4j.Log4j2;
import org.joda.time.DateTime;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.*;
/**
*
* @ClassName: DistributeQuartzManager
* @Description 分布式集群quartz定时任务管理增删改
* @date 2019/10/1211:04
*/
@Log4j2
@Component
public class DistributeQuartzManager {
@Autowired
@Qualifier("schedulerFactoryBean")
private SchedulerFactoryBean schedulerFactory;
/**
* 判断一个job是否存在
*
* @param jobName
* 任务名
* @param jobGroupName
* 任务组名
* @return
*/
public boolean isExistJob(String jobName, String jobGroupName) {
boolean exist = false;
try {
Scheduler sched = schedulerFactory.getScheduler();
JobKey jobKey = new JobKey(jobName, jobGroupName);
exist = sched.checkExists(jobKey);
}
catch (SchedulerException e) {
e.printStackTrace();
}
if (exist) {
log.debug("触发器[" + jobName + "]重复");
}
else {
log.debug("触发器[" + jobName + "]可用");
}
return exist;
}
/**
* @Description: 添加一个定时任务
*
* @param jobName
* 任务名
* @param jobGroupName
* 任务组名
* @param triggerName
* 触发器名
* @param triggerGroupName
* 触发器组名
* @param jobClass
* 任务
* @param cron
* 时间设置,参考quartz说明文档
*/
public JobDetail addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName,
@SuppressWarnings("rawtypes") Class jobClass, JobDataMap jMap, String cron) {
return doAddJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, jMap, cron);
}
private JobDetail doAddJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Class jobClass, JobDataMap jMap, String cron) {
JobDetail jobDetail = null;
if(StringUtils.isEmpty(jobGroupName)){
jobGroupName = Scheduler.DEFAULT_GROUP;
}
if(StringUtils.isEmpty(triggerGroupName)){
triggerGroupName = Scheduler.DEFAULT_GROUP;
}
try {
Scheduler sched = schedulerFactory.getScheduler();
// 任务名,任务组,任务执行类
JobBuilder jobBuilder = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName);
if(jMap != null && jMap.size() > 0){
jobBuilder = jobBuilder.usingJobData(jMap);
}
jobDetail = jobBuilder.build();
// 触发器
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
// 触发器名,触发器组
triggerBuilder.withIdentity(triggerName, triggerGroupName);
triggerBuilder.startNow();
// 触发器时间设定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
// 创建Trigger对象
CronTrigger trigger = (CronTrigger) triggerBuilder.build();
// 调度容器设置JobDetail和Trigger
sched.scheduleJob(jobDetail, trigger);
Trigger.TriggerState triggerState = sched.getTriggerState(trigger.getKey());
// |-NONE 无
// |-NORMAL 正常状态
// |-PAUSED 暂停状态
// |-COMPLETE 完成
// |-ERROR 错误
// |-BLOCKED 堵塞
log.debug("JobName:" + jobName + ",状态:" + triggerState + ",GroupName:" + jobGroupName);
// 启动
if (!sched.isShutdown()) {
sched.start();
}
// 按新的trigger重新设置job执行
// sched.rescheduleJob(trigger.getKey(), trigger);
} catch (Exception e) {
log.error("添加一个定时任务发生异常:" + e);
}
return jobDetail;
}
/**
* 启动一个定时作业,如果原来已经启动该作业,先进行停止,删除操作,然后再重新添加启动作业
* @param jobName
* @param jobGroupName
* @param triggerName
* @param triggerGroupName
* @param jobClass
* @param jMap
* @param cron
*/
public JobDetail startJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName,
@SuppressWarnings("rawtypes") Class jobClass, JobDataMap jMap, String cron) {
//存在定时作业,先进行删除
if(isExistJob(jobName, jobGroupName) == true) {
removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
}
//添加并启动job
return addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, jMap, cron);
}
public void startJob(JobDetail jobDetail, CronTrigger trigger) {
try {
Scheduler sched = schedulerFactory.getScheduler();
// 调度容器设置JobDetail和Trigger
sched.scheduleJob(jobDetail, trigger);
Trigger.TriggerState triggerState = sched.getTriggerState(trigger.getKey());
// |-NONE 无
// |-NORMAL 正常状态
// |-PAUSED 暂停状态
// |-COMPLETE 完成
// |-ERROR 错误
// |-BLOCKED 堵塞
log.info("addJob JobKey:" + jobDetail.getKey() + ",状态:" + triggerState);
// 启动
if (!sched.isShutdown()) {
sched.start();
}
} catch (SchedulerException e) {
e.printStackTrace();
}
}
/**
* @Description: 修改一个任务的触发时间
*
* @param jobName
* @param jobGroupName
* @param triggerName
* 触发器名
* @param triggerGroupName
* 触发器组名
* @param cron
* 时间设置,参考quartz说明文档
*/
public void modifyJobTime(String jobName, String jobGroupName, String triggerName, String triggerGroupName,
@SuppressWarnings("rawtypes") Class jobClass, JobDataMap jMap, String cron) {
/** 方式一 :调用 rescheduleJob 开始 */
// 触发器
// TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
// 触发器名,触发器组
// triggerBuilder.withIdentity(triggerName, triggerGroupName);
// triggerBuilder.startNow();
// 触发器时间设定
// triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
// 创建Trigger对象
// trigger = (CronTrigger) triggerBuilder.build();
// 方式一 :修改一个任务的触发时间
// sched.rescheduleJob(triggerKey, trigger);
/** 方式一 :调用 rescheduleJob 结束 */
/** 方式二:先删除,然后在创建一个新的Job */
removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, jMap, cron);
log.info(String.format("修改【%s】定时任务成功!",jobName));
/** 方式二 :先删除,然后在创建一个新的Job */
}
/**
* @Description: 移除一个任务
*
* @param jobName
* @param jobGroupName
* @param triggerName
* @param triggerGroupName
*/
public void removeJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName) {
try {
/* ApplicationContext context = SpringContextUtils.getApplicationContext();
RedisDistributedLock redLock = context.getBean(RedisDistributedLock.class);
String lockKey = DOS + CacheConstant.LOCK_KEY + CacheConstant.SEPARATOR + jobGroupName + CacheConstant.SEPARATOR + jobName + CacheConstant.SEPARATOR + "Execute";
redLock.unlockAsync(lockKey);*/
Scheduler sched = schedulerFactory.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
sched.pauseTrigger(triggerKey);// 停止触发器
sched.unscheduleJob(triggerKey);// 移除触发器
sched.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 删除任务
List<String> jobGroupNames = sched.getJobGroupNames();
log.debug("移除任务组开始-->groupsNames=[");
for (String string : jobGroupNames) {
GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals(string);
Set<JobKey> jobKeys = sched.getJobKeys(matcher);
log.debug(string + "下的JOB为[");
for (JobKey jobKey : jobKeys) {
log.debug(jobKey.getName() + ",");
}
log.debug("]");
}
log.debug("]移除任务组结束。");
} catch (Exception e) {
log.error("移除job任务发生异常:" + e);
}
}
public void getSchedulerStatus() {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
List<String> jobGroupNames = scheduler.getJobGroupNames();
for (String jobGroupName : jobGroupNames) {
GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals(jobGroupName);
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
for (JobKey jobKey : jobKeys) {
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
String cron = "";
for (Trigger trigger : triggers) {
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
cron = cronTrigger.getCronExpression();
}
}
log.info("-------------job name=" + jobKey.getName() + ",group name=" + jobGroupName + ",scheduler name=" + scheduler.getSchedulerName() + ",cron=" + cron);
}
}
List<JobExecutionContext> jobExecutionContexts = scheduler.getCurrentlyExecutingJobs();
for(JobExecutionContext jobExecutionContext : jobExecutionContexts){
JobDetail jobDetail = jobExecutionContext.getJobDetail();
JobKey jobKey = jobDetail.getKey();
String fireTime = new DateTime(jobExecutionContext.getFireTime()).toString(JobConstant.DATE_TIME_FORMAT);
String previousTime = new DateTime(jobExecutionContext.getPreviousFireTime()).toString(JobConstant.DATE_TIME_FORMAT);
String nextFireTime = new DateTime(jobExecutionContext.getNextFireTime()).toString(JobConstant.DATE_TIME_FORMAT);
log.info("---------current running job key=" + jobKey.getName() + ",group name=" + jobKey.getGroup() + ",scheduler name=" + scheduler.getSchedulerName()
+ LogUtils.formatScheduledJobLogInfo(jobExecutionContext) + ",class=" + jobKey.getClass().getSimpleName() +
",description=" + jobDetail.getDescription());
}
Set<String> pauseGroupNames = scheduler.getPausedTriggerGroups();
for (String jobGroupName : pauseGroupNames) {
GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals(jobGroupName);
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
for (JobKey jobKey : jobKeys) {
log.info("-------------pause job name=" + jobKey.getName() + ",group name=" + jobGroupName + ",scheduler name=" + scheduler.getSchedulerName());
}
}
} catch (SchedulerException e) {
e.printStackTrace();
}
}
/**
* @Description:启动所有定时任务
*/
public void startAllJobs() {
try {
Scheduler sched = schedulerFactory.getScheduler();
sched.start();
}
catch (Exception e) {
log.error("启动所有定时任务发生异常:", e);
throw new RuntimeException(e);
}
}
public static Map<String,String> parseJobDataMap(String jsonStr){
Map<String,String> map = new HashMap<>();
if(StringUtils.isEmpty(jsonStr)){
return map;
}
try{
JSONObject json = JSONObject.parseObject(jsonStr);
for (String key : json.keySet()) {
String value = json.getString(key);
map.put(key,value);
}
}catch (Exception e){
log.error("parseJobDataMap error is:{}", e);
}
return map;
}
/**
* @Description:关闭所有定时任务
*/
public void shutdownAllJobs() {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
if (!scheduler.isShutdown()) {
scheduler.shutdown();
}
}
catch (Exception e) {
log.error("关闭所有定时任务发生异常:", e);
throw new RuntimeException(e);
}
}
/**
* 计划日程待调度的任务
* @return
*/
public List<SysScheduleTaskDTO> queryAllJobs(){
List<SysScheduleTaskDTO> jobConfigs = new ArrayList<>();
try {
Scheduler scheduler = schedulerFactory.getScheduler();
for(String groupJob: scheduler.getJobGroupNames()){
for(JobKey jobKey: scheduler.getJobKeys(GroupMatcher.groupEquals(groupJob))){
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger: triggers) {
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
SysScheduleTaskDTO jobConfig = new SysScheduleTaskDTO();
String cronExpression = "";
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
cronExpression = cronTrigger.getCronExpression();
TriggerKey triggerKey =cronTrigger.getKey();
jobConfig.setTriggerName(triggerKey.getName());
jobConfig.setTriggerGroupName(triggerKey.getGroup());
}
Class jobClazz = jobDetail.getJobClass();
String classCode = JobClassEnum.getCodeByClass(jobClazz);
jobConfig.setJobClass(classCode);
jobConfig.setJobName(jobKey.getName());
jobConfig.setJobGroupName(jobKey.getGroup());
jobConfig.setDescription(jobDetail.getDescription());
jobConfig.setStatus(triggerState.name());
jobConfig.setCron(cronExpression);
jobConfigs.add(jobConfig);
}
}
}
} catch (SchedulerException e) {
e.printStackTrace();
log.error("查询所有定时任务发生异常:", e);
throw new RuntimeException(e);
}
return jobConfigs;
}
/**
* 正在运行中的任务
* @return
*/
public List<SysScheduleTaskDTO> getRunningJobs(){
List<SysScheduleTaskDTO> jobList = new ArrayList<>();
try {
Scheduler scheduler = schedulerFactory.getScheduler();
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
for (JobExecutionContext executingJob : executingJobs) {
SysScheduleTaskDTO job = new SysScheduleTaskDTO();
Trigger trigger = executingJob.getTrigger();
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
TriggerKey triggerKey =trigger.getKey();
job.setTriggerName(triggerKey.getName());
job.setTriggerGroupName(triggerKey.getGroup());
JobDetail jobDetail = executingJob.getJobDetail();
JobKey jobKey = jobDetail.getKey();
Class jobClazz = jobDetail.getJobClass();
String classCode = JobClassEnum.getCodeByClass(jobClazz);
job.setJobClass(classCode);
job.setJobName(jobKey.getName());
job.setJobGroupName(jobKey.getGroup());
job.setDescription(jobDetail.getDescription());
job.setStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCron(cronExpression);
}
job.setDescription("触发器:" + trigger.getKey());
jobList.add(job);
}
} catch (SchedulerException e) {
e.printStackTrace();
}
return jobList;
}
}
1.7 启动程序
quartz 集群和其他分布式集群不一样,集群实例之间不需要互相通信,只需要和DB 交互,通过 DB 感知其他*,实现 Job 调度。因此只需要按照普通 java 程序启动即可,扩容也只需要新启动实例,不需要做额外配置。