springboot2.x玩转quartz

介绍springboot2.x整合quartz,并给出一个基本的jdbc持久化的cron示例。

maven依赖:

        <!--spring boot集成quartz-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
         <!--quartz依赖c3p0作为jdbc持久化数据库连接池-->
        <dependency>
            <groupId>com.mchange</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.5.2</version>
        </dependency>

springboot配置yaml:

#数据源配置
spring:
  quartz:
    #相关属性配置
    properties:
      org:
        quartz:
          dataSource:
            quartzDS:
              driver: com.mysql.cj.jdbc.Driver
              URL: jdbc:mysql://192.168.1.105:3306/smart-bar-dev?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true
              user: root
              password: 你的密码
          scheduler:
            instanceName: clusteredScheduler
            instanceId: AUTO
          jobStore:
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            tablePrefix: QRTZ_
            useProperties: false
            dataSource: quartzDS
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 10
            threadPriority: 5
            threadsInheritContextClassLoaderOfInitializingThread: true
      #数据库方式
      job-store-type: jdbc
        #初始化表结构
        jdbc:
          initialize-schema: always #每次启动都初始化数据库,3种模式(ALWAYS、EMBEDDED、NEVER)

一切顺利,你启动之后可以看到数据库表已经生成(initialize-schema: never此时可以关闭了):
springboot2.x玩转quartz
接下来开始写代码cron示例:
JobService是job的基本操作,代码:

import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springblade.modules.quartz.job.DateTimeJob;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author hsj
 * @description:job的基本操作
 * @date 2021/9/17 18:36
 */
@Component
@Slf4j
public class JobService {

	//加入Qulifier注解,通过名称注入bean
	@Resource
	private Scheduler schedulerQuartz;
	//GROUP组
	private final static String TRIGGER_IDENTITY = "QuartzTaskServiceTrigger";

	/**
	 * @Description:添加job
	 * @author HeShengjin 2356899074@qq.com
	 * @date 2021/9/17 18:38
	 */
	//CronTrigger
	public void addCronJob(String msg, String cron,String uniqueNameId) throws Exception {

		// 启动调度器
		schedulerQuartz.start();

		//构建job信息
		JobDetail jobDetail = JobBuilder.newJob(DateTimeJob.class)//PrintTimeJob我们的业务类
			.withIdentity(uniqueNameId,TRIGGER_IDENTITY)
			//每个JobDetail内都有一个Map,包含了关联到这个Job的数据,在Job类中可以通过context获取
			.usingJobData("msg", msg)//关联键值对
			.storeDurably()//即使没有Trigger关联时,也不需要删除该JobDetail
			.build();

		//表达式调度构建器(即任务执行的时间)
		//按新的cronExpression表达式构建一个新的trigger
		CronTrigger trigger = TriggerBuilder.newTrigger()
			.forJob(jobDetail)//关联上述的JobDetail
			.withIdentity(uniqueNameId,TRIGGER_IDENTITY)
			.withSchedule(CronScheduleBuilder.cronSchedule(cron))// "0/1 * * * * ?"
			.build();

		try {
			schedulerQuartz.scheduleJob(jobDetail, trigger);

		} catch (SchedulerException e) {
			log.info("创建定时任务失败" + e);
			throw new Exception("创建定时任务失败");
		}
	}

	/**
	 * @Description:暂停job
	 * @author HeShengjin 2356899074@qq.com
	 * @date 2021/9/17 18:38
	 */
	public void jobPause(String uniqueNameId) throws Exception {
		schedulerQuartz.pauseJob(JobKey.jobKey(uniqueNameId,TRIGGER_IDENTITY));
	}

	/**
	 * @Description:恢复job
	 * @author HeShengjin 2356899074@qq.com
	 * @date 2021/9/17 18:38
	 */
	public void jobresume(String uniqueNameId) throws Exception {
		schedulerQuartz.resumeJob(JobKey.jobKey(uniqueNameId,TRIGGER_IDENTITY));
	}


	/**
	 * @Description:更新job
	 * @author HeShengjin 2356899074@qq.com
	 * @date 2021/9/17 18:38
	 */
	public void jobreschedule(String cron,String uniqueNameId) throws Exception {
		try {
			TriggerKey triggerKey = TriggerKey.triggerKey(uniqueNameId,TRIGGER_IDENTITY);
			// 表达式调度构建器
			CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cron);

			CronTrigger trigger = (CronTrigger) schedulerQuartz.getTrigger(triggerKey);

			// 按新的cronExpression表达式重新构建trigger
			trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();

			// 按新的trigger重新设置job执行
			schedulerQuartz.rescheduleJob(triggerKey, trigger);
		} catch (SchedulerException e) {
			log.info("更新定时任务失败" + e);
			throw new Exception("更新定时任务失败");
		}
	}


	/**
	 * @Description:删除job
	 * @author HeShengjin 2356899074@qq.com
	 * @date 2021/9/17 18:39
	 */
	public void jobdelete(String uniqueNameId) throws Exception {
		schedulerQuartz.pauseTrigger(TriggerKey.triggerKey(uniqueNameId,TRIGGER_IDENTITY));
		schedulerQuartz.unscheduleJob(TriggerKey.triggerKey(uniqueNameId,TRIGGER_IDENTITY));
		schedulerQuartz.deleteJob(JobKey.jobKey(uniqueNameId,TRIGGER_IDENTITY));
	}

}

JobController接口:


import lombok.extern.slf4j.Slf4j;
import org.springblade.modules.quartz.service.JobService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author hsj
 * @description:
 * @date 2021/9/17 17:39
 */
@Slf4j
@RestController
@RequestMapping(value = "blade-quartz-job")
public class JobController {

	@Resource
	private JobService jobService;
	/**
	 * 添加任务
	 *
	 * @throws Exception
	 */
	@PostMapping(value = "/addjob")
	public void addjob(@RequestParam(value = "msg") String msg, @RequestParam(value = "cron") String cron,@RequestParam(value = "uniqueNameId") String uniqueNameId) throws Exception {
		jobService.addCronJob(msg, cron ,uniqueNameId);
	}
	/**
	 * 暂停任务
	 *
	 * @throws Exception
	 */
	@PostMapping(value = "/pausejob")
	public void pausejob(@RequestParam(value = "uniqueNameId") String uniqueNameId) throws Exception {
		jobService.jobPause(uniqueNameId);
	}
	/**
	 * 恢复任务
	 * @throws Exception
	 */
	@PostMapping(value = "/resumejob")
	public void resumejob(@RequestParam(value = "uniqueNameId") String uniqueNameId) throws Exception {
		jobService.jobresume(uniqueNameId);
	}
	/**
	 * 更新任务
	 *
	 * @param cron
	 * @throws Exception
	 */
	@PostMapping(value = "/reschedulejob")
	public void rescheduleJob(@RequestParam(value = "cron") String cron,@RequestParam(value = "uniqueNameId") String uniqueNameId) throws Exception {
		jobService.jobreschedule(cron,uniqueNameId);
	}
	/**
	 * 删除任务
	 * 删除操作前应该暂停该任务的触发器,并且停止该任务的执行
	 *
	 * @throws Exception
	 */
	@PostMapping(value = "/deletejob")
	public void deletejob(@RequestParam(value = "uniqueNameId") String uniqueNameId) throws Exception {
		jobService.jobdelete(uniqueNameId);
	}

}



基本的服务有了,下面开始示例(一个定时控制继电器操作demo):
JobMessage,业务对象


import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author hsj
 * @description:
 * @date 2021/9/18 9:14
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class JobMessage {
	//0表示区间需要通电,1表示区间需要断电
	private int type;
	//0表示开始时间,1表示结束时间
	private int timeType;
	//设备型号:host_model
	private int hostModel;
	//设备id->deviceId
	private long deviceId;
	//继电器0~8
	private int electricRelay;

	@Override
	public String toString() {
		return JSON.toJSONString(this);
	}

	public static JobMessage parseFromString(String jobMessage){
		return JSON.parseObject(jobMessage,JobMessage.class);
	}
}

JobDetail相关的业务代码:


import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springblade.common.constant.CommonConstant;
import org.springblade.common.utils.SpringUtils;
import org.springblade.modules.quartz.msg.JobMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.QuartzJobBean;

/**
 * @author hsj
 * @description:JobDetail相关的业务代码
 * @date 2021/9/17 17:45
 */
@Slf4j
public class DateTimeJob extends QuartzJobBean {

	/**
	 * @Description:定制执行器执行时间点job
	 * @author HeShengjin 2356899074@qq.com
	 * @date 2021/9/18 10:45
	 */
	@Override
	protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
		ThreadPoolTaskExecutor threadPoolTaskExecutor = SpringUtils.getBean(ThreadPoolTaskExecutor.class);
		//不要阻塞quartz线程池执行后面的任务
		threadPoolTaskExecutor.execute(()->{
			//获取JobDetail中关联的数据
			String jobMessageStr = (String) jobExecutionContext.getJobDetail().getJobDataMap().get("msg");
			JobMessage jobMessage = JobMessage.parseFromString(jobMessageStr);
			log.info("获取JobDetail中关联的数据:{}",jobMessage.toString());
            //执行业务逻辑
			doBussiness(jobMessage);
		});
	}

	/**
	 * @Description:执行业务逻辑
	 * @author HeShengjin 2356899074@qq.com
	 * @date 2021/9/18 10:44
	 */
	private void doBussiness(JobMessage jobMessage) {
		//区间需要通电,start就是通电操作,end就是断电操作
		if (CommonConstant.JobMessage.TYPE_0 == jobMessage.getType()){
			//start控制主机继电器通电
			if(CommonConstant.JobMessage.TIMETYPE_0 == jobMessage.getTimeType()){
				log.info("区间需要通电,start就是通电操作,end就是断电操作-->start控制主机继电器通电,deviceId={},hostModel={},electricRelay={}"
					, jobMessage.getDeviceId(), jobMessage.getHostModel(), jobMessage.getElectricRelay());
				//TODO
			}
			//end控制主机继电器断电
			else{
				log.info("区间需要通电,start就是通电操作,end就是断电操作-->end控制主机继电器断电,deviceId={},hostModel={},electricRelay={}"
					, jobMessage.getDeviceId(), jobMessage.getHostModel(), jobMessage.getElectricRelay());
				//TODO
			}
		}
		//区间需要断电,start就是断电操作,end就是通电操作
		else {
			//start控制主机继电器断电
			if(CommonConstant.JobMessage.TIMETYPE_0 == jobMessage.getTimeType()){
				log.info("区间需要断电,start就是断电操作,end就是通电操作-->start控制主机继电器断电,deviceId={},hostModel={},electricRelay={}"
					, jobMessage.getDeviceId(), jobMessage.getHostModel(), jobMessage.getElectricRelay());
				//TODO
			}
			//end控制主机继电器通电
			else{
				log.info("区间需要断电,start就是断电操作,end就是通电操作-->end控制主机继电器通电,deviceId={},hostModel={},electricRelay={}"
					, jobMessage.getDeviceId(), jobMessage.getHostModel(), jobMessage.getElectricRelay());
				//TODO
			}
		}
	}
}

测试一下:


import lombok.extern.slf4j.Slf4j;
import org.springblade.api.DeviceCtl;
import org.springblade.api.nettyadapter.HostModel;
import org.springblade.common.constant.CommonConstant;
import org.springblade.common.utils.CronExpressionUtils;
import org.springblade.modules.quartz.msg.JobMessage;
import org.springblade.modules.quartz.service.JobService;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author hsj
 * @description:测试quartz启动执行
 * @date 2021/9/17 10:53
 */
@Slf4j
@Component
public class MyCommand  implements CommandLineRunner {

	@Resource
	private JobService jobService;

	@Override
	public void run(String... args) throws Exception {
		//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~实例说明前提:区间需要通电~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
		//当添加一条规则(规则有数据库唯一主键ID:1123598821738675201):比如每天10:02:02~18:05:50之间需要通电
		
		//其实是说: 每天10:02:02通电(继电器控制)
		//          每天18:05:50断电(继电器控制)

		String uniqueNameIdStart = "1123598821738675201-start";//数据库唯一主键ID-start保证唯一
		String uniqueNameIdEnd = "1123598821738675201-end";//数据库唯一主键ID-end保证唯一
		//每天10点02分02秒执行一次任务
		//"02 02 10 * * ?"
		String cronStart = CronExpressionUtils.conversionHmsCron("10:02:02");
		//每天18点05分50秒执行一次任务
		//"50 05 18 * * ?"
		String cronEnd = CronExpressionUtils.conversionHmsCron("18:05:50");

		//添加执行任务
		//每天10:02:02通电(继电器控制)...
		jobService.addCronJob(
			JobMessage.builder()
				.type(CommonConstant.JobMessage.TYPE_0)//区间需要通电
				.timeType(CommonConstant.JobMessage.TIMETYPE_0)//开始时间
				.hostModel(HostModel.YOUREN_IO_444)//实际数据根据deviceId从主机表blade_iot_device_host.host_model获取
			    .deviceId(1437253753398898700L)//实际的deviceId
			    .electricRelay(DeviceCtl.Number.number0)//第一个继电器
				.build()
			    .toString(),
			cronStart,
			uniqueNameIdStart);
		//添加执行任务
		//每天18:05:50断电(继电器控制)...
		jobService.addCronJob(
			JobMessage.builder()
				.type(CommonConstant.JobMessage.TYPE_0)//区间需要通电
				.timeType(CommonConstant.JobMessage.TIMETYPE_1)//结束时间
				.hostModel(HostModel.YOUREN_IO_444)//实际数据根据deviceId从主机表blade_iot_device_host.host_model获取
				.deviceId(1437253753398898700L)//实际的deviceId
				.electricRelay(DeviceCtl.Number.number0)//第一个继电器
				.build()
				.toString(),
			cronEnd,
			uniqueNameIdEnd);

		//根据uniqueNameId暂停任务
//		jobService.jobPause(uniqueNameIdStart);
//		jobService.jobPause(uniqueNameIdEnd);

		//根据uniqueNameId恢复已暂停任务
//		jobService.jobresume(uniqueNameIdStart);
//		jobService.jobresume(uniqueNameIdEnd);

		//根据uniqueNameId更新任务执行时间cron->cronNew
		//更新到每天11:02:02通电(继电器控制)
//		String cronStartNew = CronExpressionUtils.conversionHmsCron("11:02:02");
//		jobService.jobreschedule(cronStartNew,uniqueNameIdStart);

		//根据uniqueNameId删除任务(规则删除时候请务必删除定时任务)
//		jobService.jobdelete(uniqueNameIdStart);
//		jobService.jobdelete(uniqueNameIdEnd);
	}
}

附录:
ThreadPoolTaskExecutor是spring框架的线程池
SpringUtils是一个工具,获取spring容器Bean对象。


import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Component;

/**
 * spring工具类 方便在非spring管理环境中获取bean
 * @author Dylan
 */
@Component
public final class SpringUtils implements BeanFactoryPostProcessor {
    /**
     * Spring应用上下文环境
     */
    private static ConfigurableListableBeanFactory beanFactory;

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }

    /**
     * 获取对象
     * @param name
     * @return Object 一个以所给名字注册的bean的实例
     * @throws BeansException
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        return (T) beanFactory.getBean(name);
    }

    /**
     * 获取类型为requiredType的对象
     * @param clz
     * @return
     * @throws BeansException
     */
    public static <T> T getBean(Class<T> clz) throws BeansException {
        return beanFactory.getBean(clz);
    }

    /**
     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name) {
        return beanFactory.containsBean(name);
    }

    /**
     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
     * @param name
     * @return boolean
     * @throws NoSuchBeanDefinitionException
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注册对象的类型
     * @throws NoSuchBeanDefinitionException
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getType(name);
    }

    /**
     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
     * @param name
     * @return
     * @throws NoSuchBeanDefinitionException
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getAliases(name);
    }

    /**
     * 获取aop代理对象
     * @param invoker
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker) {
        return (T) AopContext.currentProxy();
    }
}

上一篇:书城项目第八阶段:使用Filter过滤器实现后台的权限管理


下一篇:异常的使用方法(throw和throws)