小小注解,背后也有大大的功能。众所周知,@Scheduled
一般用来标记计划任务执行的方法上,比如在用于服务器心跳检测,定时同步业务数据等的场景。
配置计划任务
配置计划任务执行线程池:
/**
* 异步执行任务线程池配置属性
*/
public class SchedulingProperties {
boolean removeOnCancelPolicy = Boolean.FALSE;
public boolean isRemoveOnCancelPolicy() { return removeOnCancelPolicy; }
public void setRemoveOnCancelPolicy(boolean removeOnCancelPolicy) {
this.removeOnCancelPolicy = removeOnCancelPolicy;
}
/** 初始线程数. 原始值:1 默认设值 7 */
protected int poolSize = 5;
/** 最大线程数. 原始值: Integer.MAX_VALUE: 默认设值:42 */
protected int maxPoolSize = 50;
/** 队列等待数.原始值: Integer.MAX_VALUE: 默认设值:11 */
protected int queueCapacity = Integer.MAX_VALUE;
/** 超时秒数.原始值: 60: 默认设值:60 */
protected int keepAliveSeconds = 60;
/** 线程名前缀.原始值: 自动: 默认设值:SCHEDULED-
*
* @see org.springframework.util.CustomizableThreadCreator.getDefaultThreadNamePrefix() */
protected String threadNamePrefix = "SCHEDULED-";
/** 拒绝执行处理器类名.原始值:ThreadPoolExecutor.AbortPolicy 默认设值:ThreadPoolExecutor.AbortPolicy
*
* @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setRejectedExecutionHandler(RejectedExecutionHandler
* ) */
protected Class<? extends RejectedExecutionHandler> rejectedExecutionHandler = DiscardOldestPolicy.class;
/** 允许核心线程超时.原始值:false 默认设值:false */
protected boolean allowCoreThreadTimeOut = false;
/** @see {@link org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setAwaitTerminationSeconds(int)} */
protected int awaitTerminationSeconds = 0;
/** 设值 bean 名 原始值:null 默认设值:null */
protected String beanName;
/** 是否为守护线程.原始值:Boolean.FALSE 默认设值:Boolean.FALSE */
protected boolean daemon = Boolean.FALSE;
/** 线程组.原始值:null 默认设值:null */
protected String threadGroupName;
/** 优先级.取值在 1<x<10 之间.原始值:Thread.NORM_PRIORITY 默认设值:Thread.NORM_PRIORITY */
protected int threadPriority = Thread.NORM_PRIORITY;
/** 关闭时是否等待正在执行的任务都完成
*
* @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setWaitForTasksToCompleteOnShutdown(boolean) */
protected boolean waitForJobsToCompleteOnShutdown = Boolean.FALSE;
// ignore setter and getter
}
配置计划任务类
实现 SchedulingConfigurer
接口,至于为啥需实现,往后读自然明白。
@Configuration
@EnableScheduling
protected static class ScheduledConfiguration implements SchedulingConfigurer {
public static final String SCHEDULE_EXECUTOR = "schedule_executor";
//关于线程池的属性
@Autowired
private SchedulingProperties properties;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setTaskScheduler(taskScheduler());
}
@Bean(name = SCHEDULE_EXECUTOR, initMethod = "initialize", destroyMethod = "shutdown")
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(properties.getPoolSize());
taskScheduler.setRemoveOnCancelPolicy(false);
initExecutorConfigurationSupport(taskScheduler, properties);
return taskScheduler;
}
private static void initExecutorConfigurationSupport(ThreadPoolTaskScheduler executor, SchedulingProperties properties) {
Class<? extends RejectedExecutionHandler> rejectedExecutionHandler = properties.getRejectedExecutionHandler();
if (rejectedExecutionHandler != null)
executor.setRejectedExecutionHandler(BeanUtils.instantiate(rejectedExecutionHandler));
String threadNamePrefix = properties.getThreadNamePrefix();
if (hasText(threadNamePrefix))
executor.setThreadNamePrefix(threadNamePrefix);
String beanName = properties.getBeanName();
if (hasText(beanName))
executor.setBeanName(beanName);
String threadGroupName = properties.getThreadGroupName();
if (hasText(threadGroupName))
executor.setThreadGroupName(threadGroupName);
executor.setThreadPriority(properties.getThreadPriority());
executor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds());
executor.setDaemon(properties.isDaemon());
executor.setWaitForTasksToCompleteOnShutdown(properties.isWaitForJobsToCompleteOnShutdown());
}
}
使用@Scheduled
在需要执行的业务方法上注入 @Scheduled
:
//每 3s 执行一次此方法
@Scheduled(fixedRate = 1000 * 3)
public void updateTaskResultHandler() {
// 业务处理
...
}
查看源码中的 @Scheduled
:
/**
标记要调度的方法,fixedDelay 和 fixedRate 必须指定其中之一,带注解的方法的返回值是void ,如果不是,
则在调用时将忽略返回值。@ScheduledAnnotationBeanPostProcessor是通过注册
ScheduledAnnotationBeanPostProcessor来执行的。这可以手动完成,或者更方便的通过<task:annotaion-driven/> 元素
或使用 @EnableScheduling。
*/
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
/**
* A special cron expression value that indicates a disabled trigger: {@value}.
* <p>This is primarily meant for use with <code>${...}</code> placeholders,
* allowing for external disabling of corresponding scheduled methods.
* @since 5.1
* @see ScheduledTaskRegistrar#CRON_DISABLED
*/
String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;
/**
一种类似于cron的表达式,扩展了通常的UN * X定义,使其包括秒,分钟,小时,每月的某天,某月和某天的触发器。
例如,“ 0 * * * * MON-FRI”表示工作日每分钟一次(在分钟的顶部-第0秒)。从左到右读取的字段解释如下。
second || minute || hour || day of month || month || day of week
*/
String cron() default "";
/**
将解决cron表达式的时区。 默认情况下,此属性为空字符串(即将使用服务器的本地时区)。
*/
String zone() default "";
/**
在上一次调用的结束与下一次调用的开始之间以固定的毫秒数为周期执行带注释的方法。
*/
long fixedDelay() default -1;
/**
* Execute the annotated method with a fixed period in milliseconds between the
* end of the last invocation and the start of the next.
* @return the delay in milliseconds as a String value, e.g. a placeholder
* or a {@link java.time.Duration#parse java.time.Duration} compliant value
* @since 3.2.2
*/
String fixedDelayString() default "";
/**
两次调用之间以固定的时间段(以毫秒为单位)执行带注释的方法。
*/
long fixedRate() default -1;
/**
* Execute the annotated method with a fixed period in milliseconds between
* invocations.
* @return the period in milliseconds as a String value, e.g. a placeholder
* or a {@link java.time.Duration#parse java.time.Duration} compliant value
* @since 3.2.2
*/
String fixedRateString() default "";
/**
在首次执行fixedRate或fixedDelay任务之前要延迟的毫秒数。
*/
long initialDelay() default -1;
/**
* Number of milliseconds to delay before the first execution of a
* {@link #fixedRate} or {@link #fixedDelay} task.
* @return the initial delay in milliseconds as a String value, e.g. a placeholder
* or a {@link java.time.Duration#parse java.time.Duration} compliant value
* @since 3.2.2
*/
String initialDelayString() default "";
}
源码解析
上图为 ScheduledAnnotationBeanPostProcessor
的UML 图,可以发现 ScheduledAnnotationBeanPostProcessor
间接实现了 BeanPostProcessor
,而熟悉Spring 的童鞋应该都知道,这个类最厉(sai)害(lei)的两个方法是:
// 在Bean初始化之前调用
Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
//Bean 初始化后调用
Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
同样可以看到,ScheduledAnnotationBeanPostProcessor
中重写了 初始化后调用的方法:
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
// 查找类中使用了 @Scheduled 或 @Schedules 的方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods(如果有满足条件的类存在,则调用 processScheduled() )
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
/**
* Process the given {@code @Scheduled} method declaration on the given bean.
* @param scheduled the @Scheduled annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
*/
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
//1. 创建一个可执行任务
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// Check cron expression
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
// Check fixed delay
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// 2. 满足条件,调用 this.registrar.scheduleFixedRateTask()
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
debug 进入this.registrar.scheduleFixedRateTask(...)
:
public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
if (this.taskScheduler != null) {
if (task.getInitialDelay() > 0) {
Date startTime = new Date(System.currentTimeMillis() + task.getInitialDelay());
scheduledTask.future =
this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());
}
else {
scheduledTask.future =
this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
}
}
else {
addFixedRateTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
最后就是将计划任务放到线程池中(ThreadPoolTaskScheduler
类),等待执行:
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
ScheduledExecutorService executor = getScheduledExecutor();
try {
return executor.scheduleAtFixedRate(errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
查看 ThreadPoolTaskScheduler
类的UML 图,不难发现,其实这是一个线程池。
实为前面我们配置的计划任务中的线程池:ScheduledConfiguration.taskScheduler()
再看回最初的类:ScheduledAnnotationBeanPostProcessor
因实现了 ApplicationListener 而重写了 onApplicationEvent(ContextRefreshedEvent event)
方法:
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
// Running in an ApplicationContext -> register tasks this late...
// giving other ContextRefreshedEvent listeners a chance to perform
// their work at the same time (e.g. Spring Batch's job registration).
finishRegistration();
}
}
private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
if (this.beanFactory instanceof ListableBeanFactory) {
// 查找 SchedulingConfigurer 的实现类
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
//遍历 SchedulingConfigurer 的实现类,分别调用 configureTasks(), 这就是为什么,我们前面的
// ScheduledConfiguration 中需要 实现 SchedulingConfigurer,而重写 configureTasks()了 。
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
//...
}
catch (NoSuchBeanDefinitionException ex) {
//...
}
}
this.registrar.afterPropertiesSet();
}
方法的最后调用:this.registrar.afterPropertiesSet();
进入查看 类(ScheduledTaskRegistrar
):
@Override
public void afterPropertiesSet() {
scheduleTasks();
}
protected void scheduleTasks() {
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
private void addScheduledTask(@Nullable ScheduledTask task) {
if (task != null) {
this.scheduledTasks.add(task);
}
}
整体执行流程如下: