spring boot 整合 quartz

目录

概述

准备依赖

进行配置

设计定时任务


概述

     之前在项目中基本上都是使用了spring boot 自带的定时任务功能来管理定时任务,这样做的好处是

  1. 容易上手,本身功能集成在spring boot中开箱即用
  2. 代码编写方便清晰,基本上通过注解和cron表达式就可以完成需求  

    但是这样的方案也有一些弊端,比如当一个定时任务需要改变它的运行时间或周期的时候,你就需要修改代码并且重新启动服务来生效这次的修改。也就是这种方案的动态性不够。记得刚刚接触定时任务的时候,接到一个需要需要定时取获取一些汇率的信息,但是这个周期要求是变化的,比如我现在是1小时爬取一次,后面可能就停止任务不爬取,后面又可以启动继续爬取。当时还不知到又quartz的存在,所以在当时是自己写了一个线程管理的方法取控制任务的启动停止,比较复杂,也可能存在不少漏洞。后面接触了quartz,才知道原来有现成的*可以用啊。

   第一次使用quartz,是在一个分布式项目中,在公共服务中抽取出了一个定时任务管理的模块,来统一管理整个系统的定时任务,通过在管理系统中暴露定时任务的一些信息管理接口能够动态的管理任务的启动停止和周期,如果仅仅只是在内存中创建任务而没有持久化任务的信息的话,那服务的重启宕机就会导致任务的丢失。所以一般我们是需要把任务信息持久化到数据库中。

准备依赖

第一次学习和使用spring  整合 quartz 许多的知识认识还不到位,所以只能算是一个入门的总结。首先需要在spring  boot 工程中引入我们相关的依赖。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>

这个就是主要使用quartz的依赖了。由于我们是需要将定时任务持久化到数据库中,所以对于数据库需要进行定时任务表的相关设计,而quartz官网给出了模板我们照用就可以了

这里放上传送门

里面主要是schedule,trigger等配置。

进行配置

开始对项目进行配置,由于我们是需要把任务持久化的,虽然官网有了数据库脚本但是我们的项目需求各不相同,我们应该自定义一个表来存储这些任务的信息。另外系统运行最重要的就是日志,所以我们需要另外设计一个表类记录这些定时任务的运行记录。下面是我个人对这两项的设计,我使用的是spring data jpa,这个用来做demo很不错。

@Data
@Entity
@Table(name = "self_job")
@org.hibernate.annotations.Table(appliesTo = "self_job",comment = "任务模型")
@EntityListeners(AuditingEntityListener.class)
public class SelfJobPO implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "job_id",nullable = false,columnDefinition = "bigint(20) comment '主键'")
    private Long jobId;
    @Column(name = "job_name",nullable = false,columnDefinition = "varchar(45) comment '任务名称'")
    private String jobName;
    @Column(name = "group_name",nullable = false,columnDefinition = "varchar(45) comment '任务分组'")
    private String groupName;
    @Column(name = "bean_name",nullable = false,columnDefinition = "varchar(45) comment 'java容器名称'")
    private String beanName;
    @Column(name = "method_name",nullable = false,columnDefinition = "varchar(45) comment '方法名'")
    private String methodName;
    @Column(name = "cron_expression",nullable = false,columnDefinition = "varchar(100) comment 'cron表达式'")
    private String cronExpression;
    @Column(name = "params",nullable = false,columnDefinition = "varchar(100) default '' comment '参数'")
    private String params;
    @Column(name = "job_status",nullable = false,columnDefinition = "int(1) default 0 comment '任务状态'")
    private Integer jobStatus;
    @CreatedDate
    @Column(name = "create_date",nullable = false,columnDefinition = "datetime default current_timestamp comment '创建时间'")
    private LocalDateTime createDate;
    @LastModifiedDate
    @Column(name = "update_date",nullable = false,columnDefinition = "datetime default current_timestamp on update current_timestamp comment '更新时间'")
    private LocalDateTime updateDate;


}

第一个我们要设计的是任务管理类,每一条记录就是一个定时任务,每个定时任务需要存储的是任务的名称用来标识我们的任务,由于我们后面会通过反射的方法来调用任务进行执行所以我们需要记录下任务的类名,方法名和所需要的参数。同时由于任务各自执行的周期不同,我们需要记录下任务的cron表达式来管理任务。每个任务可能正在运行也可能已经停止运行,我们需要用一个状态值来记录当前任务的状态。

@Data
@Entity
@Table(name = "self_job_log")
@org.hibernate.annotations.Table(appliesTo = "self_job_log",comment = "任务日志")
@EntityListeners(AuditingEntityListener.class)
public class SelfJobLogPO implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "log_id",nullable = false,columnDefinition = "bigint(20) comment '日志id'")
    private Long logId;
    @Column(name = "job_id",nullable = false,columnDefinition = "bigint(20) comment '任务id'")
    private Long jobId;
    @Column(name = "user_time",nullable = false,columnDefinition = "bigint(20) default 0 comment '耗时毫秒'")
    private Long userTime;
    @Column(name = "create_date",nullable = false,columnDefinition = "datetime default current_timestamp comment '开始执行时间'")
    private LocalDateTime createDate;
    @Column(name = "execute_status",nullable = false,columnDefinition = "int(11) default 1 comment '执行状态 0失败 1成功'")
    private Integer executeStatus;
    @Column(name = "error",columnDefinition = "varchar(500) default '' comment '错误'")
    private String error;
}

第二个要设计的类就是定时任务的日志类,日志类需要去记录任务每一次的执行情况。所以首先要保存任务的id,让它和任务相关联,另外要记录下任务执行是否成功,执行耗时了多久,如果任务执行式失败了,打印的错误堆栈是什么,这样便于我们通过日志来查找解决任务执行失败的问题。

完成两个类的创建之后,我们需要新建一个配置类来加载quartz的配置参数。

@Configuration
public class QuartzConfiguration {

    private static final String SCHEDULER_NAME = "Self_Scheduler";

    private static final String SCHEDULER_CONTEXT_KEY = "applicationContextKey";
    
    /**
     * @description 调度器工厂类
     * @author zhou
     * @create 2021/3/24 12:27 
     * @param 
     * @return org.springframework.scheduling.quartz.SchedulerFactoryBean
     **/
    @Bean("schedulerFactoryBean")
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, Properties quartzProperties){
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        schedulerFactoryBean.setDataSource(dataSource);
        schedulerFactoryBean.setSchedulerName(SCHEDULER_NAME);
        schedulerFactoryBean.setQuartzProperties(quartzProperties);
        //延时30s
        schedulerFactoryBean.setStartupDelay(30);
        schedulerFactoryBean.setApplicationContextSchedulerContextKey(SCHEDULER_CONTEXT_KEY);
        return schedulerFactoryBean;
    }

    @Bean
    public Properties quartzProperties(){
        Properties properties = new Properties();
        //定时器实例名称
        properties.put("org.quartz.scheduler.instanceName","SelfQuartzScheduler");
        properties.put("org.quartz.scheduler.instanceId","AUTO");
        //线程池
        properties.put("org.quartz.threadPool.class","org.quartz.simpl.SimpleThreadPool");
        properties.put("org.quartz.threadPool.threadCount","20");
        properties.put("org.quartz.threadPool.threadPriority","5");
        //jobStore
        properties.put("org.quartz.jobStore.class","org.quartz.impl.jdbcjobstore.JobStoreTX");
        properties.put("org.quartz.jobStore.tablePrefix","QRTZ_");
        properties.put("org.quartz.jobStore.driverDelegateClass","org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        properties.put("org.quartz.jobStore.misfireThreshold", "12000");
        return properties;
    }
}

主要的配置参数都可以在官网上找到。首先需要简单的认识一下quartz中的组件概念。quartz中最主要的有三个组件,分别是scheduler、jobdetail、trigger。

  1. scheduler是调度器,用于调度任务的执行,任务的详细信息和触发器都会注册在里面可以认为是个总指挥
  2. jobdetail这个是任务信息,主要是记录任务所需要携带的参数信息,可以认为是任务的载体
  3. trigger是触发器,任务何时会执行就依赖于触发器来控制。

首先我们注册一个工厂bean,由于我们需要将任务持久化到数据库所以要注入数据源参数。setOverwriteExistingJobs用于覆盖容器中原有的任务,setStartupDelay指的是在程序启动后延时30秒再开始进行任务的调度。有些项目中quartz的配置参数会通过一个properties文件来读取,这里我们直接编写在代码中。

设计定时任务

完成配置之后我们进行定时任务的设计,由于我们需要对任务进行动态管理,即能够修改任务周期,启动停止任务,所以我们首先是需要编写一些增删改查的业务代码,将这些对于我们之前自定义人任务类的操作暴露成接口,完成这一步之后,来编写设计一个任务的执行类。

@Slf4j
@Component
public class QuartzJob extends QuartzJobBean {



    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        SelfJobLogPO logPO = new SelfJobLogPO();
        long startTime = System.currentTimeMillis();
        logPO.setCreateDate(LocalDateTime.ofEpochSecond(startTime/1000,0, ZoneOffset.ofHours(0)));
        log.warn(logPO.getCreateDate().toString());
        SelfJobPO selfJobPO = null;
        LogService logService = SpringContextUtil.getBean(LogService.class);
        try {
            selfJobPO = (SelfJobPO) jobExecutionContext.getMergedJobDataMap().get(TackConstants.TASK_NAME);
            log.debug("定时任务开始执行,jobId:[{}]", selfJobPO.getJobId());
            this.execute(selfJobPO);
            logPO.setJobId(selfJobPO.getJobId());
            logPO.setExecuteStatus(ExecuteEnum.SUCCESS.getCode());
            logPO.setError(StringUtils.EMPTY);
        } catch (Exception e) {
            ErrorLogUtil.errorLog(e);
            log.error("定时任务执行失败,jobId:[{}]", selfJobPO.getJobId());
            logPO.setExecuteStatus(ExecuteEnum.FAIL.getCode());
            logPO.setError(StringUtils.substring(e.toString(), 0, 500));
        } finally {
            long useTime = System.currentTimeMillis() - startTime;
            log.debug("定时任务执行结束,jobId[{}],耗时:[{}]毫秒", selfJobPO.getJobId(), useTime);
            logPO.setUserTime(useTime);
            logService.add(logPO);
        }
    }

    /**
     * @param selfJobPO 定时任务模型
     * @return void
     * @description 反射执行定时任务方法
     * @author zhou
     * @create 2021/3/26 13:26
     **/
    private void execute(SelfJobPO selfJobPO) throws Exception {
        Object bean = SpringContextUtil.getBean(selfJobPO.getBeanName());
        Method method = bean.getClass().getDeclaredMethod(selfJobPO.getMethodName(), String.class);
        method.invoke(bean, selfJobPO.getParams());
    }
}

首先声明一个类并继承QuartzJobBean,将这个类注入到spring 容器中,这个主要是用于quartz job的具体实现。我对它的理解就是你可以把它认为是spring  aop的一个体现。比如对于系统运行的日志,我们经常会使用spring  aop的特性,在方法的前后进行日志打印。那么这个就是理解为任务执行的aop,每次任务的执行都会进入这个方法。它的入参我们可以理解是一个上下文对象。首先我们需要从上下文中将我们自定义的任务对象转化出来。getMergedJobDataMap这个方法将会获取一个类似map的数据结构,然后通过key获取我们这个线程中的定时任务。

当我们拿到自己的定时任务后,我们需要去执行调用这个定时任务。一般来说,定时任务的具体实现,是我们预先在我们的代码中编写了具体的业务代码,然后通过一个方法来调用它。那么在程序运行中我们如何去动态的执行一个方法呢?这个时候就需要用到我们的反射了。之前在我们自定义的任务类中我们记录了任务的bean名称,方法名和调用方法所需要的参数。

我们封装一个方法里面通过bean名称反射获取到执行方法的对象,然后通过方法名称去反射获取到具体的方法然后调用执行。这里就是一个简单的java反射执行。

另外我们需要记录这个任务的执行记录,所以在executeInternal中需要生成我们之前定义的日志记录类来保存日志记录。

到这里任务的执行已经有了统一的管理,我们还需要进行日志动态管理,我们需要再创建一个工具类

public class QuartzUtil {

    private static final String KEY = "TASK_";


    public static JobKey getJobKey(Long jobId,String group){
        return JobKey.jobKey(KEY+jobId,group);
    }

    public static TriggerKey getTriggerKey(Long jobId,String group){
        return TriggerKey.triggerKey(KEY+jobId,group);
    }

    public static Trigger getJobTrigger(Scheduler scheduler,Long jobId,String group) throws SchedulerException {
        return scheduler.getTrigger(getTriggerKey(jobId, group));
    }

    /**
     * @description 创建定时任务
     * @author zhou
     * @created  2021/4/18 21:50
     * @param scheduler 调度
     * @param selfJobPO 自定义任务信息
     * @return void
     **/
    public static void createJob(Scheduler scheduler, SelfJobPO selfJobPO){
        try{
            //job信息
            JobDetail jobDetail = JobBuilder.newJob(QuartzJob.class).withIdentity(getJobKey(selfJobPO.getJobId(), selfJobPO.getGroupName()))
                    .build();
            //cron
            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(selfJobPO.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();
            //触发器
            CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(selfJobPO.getJobId(), selfJobPO.getGroupName()))
                    .withSchedule(cronScheduleBuilder).build();
            //存储信息
            jobDetail.getJobDataMap().put(TackConstants.TASK_NAME,selfJobPO);
            //调度器存储任务信息和触发器
            scheduler.scheduleJob(jobDetail,cronTrigger);
        }catch (SchedulerException e){
            ErrorLogUtil.errorLog(e);
            throw new ServiceErrorException(ServiceErrorEnum.ADD_JOB_ERROR);
        }
    }

    /** 
     * @description 更新定时任务 
     * @author zhou       
     * @created  2021/4/18 23:19
     * @param 
     * @return void
     **/
    public static void updateJob(Scheduler scheduler,SelfJobPO selfJobPO){

        try {
            //获取触发器key
            TriggerKey triggerKey = getTriggerKey(selfJobPO.getJobId(),selfJobPO.getGroupName());
            //重新构建cron
            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(selfJobPO.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();
            //获取原来的触发器
            CronTrigger cronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            if(cronTrigger.getCronExpression().equalsIgnoreCase(selfJobPO.getCronExpression())){
                return;
            }
            //更新触发器
            cronTrigger = cronTrigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
            //更新触发器中的调度信息
            cronTrigger.getJobDataMap().put(TackConstants.TASK_NAME,selfJobPO);
            //更新任务
            scheduler.rescheduleJob(triggerKey,cronTrigger);

        } catch (SchedulerException e) {
            ErrorLogUtil.errorLog(e);
            throw new ServiceErrorException(ServiceErrorEnum.MODIFY_JOB_ERROR);
        }

    }

    /**
     * @description 暂停定时任务
     * @author zhou
     * @create 2021/4/19 14:50
     * @param
     * @return void
     **/
    public static void pauseJob(Scheduler scheduler,SelfJobPO selfJobPO){
        try {
            scheduler.pauseJob(getJobKey(selfJobPO.getJobId(),selfJobPO.getGroupName()));
        } catch (SchedulerException e) {
            ErrorLogUtil.errorLog(e);
            throw new ServiceErrorException(ServiceErrorEnum.PAUSE_JOB_ERROR);
        }
    }

    /**
     * @description 恢复定时任务
     * @author zhou
     * @create 2021/4/19 14:56 
     * @param 
     * @return void
     **/
    public static void resumeJob(Scheduler scheduler,SelfJobPO selfJobPO){
        try {
            scheduler.resumeJob(getJobKey(selfJobPO.getJobId(),selfJobPO.getGroupName()));
        } catch (SchedulerException e) {
            ErrorLogUtil.errorLog(e);
            throw new ServiceErrorException(ServiceErrorEnum.RESUME_JOB_ERROR);
        }
    }

    /**
     * @description 删除定时任务
     * @author zhou
     * @create 2021/4/19 16:55
     * @param
     * @return void
     **/
    public static void deleteJob(Scheduler scheduler,SelfJobPO selfJobPO){
        try{
            scheduler.deleteJob(getJobKey(selfJobPO.getJobId(),selfJobPO.getGroupName()));
        }catch (SchedulerException e){
            ErrorLogUtil.errorLog(e);
            throw new ServiceErrorException(ServiceErrorEnum.DELETE_JOB_ERROR);
        }
    }

    /**
     * @description 立即执行定时任务
     * @author zhou
     * @create 2021/4/19 17:01
     * @param
     * @return void
     **/
    public static void execJob(Scheduler scheduler,SelfJobPO selfJobPO){
        try {
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put(TackConstants.TASK_NAME,selfJobPO);
            scheduler.triggerJob(getJobKey(selfJobPO.getJobId(),selfJobPO.getGroupName()),jobDataMap);
        } catch (SchedulerException e) {
            ErrorLogUtil.errorLog(e);
            throw new ServiceErrorException(ServiceErrorEnum.EXEC_JOB_ERROR);
        }
    }
}

这里设计了动态管理定时任务的常用方法,仔细来看其实结构还是比较清晰的,主要就是先构造了两个组件,一个是jobdetail任务的信息,另一个是trigger触发器,然后将这两个组件注册到scheduler调度器中。然后通过scheduler的自带的api进行调用。

编写之后我们将这些工具方法和我们管理任务的业务代码关联到一起,即在业务代码中调用这个工具类代码实现管理业务任务和quartz任务互通。

上一篇:Hadoop-之yarn容量调度器之多队列配置与解读


下一篇:Scheduler内核文档翻译(1)——Documentation\scheduler\sched-tune.txt