Quartz implementation walkthrough

Required pom dependency

<!-- quartz -->
<dependency>
   <groupId>org.quartz-scheduler</groupId>
   <artifactId>quartz</artifactId>
</dependency>

 

This is the result I want to achieve:

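import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;

public class ExecutionJobTask implements Job {
    @Autowired
    private QuartzJobService quartzJobService;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // Call my own business method
        quartzJobService.xxx();
        // ......
    }
}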

 

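However, Job objects are instantiated inside Quartz, whereas QuartzJobService lives in the Spring container. How do we connect the two? Quartz provides the JobFactory interface, which lets us supply our own logic for creating Jobs.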

public interface JobFactory {
    Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException;
}

 

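So we implement the JobFactory interface and, once the Job has been instantiated, use the ApplicationContext to inject the properties the Job needs.
When Spring is integrated with Quartz, the class involved is org.springframework.scheduling.quartz.SchedulerFactoryBean.
Its source is shown below. The key lines are the ones that handle jobFactory inside the try block.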

private Scheduler prepareScheduler(SchedulerFactory schedulerFactory) throws SchedulerException {
        if (this.resourceLoader != null) {
            // Make given ResourceLoader available for SchedulerFactory configuration.
            configTimeResourceLoaderHolder.set(this.resourceLoader);
        }
        if (this.taskExecutor != null) {
            // Make given TaskExecutor available for SchedulerFactory configuration.
            configTimeTaskExecutorHolder.set(this.taskExecutor);
        }
        if (this.dataSource != null) {
            // Make given DataSource available for SchedulerFactory configuration.
            configTimeDataSourceHolder.set(this.dataSource);
        }
        if (this.nonTransactionalDataSource != null) {
            // Make given non-transactional DataSource available for SchedulerFactory configuration.
            configTimeNonTransactionalDataSourceHolder.set(this.nonTransactionalDataSource);
        }

        // Get Scheduler instance from SchedulerFactory.
        try {
            Scheduler scheduler = createScheduler(schedulerFactory, this.schedulerName);
            populateSchedulerContext(scheduler);

            if (!this.jobFactorySet && !(scheduler instanceof RemoteScheduler)) {
                // Use AdaptableJobFactory as default for a local Scheduler, unless when
                // explicitly given a null value through the "jobFactory" bean property.
                this.jobFactory = new AdaptableJobFactory();
            }
            if (this.jobFactory != null) {
                if (this.applicationContext != null && this.jobFactory instanceof ApplicationContextAware) {
                    ((ApplicationContextAware) this.jobFactory).setApplicationContext(this.applicationContext);
                }
                if (this.jobFactory instanceof SchedulerContextAware) {
                    ((SchedulerContextAware) this.jobFactory).setSchedulerContext(scheduler.getContext());
                }
                scheduler.setJobFactory(this.jobFactory);
            }
            return scheduler;
        }

        finally {
            if (this.resourceLoader != null) {
                configTimeResourceLoaderHolder.remove();
            }
            if (this.taskExecutor != null) {
                configTimeTaskExecutorHolder.remove();
            }
            if (this.dataSource != null) {
                configTimeDataSourceHolder.remove();
            }
            if (this.nonTransactionalDataSource != null) {
                configTimeNonTransactionalDataSourceHolder.remove();
            }
        }
    }

In other words: if we do not specify a jobFactory, Spring uses AdaptableJobFactory.

Now let's look at how AdaptableJobFactory is implemented.

package org.springframework.scheduling.quartz;

import org.quartz.Job;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.spi.JobFactory;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.util.ReflectionUtils;

public class AdaptableJobFactory implements JobFactory {
    @Override
    public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
        try {
            Object jobObject = createJobInstance(bundle);
            return adaptJob(jobObject);
        }
        catch (Throwable ex) {
            throw new SchedulerException("Job instantiation failed", ex);
        }
    }

    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Class<?> jobClass = bundle.getJobDetail().getJobClass();
        return ReflectionUtils.accessibleConstructor(jobClass).newInstance();
    }

    protected Job adaptJob(Object jobObject) throws Exception {
        if (jobObject instanceof Job) {
            return (Job) jobObject;
        }
        else if (jobObject instanceof Runnable) {
            return new DelegatingJob((Runnable) jobObject);
        }
        else {
            throw new IllegalArgumentException(
                    "Unable to execute job class [" + jobObject.getClass().getName() +
                    "]: only [org.quartz.Job] and [java.lang.Runnable] supported.");
        }
    }
}

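As you can see, createJobInstance instantiates the job class by calling its no-argument constructor through reflection. The resulting Job is therefore not managed by Spring, and its @Autowired fields stay null. What I want is for Spring to inject the Job's dependencies.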
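To make the problem concrete, here is a minimal sketch in plain Java, with no Quartz or Spring on the classpath. The names GreetingService, SampleJob and ReflectionDemo are hypothetical stand-ins, not part of either library. The sketch only imitates the reflective "call the no-argument constructor" step and shows that nothing fills in the dependency field.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a Spring-managed service.
class GreetingService {
    String greet() {
        return "hello";
    }
}

// Hypothetical stand-in for a Quartz Job with an injected field.
class SampleJob {
    GreetingService service; // would carry @Autowired in the real Job

    String run() {
        return service.greet(); // throws NullPointerException if the field was never set
    }
}

class ReflectionDemo {
    // Imitates a reflection-based factory: it only calls the no-argument constructor.
    static <T> T instantiate(Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Instantiation failed", ex);
        }
    }

    public static void main(String[] args) {
        SampleJob job = instantiate(SampleJob.class);
        System.out.println("service injected? " + (job.service != null));
        try {
            job.run();
        } catch (NullPointerException ex) {
            System.out.println("run() failed: dependency was never set");
        }
    }
}
```

The instance is created successfully, but the field is still null, which is the situation a Job is in when only the default factory is used.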

So we need a class that extends AdaptableJobFactory and overrides createJobInstance so that Spring autowires the Job after it has been created.

    @Component("quartzJobFactory")
    public static class QuartzJobFactory extends AdaptableJobFactory {

        private final AutowireCapableBeanFactory capableBeanFactory;

        public QuartzJobFactory(AutowireCapableBeanFactory capableBeanFactory) {
            this.capableBeanFactory = capableBeanFactory;
        }

        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            // Let the parent class create the Job, then have Spring autowire its fields
            Object jobInstance = super.createJobInstance(bundle);
            capableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }

 

Next, register the custom JobFactory with the Scheduler.

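    /**
     * Registers the Scheduler as a Spring bean, using the custom JobFactory.
     *
     * @param quartzJobFactory the JobFactory that autowires each Job after creating it
     * @return the started Scheduler
     * @throws Exception if the Scheduler cannot be initialized or started
     */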
    @Bean(name = "scheduler")
    public Scheduler scheduler(QuartzJobFactory quartzJobFactory) throws Exception {
        SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
        factoryBean.setJobFactory(quartzJobFactory);
        factoryBean.afterPropertiesSet();
        Scheduler scheduler = factoryBean.getScheduler();
        scheduler.start();
        return scheduler;
    }

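With this in place, Jobs can use Spring-managed dependencies. The principle is simply to extend the JobFactory's job-creation method and inject the Job's properties right after the Job is created.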
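The same principle can be sketched in miniature without Quartz or Spring. In the sketch below, BasicFactory loosely plays the role of AdaptableJobFactory and InjectingFactory the role of QuartzJobFactory. All class names are hypothetical, and the Consumer is only an assumed stand-in for the autowiring call, not Spring's actual mechanism.

```java
import java.util.function.Consumer;

// Hypothetical dependency that the job needs.
class ReportService {
    String status() {
        return "ok";
    }
}

// Hypothetical job whose field must be filled in after construction.
class ReportJob {
    ReportService reportService;

    String execute() {
        return "report: " + reportService.status();
    }
}

// Stand-in for the default factory: plain reflective instantiation.
class BasicFactory {
    protected Object createInstance(Class<?> type) throws Exception {
        return type.getDeclaredConstructor().newInstance();
    }
}

// Stand-in for the custom factory: same creation step, followed by injection.
class InjectingFactory extends BasicFactory {
    private final Consumer<Object> injector; // assumed stand-in for autowiring

    InjectingFactory(Consumer<Object> injector) {
        this.injector = injector;
    }

    @Override
    protected Object createInstance(Class<?> type) throws Exception {
        Object instance = super.createInstance(type); // create first
        injector.accept(instance);                    // then inject
        return instance;
    }
}

class FactoryDemo {
    static String runWithInjection() {
        ReportService shared = new ReportService();
        InjectingFactory factory = new InjectingFactory(obj -> {
            if (obj instanceof ReportJob) {
                ((ReportJob) obj).reportService = shared;
            }
        });
        try {
            ReportJob job = (ReportJob) factory.createInstance(ReportJob.class);
            return job.execute();
        } catch (Exception ex) {
            throw new IllegalStateException("Job creation failed", ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithInjection()); // prints "report: ok"
    }
}
```

Overriding only the creation method keeps the rest of the base factory's behaviour untouched, which is why the subclass stays so small.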

 
