4.6、重新设置 Quartz 数据连接池
默认 Quartz 的数据连接池是 c3p0,由于性能不太稳定,不推荐使用,因此我们将其改成 druid
数据连接池,配置如下:
public class DruidConnectionProvider implements ConnectionProvider { /** * 常量配置,与quartz.properties文件的key保持一致(去掉前缀),同时提供set方法,Quartz框架自动注入值。 * @return * @throws SQLException */ //JDBC驱动 public String driver; //JDBC连接串 public String URL; //数据库用户名 public String user; //数据库用户密码 public String password; //数据库最大连接数 public int maxConnection; //数据库SQL查询每次连接返回执行到连接池,以确保它仍然是有效的。 public String validationQuery; private boolean validateOnCheckout; private int idleConnectionValidationSeconds; public String maxCachedStatementsPerConnection; private String discardIdleConnectionsSeconds; public static final int DEFAULT_DB_MAX_CONNECTIONS = 10; public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120; //Druid连接池 private DruidDataSource datasource; @Override public Connection getConnection() throws SQLException { return datasource.getConnection(); } @Override public void shutdown() throws SQLException { datasource.close(); } @Override public void initialize() throws SQLException { if (this.URL == null) { throw new SQLException("DBPool could not be created: DB URL cannot be null"); } if (this.driver == null) { throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!"); } if (this.maxConnection < 0) { throw new SQLException("DBPool maxConnectins could not be created: Max connections must be greater than zero!"); } datasource = new DruidDataSource(); try{ datasource.setDriverClassName(this.driver); } catch (Exception e) { try { throw new SchedulerException("Problem setting driver class name on datasource: " + e.getMessage(), e); } catch (SchedulerException e1) { } } datasource.setUrl(this.URL); datasource.setUsername(this.user); datasource.setPassword(this.password); datasource.setMaxActive(this.maxConnection); datasource.setMinIdle(1); datasource.setMaxWait(0); datasource.setMaxPoolPreparedStatementPerConnectionSize(DEFAULT_DB_MAX_CONNECTIONS); if (this.validationQuery != null) { 
datasource.setValidationQuery(this.validationQuery); if(!this.validateOnCheckout) datasource.setTestOnReturn(true); else datasource.setTestOnBorrow(true); datasource.setValidationQueryTimeout(this.idleConnectionValidationSeconds); } } public String getDriver() { return driver; } public void setDriver(String driver) { this.driver = driver; } public String getURL() { return URL; } public void setURL(String URL) { this.URL = URL; } public String getUser() { return user; } public void setUser(String user) { this.user = user; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public int getMaxConnection() { return maxConnection; } public void setMaxConnection(int maxConnection) { this.maxConnection = maxConnection; } public String getValidationQuery() { return validationQuery; } public void setValidationQuery(String validationQuery) { this.validationQuery = validationQuery; } public boolean isValidateOnCheckout() { return validateOnCheckout; } public void setValidateOnCheckout(boolean validateOnCheckout) { this.validateOnCheckout = validateOnCheckout; } public int getIdleConnectionValidationSeconds() { return idleConnectionValidationSeconds; } public void setIdleConnectionValidationSeconds(int idleConnectionValidationSeconds) { this.idleConnectionValidationSeconds = idleConnectionValidationSeconds; } public DruidDataSource getDatasource() { return datasource; } public void setDatasource(DruidDataSource datasource) { this.datasource = datasource; } public String getDiscardIdleConnectionsSeconds() { return discardIdleConnectionsSeconds; } public void setDiscardIdleConnectionsSeconds(String discardIdleConnectionsSeconds) { this.discardIdleConnectionsSeconds = discardIdleConnectionsSeconds; } }
创建完成之后,还需要在 quartz.properties
配置文件中指定该连接池提供类:
#数据库连接池,将其设置为druid
org.quartz.dataSource.qzDS.connectionProvider.class=com.example.cluster.quartz.config.DruidConnectionProvider
如果已经配置,请忽略!
4.7、编写 Job 具体任务类
/**
 * Sample Quartz job that prints the id of the scheduler instance running it together with
 * the current timestamp.
 */
public class TfCommandJob implements Job {

    private static final Logger log = LoggerFactory.getLogger(TfCommandJob.class);

    /**
     * Prints {@code <schedulerInstanceId>--<yyyy-MM-dd HH:mm:ss>} to standard output.
     * A {@code SchedulerException} raised while reading the scheduler instance id is
     * logged and not propagated.
     *
     * @param context execution context supplying the scheduler
     */
    @Override
    public void execute(JobExecutionContext context) {
        try {
            // "yyyy" is the calendar year. The previous "YYYY" is the week-based year,
            // which prints the wrong year for dates around New Year.
            String timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            System.out.println(context.getScheduler().getSchedulerInstanceId() + "--" + timestamp);
        } catch (SchedulerException e) {
            log.error("任务执行失败", e);
        }
    }
}
4.8、编写 Quartz 服务层接口
/**
 * Service-layer operations for managing Quartz jobs.
 */
public interface QuartzJobService {

    /**
     * Adds a job; parameters may be passed along with it.
     *
     * @param clazzName name of the job class
     * @param jobName   job name
     * @param groupName job group name
     * @param cronExp   cron expression
     * @param param     parameters for the job, may be {@code null}
     */
    void addJob(String clazzName,
                String jobName,
                String groupName,
                String cronExp,
                Map<String, Object> param);

    /**
     * Pauses a job.
     *
     * @param jobName   job name
     * @param groupName job group name
     */
    void pauseJob(String jobName, String groupName);

    /**
     * Resumes a job.
     *
     * @param jobName   job name
     * @param groupName job group name
     */
    void resumeJob(String jobName, String groupName);

    /**
     * Runs a scheduled job once, immediately.
     *
     * @param jobName   job name
     * @param groupName job group name
     */
    void runOnce(String jobName, String groupName);

    /**
     * Updates a job.
     *
     * @param jobName   job name
     * @param groupName job group name
     * @param cronExp   new cron expression
     * @param param     new parameters
     */
    void updateJob(String jobName,
                   String groupName,
                   String cronExp,
                   Map<String, Object> param);

    /**
     * Deletes a job.
     *
     * @param jobName   job name
     * @param groupName job group name
     */
    void deleteJob(String jobName, String groupName);

    /** Starts all jobs. */
    void startAllJobs();

    /** Pauses all jobs. */
    void pauseAllJobs();

    /** Resumes all jobs. */
    void resumeAllJobs();

    /** Shuts down all jobs. */
    void shutdownAllJobs();
}
对应的实现类QuartzJobServiceImpl
如下:
@Service public class QuartzJobServiceImpl implements QuartzJobService { private static final Logger log = LoggerFactory.getLogger(QuartzJobServiceImpl.class); @Autowired private Scheduler scheduler; @Override public void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param) { try { // 启动调度器,默认初始化的时候已经启动 // scheduler.start(); //构建job信息 Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(clazzName); JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, groupName).build(); //表达式调度构建器(即任务执行的时间) CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp); //按新的cronExpression表达式构建一个新的trigger CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName).withSchedule(scheduleBuilder).build(); //获得JobDataMap,写入数据 if (param != null) { trigger.getJobDataMap().putAll(param); } scheduler.scheduleJob(jobDetail, trigger); } catch (Exception e) { log.error("创建任务失败", e); } } @Override public void pauseJob(String jobName, String groupName) { try { scheduler.pauseJob(JobKey.jobKey(jobName, groupName)); } catch (SchedulerException e) { log.error("暂停任务失败", e); } } @Override public void resumeJob(String jobName, String groupName) { try { scheduler.resumeJob(JobKey.jobKey(jobName, groupName)); } catch (SchedulerException e) { log.error("恢复任务失败", e); } } @Override public void runOnce(String jobName, String groupName) { try { scheduler.triggerJob(JobKey.jobKey(jobName, groupName)); } catch (SchedulerException e) { log.error("立即运行一次定时任务失败", e); } } @Override public void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param) { try { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, groupName); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); if (cronExp != null) { // 表达式调度构建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp); // 按新的cronExpression表达式重新构建trigger trigger = 
trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); } //修改map if (param != null) { trigger.getJobDataMap().putAll(param); } // 按新的trigger重新设置job执行 scheduler.rescheduleJob(triggerKey, trigger); } catch (Exception e) { log.error("更新任务失败", e); } } @Override public void deleteJob(String jobName, String groupName) { try { //暂停、移除、删除 scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, groupName)); scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, groupName)); scheduler.deleteJob(JobKey.jobKey(jobName, groupName)); } catch (Exception e) { log.error("删除任务失败", e); } } @Override public void startAllJobs() { try { scheduler.start(); } catch (Exception e) { log.error("开启所有的任务失败", e); } } @Override public void pauseAllJobs() { try { scheduler.pauseAll(); } catch (Exception e) { log.error("暂停所有任务失败", e); } } @Override public void resumeAllJobs() { try { scheduler.resumeAll(); } catch (Exception e) { log.error("恢复所有任务失败", e); } } @Override public void shutdownAllJobs() { try { if (!scheduler.isShutdown()) { // 需谨慎操作关闭scheduler容器 // scheduler生命周期结束,无法再 start() 启动scheduler scheduler.shutdown(true); } } catch (Exception e) { log.error("关闭所有的任务失败", e); } } }