spring quartz集群搭建

文章目录

背景

quartz 可用于管理调度定时任务,有集群模式和单机模式,quartz 的单机模式部署,所有任务执行信息都在内存中保存,存在单点故障,quartz 的集群模式具备高可用,自动负载均衡等特点,可保障定时任务的执行。

1.1 SpringBoot + Mysql + Quartz 集群模式搭建

注: 集群模式依赖实例所在机器之间的时间同步,请自行部署 ntp 服务进行时间同步。
1.1 Quartz 相关表建立

  • 去官网下载 quartz,下载地址,需要下载2.2.3或者更低版本
  • 解压后,执行 docs/dbTables/tables_mysql_innodb.sql 脚本建表
  • 检查 db 中是否存在以下 11 个表
+--------------------------+
| QRTZ_BLOB_TRIGGERS       |
| QRTZ_CALENDARS           |
| QRTZ_CRON_TRIGGERS       |
| QRTZ_FIRED_TRIGGERS      |
| QRTZ_JOB_DETAILS         |
| QRTZ_LOCKS               |
| QRTZ_PAUSED_TRIGGER_GRPS |
| QRTZ_SCHEDULER_STATE     |
| QRTZ_SIMPLE_TRIGGERS     |
| QRTZ_SIMPROP_TRIGGERS    |
| QRTZ_TRIGGERS            |
+--------------------------+

1.2 maven 中引入 Quartz 相关包

        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>2.2.1</version>
        </dependency>

1.3 创建quartz配置文件

#默认或是自己改名字都行
org.quartz.scheduler.instanceName=DefaultQuartzScheduler

#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.dataSource=qzDS
# 开启集群模式
org.quartz.jobStore.isClustered=true
# 集群实例检测时间间隔 ms
org.quartz.jobStore.clusterCheckinInterval=5000

# misfire 任务的超时阈值 ms
org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate


org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.rmi.export=false
org.quartz.scheduler.rmi.proxy=false
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false

# 工作线程的线程池设置
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=5
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true

#============================================================================
# Configure Datasources
#============================================================================
#配置数据源
org.quartz.dataSource.qzDS.driver=com.mysql.cj.jdbc.Driver
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/dbName?characterEncoding=utf8&useSSL=true
org.quartz.dataSource.qzDS.user=xxx
org.quartz.dataSource.qzDS.password=xxx
org.quartz.dataSource.qzDS.validationQuery=select 0 from dual

特别解释一下这个参数 org.quartz.jobStore.misfireThreshold = 60000, misfire 任务为错过调度触发时间的任务,而 misfireThreshold 为判定触发任务为 misfire 的判定条件,比如规定 11:30 要执行一次 Job, 如果因为实例挂掉或者线程池忙导致 11:33 才触发调度,超时了 3 分钟,超时时间 > 60000ms, 因此判定为 misfire。

判定为 misfire 的处理规则在后面的原理介绍相关文章会提及。

1.4 创建job 实例工厂,解决spring注入问题,如果使用默认会导致spring的@Autowired 无法注入问题(很重要

@Component
public class MyJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

    private transient AutowireCapableBeanFactory beanFactory;

    @Override
    public void setApplicationContext(final ApplicationContext context) {
        beanFactory = context.getAutowireCapableBeanFactory();
    }

    @Override
    protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        beanFactory.autowireBean(job);
        return job;
    }
}

1.5 quartz的初始化配置,生成 ScheduleFactory Bean

@Configuration
public class SchedulerConfiguration {

    @Autowired
    private MyJobFactory myJobFactory;

    @Bean(name = "schedulerFactoryBean")
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        //获取配置属性
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("quartz.properties"));
        //在quartz.properties中的属性被读取并注入后再初始化对象
        propertiesFactoryBean.afterPropertiesSet();
        //创建SchedulerFactoryBean
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        Properties pro = propertiesFactoryBean.getObject();
        factory.setOverwriteExistingJobs(true);
        factory.setAutoStartup(true);
        factory.setQuartzProperties(pro);
        factory.setJobFactory(myJobFactory);
        return factory;
    }

}

1.6 任务管理实现类

package com.tencent.oa.fm.digital.ops.intelligent.alarm.server.common.schedules;


import com.alibaba.fastjson.JSONObject;
import com.tencent.oa.fm.digital.ops.intelligent.alarm.contract.SysScheduleTaskDTO;
import com.tencent.oa.fm.digital.ops.intelligent.alarm.server.common.util.LogUtils;
import lombok.extern.log4j.Log4j2;
import org.joda.time.DateTime;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.*;

/**
 *
 * @ClassName: DistributeQuartzManager
 * @Description 分布式集群quartz定时任务管理增删改
 * @date 2019/10/1211:04
 */
@Log4j2
@Component
public class DistributeQuartzManager {

    @Autowired
    @Qualifier("schedulerFactoryBean")
    private SchedulerFactoryBean schedulerFactory;

    /**
     * 判断一个job是否存在
     *
     * @param jobName
     *            任务名
     * @param jobGroupName
     *            任务组名
     * @return
     */
    public  boolean isExistJob(String jobName, String jobGroupName) {
        boolean exist = false;
        try {

            Scheduler sched = schedulerFactory.getScheduler();
            JobKey jobKey = new JobKey(jobName, jobGroupName);
            exist = sched.checkExists(jobKey);
        }
        catch (SchedulerException e) {
            e.printStackTrace();
        }
        if (exist) {
            log.debug("触发器[" + jobName + "]重复");
        }
        else {
            log.debug("触发器[" + jobName + "]可用");
        }
        return exist;

    }

    /**
     * @Description: 添加一个定时任务
     *
     * @param jobName
     *            任务名
     * @param jobGroupName
     *            任务组名
     * @param triggerName
     *            触发器名
     * @param triggerGroupName
     *            触发器组名
     * @param jobClass
     *            任务
     * @param cron
     *            时间设置,参考quartz说明文档
     */
    public JobDetail addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName,
                                   @SuppressWarnings("rawtypes") Class jobClass, JobDataMap jMap, String cron) {
        return doAddJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, jMap, cron);
    }

    private JobDetail doAddJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Class jobClass, JobDataMap jMap, String cron) {
        JobDetail jobDetail = null;
        if(StringUtils.isEmpty(jobGroupName)){
            jobGroupName = Scheduler.DEFAULT_GROUP;
        }

        if(StringUtils.isEmpty(triggerGroupName)){
            triggerGroupName = Scheduler.DEFAULT_GROUP;
        }

        try {
            Scheduler sched = schedulerFactory.getScheduler();
            // 任务名,任务组,任务执行类
            JobBuilder jobBuilder = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName);
            if(jMap != null && jMap.size() > 0){
                jobBuilder = jobBuilder.usingJobData(jMap);
            }
            jobDetail = jobBuilder.build();

            // 触发器
            TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
            // 触发器名,触发器组
            triggerBuilder.withIdentity(triggerName, triggerGroupName);
            triggerBuilder.startNow();
            // 触发器时间设定
            triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
            // 创建Trigger对象
            CronTrigger trigger = (CronTrigger) triggerBuilder.build();

            // 调度容器设置JobDetail和Trigger
            sched.scheduleJob(jobDetail, trigger);
            Trigger.TriggerState triggerState = sched.getTriggerState(trigger.getKey());
            // |-NONE 无
            // |-NORMAL 正常状态
            // |-PAUSED 暂停状态
            // |-COMPLETE 完成
            // |-ERROR 错误
            // |-BLOCKED 堵塞

            log.debug("JobName:" + jobName + ",状态:" + triggerState + ",GroupName:" + jobGroupName);
            // 启动
            if (!sched.isShutdown()) {
                sched.start();
            }

            // 按新的trigger重新设置job执行
//            sched.rescheduleJob(trigger.getKey(), trigger);
        } catch (Exception e) {
            log.error("添加一个定时任务发生异常:" +  e);
        }

        return jobDetail;
    }

    /**
     * 启动一个定时作业,如果原来已经启动该作业,先进行停止,删除操作,然后再重新添加启动作业
     * @param jobName
     * @param jobGroupName
     * @param triggerName
     * @param triggerGroupName
     * @param jobClass
     * @param jMap
     * @param cron
     */
    public JobDetail startJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName,
                                     @SuppressWarnings("rawtypes") Class jobClass, JobDataMap jMap, String cron) {
        //存在定时作业,先进行删除
        if(isExistJob(jobName, jobGroupName) == true) {
            removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
        }
        //添加并启动job
        return addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, jMap, cron);
    }

    public void startJob(JobDetail jobDetail, CronTrigger trigger) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            // 调度容器设置JobDetail和Trigger
            sched.scheduleJob(jobDetail, trigger);
            Trigger.TriggerState triggerState = sched.getTriggerState(trigger.getKey());
            // |-NONE 无
            // |-NORMAL 正常状态
            // |-PAUSED 暂停状态
            // |-COMPLETE 完成
            // |-ERROR 错误
            // |-BLOCKED 堵塞


            log.info("addJob JobKey:" + jobDetail.getKey() + ",状态:" + triggerState);
            // 启动
            if (!sched.isShutdown()) {
                sched.start();
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * @Description: 修改一个任务的触发时间
     *
     * @param jobName
     * @param jobGroupName
     * @param triggerName
     *            触发器名
     * @param triggerGroupName
     *            触发器组名
     * @param cron
     *            时间设置,参考quartz说明文档
     */
    public void modifyJobTime(String jobName, String jobGroupName, String triggerName, String triggerGroupName,
                                     @SuppressWarnings("rawtypes") Class jobClass, JobDataMap jMap, String cron) {
        /** 方式一 :调用 rescheduleJob 开始 */
        // 触发器
        // TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
        // 触发器名,触发器组
        // triggerBuilder.withIdentity(triggerName, triggerGroupName);
        // triggerBuilder.startNow();
        // 触发器时间设定
        // triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
        // 创建Trigger对象
        // trigger = (CronTrigger) triggerBuilder.build();
        // 方式一 :修改一个任务的触发时间
        // sched.rescheduleJob(triggerKey, trigger);
        /** 方式一 :调用 rescheduleJob 结束 */

        /** 方式二:先删除,然后在创建一个新的Job */
        removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
        addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, jMap, cron);
        log.info(String.format("修改【%s】定时任务成功!",jobName));

        /** 方式二 :先删除,然后在创建一个新的Job */
    }

    /**
     * @Description: 移除一个任务
     *
     * @param jobName
     * @param jobGroupName
     * @param triggerName
     * @param triggerGroupName
     */
    public void removeJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName) {
        try {
           /* ApplicationContext context = SpringContextUtils.getApplicationContext();
            RedisDistributedLock redLock = context.getBean(RedisDistributedLock.class);
            String lockKey = DOS + CacheConstant.LOCK_KEY + CacheConstant.SEPARATOR + jobGroupName + CacheConstant.SEPARATOR + jobName + CacheConstant.SEPARATOR + "Execute";
            redLock.unlockAsync(lockKey);*/

            Scheduler sched = schedulerFactory.getScheduler();

            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);

            sched.pauseTrigger(triggerKey);// 停止触发器
            sched.unscheduleJob(triggerKey);// 移除触发器
            sched.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 删除任务

            List<String> jobGroupNames = sched.getJobGroupNames();
            log.debug("移除任务组开始-->groupsNames=[");
            for (String string : jobGroupNames) {
                GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals(string);
                Set<JobKey> jobKeys = sched.getJobKeys(matcher);
                log.debug(string + "下的JOB为[");
                for (JobKey jobKey : jobKeys) {
                    log.debug(jobKey.getName() + ",");
                }
                log.debug("]");

            }
            log.debug("]移除任务组结束。");
        } catch (Exception e) {
            log.error("移除job任务发生异常:" + e);
        }
    }

    public void getSchedulerStatus() {
        try {
                Scheduler scheduler = schedulerFactory.getScheduler();
                List<String> jobGroupNames = scheduler.getJobGroupNames();
                for (String jobGroupName : jobGroupNames) {
                    GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals(jobGroupName);
                    Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
                    for (JobKey jobKey : jobKeys) {
                        List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                        String cron = "";
                        for (Trigger trigger : triggers) {
                            if (trigger instanceof CronTrigger) {
                                CronTrigger cronTrigger = (CronTrigger) trigger;
                                cron = cronTrigger.getCronExpression();
                            }
                        }

                        log.info("-------------job name=" + jobKey.getName() + ",group name=" + jobGroupName + ",scheduler name=" + scheduler.getSchedulerName() + ",cron=" + cron);
                    }
                }
                List<JobExecutionContext> jobExecutionContexts = scheduler.getCurrentlyExecutingJobs();
                for(JobExecutionContext jobExecutionContext : jobExecutionContexts){
                    JobDetail jobDetail = jobExecutionContext.getJobDetail();
                    JobKey jobKey = jobDetail.getKey();
                    String fireTime = new DateTime(jobExecutionContext.getFireTime()).toString(JobConstant.DATE_TIME_FORMAT);
                    String previousTime = new DateTime(jobExecutionContext.getPreviousFireTime()).toString(JobConstant.DATE_TIME_FORMAT);
                    String nextFireTime = new DateTime(jobExecutionContext.getNextFireTime()).toString(JobConstant.DATE_TIME_FORMAT);
                    log.info("---------current running job key=" + jobKey.getName() + ",group name=" + jobKey.getGroup() + ",scheduler name=" + scheduler.getSchedulerName()
                            + LogUtils.formatScheduledJobLogInfo(jobExecutionContext) + ",class=" + jobKey.getClass().getSimpleName() +
                            ",description=" + jobDetail.getDescription());

                }

                Set<String> pauseGroupNames = scheduler.getPausedTriggerGroups();
                for (String jobGroupName : pauseGroupNames) {
                    GroupMatcher<JobKey> matcher = GroupMatcher.jobGroupEquals(jobGroupName);
                    Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
                    for (JobKey jobKey : jobKeys) {
                        log.info("-------------pause job name=" + jobKey.getName() + ",group name=" + jobGroupName + ",scheduler name=" + scheduler.getSchedulerName());
                    }
                }


        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * @Description:启动所有定时任务
     */
    public void startAllJobs() {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            sched.start();
        }
        catch (Exception e) {
            log.error("启动所有定时任务发生异常:", e);
            throw new RuntimeException(e);
        }
    }

    public static Map<String,String> parseJobDataMap(String jsonStr){
        Map<String,String> map = new HashMap<>();
        if(StringUtils.isEmpty(jsonStr)){
            return map;
        }
        try{
            JSONObject json = JSONObject.parseObject(jsonStr);
            for (String key : json.keySet()) {
                String value = json.getString(key);
                map.put(key,value);
            }
        }catch (Exception e){
            log.error("parseJobDataMap error is:{}", e);
        }
        return map;
    }

    /**
     * @Description:关闭所有定时任务
     */
    public void shutdownAllJobs() {
        try {
            Scheduler scheduler = schedulerFactory.getScheduler();
            if (!scheduler.isShutdown()) {
                scheduler.shutdown();
            }
        }
        catch (Exception e) {
            log.error("关闭所有定时任务发生异常:", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * 计划日程待调度的任务
     * @return
     */
    public List<SysScheduleTaskDTO> queryAllJobs(){

        List<SysScheduleTaskDTO> jobConfigs = new ArrayList<>();
        try {
            Scheduler scheduler = schedulerFactory.getScheduler();
            for(String groupJob: scheduler.getJobGroupNames()){
                for(JobKey jobKey: scheduler.getJobKeys(GroupMatcher.groupEquals(groupJob))){
                    List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                    for (Trigger trigger: triggers) {
                        Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                        JobDetail jobDetail = scheduler.getJobDetail(jobKey);

                        SysScheduleTaskDTO jobConfig = new SysScheduleTaskDTO();
                        String cronExpression = "";
                        if (trigger instanceof CronTrigger) {
                            CronTrigger cronTrigger = (CronTrigger) trigger;
                            cronExpression = cronTrigger.getCronExpression();
                            TriggerKey triggerKey =cronTrigger.getKey();
                            jobConfig.setTriggerName(triggerKey.getName());
                            jobConfig.setTriggerGroupName(triggerKey.getGroup());
                        }

                        Class jobClazz = jobDetail.getJobClass();
                        String classCode = JobClassEnum.getCodeByClass(jobClazz);
                        jobConfig.setJobClass(classCode);
                        jobConfig.setJobName(jobKey.getName());
                        jobConfig.setJobGroupName(jobKey.getGroup());
                        jobConfig.setDescription(jobDetail.getDescription());
                        jobConfig.setStatus(triggerState.name());
                        jobConfig.setCron(cronExpression);
                        jobConfigs.add(jobConfig);
                    }
                }
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("查询所有定时任务发生异常:", e);
            throw new RuntimeException(e);
        }
        return jobConfigs;
    }

    /**
     * 正在运行中的任务
     * @return
     */
    public  List<SysScheduleTaskDTO> getRunningJobs(){
        List<SysScheduleTaskDTO> jobList = new ArrayList<>();
        try {
            Scheduler scheduler = schedulerFactory.getScheduler();
            List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();

            for (JobExecutionContext executingJob : executingJobs) {
                SysScheduleTaskDTO job = new SysScheduleTaskDTO();
                Trigger trigger = executingJob.getTrigger();
                Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                TriggerKey triggerKey =trigger.getKey();
                job.setTriggerName(triggerKey.getName());
                job.setTriggerGroupName(triggerKey.getGroup());

                JobDetail jobDetail = executingJob.getJobDetail();
                JobKey jobKey = jobDetail.getKey();

                Class jobClazz = jobDetail.getJobClass();
                String classCode = JobClassEnum.getCodeByClass(jobClazz);
                job.setJobClass(classCode);
                job.setJobName(jobKey.getName());
                job.setJobGroupName(jobKey.getGroup());
                job.setDescription(jobDetail.getDescription());
                job.setStatus(triggerState.name());
                if (trigger instanceof CronTrigger) {
                    CronTrigger cronTrigger = (CronTrigger) trigger;
                    String cronExpression = cronTrigger.getCronExpression();
                    job.setCron(cronExpression);
                }
                job.setDescription("触发器:" + trigger.getKey());
                jobList.add(job);
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        return jobList;
    }
}

1.7 启动程序

quartz 集群和其他分布式集群不一样,集群实例之间不需要互相通信,只需要和DB 交互,通过 DB 感知其他*,实现 Job 调度。因此只需要按照普通 java 程序启动即可,扩容也只需要新启动实例,不需要做额外配置。

上一篇:FF03HTML+CSS静态页面网页设计作业——餐饮美食-武昌鱼(8页) HTML+CSS+JavaScript 使用html+css实现一个静态页面(含源码)


下一篇:Mac实用操作技巧(一)