1.DisallowConcurrentExecution
Literally, this means that concurrent execution is not allowed.
A quick demo:
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Quartz;
using Quartz.Impl;

[DisallowConcurrentExecution]
public class TestDisallowConcurrentExectionJob : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        await Task.Run(() =>
        {
            var nextTime = context.NextFireTimeUtc?.ToLocalTime();
            var currentTime = DateTime.Now;
            Console.WriteLine($"CurrentTime={currentTime}, FireTime={context.ScheduledFireTimeUtc?.ToLocalTime()}, NextTime={nextTime}");
        });

        // Deliberately hold the worker thread for 10 seconds, much longer
        // than the 1-second trigger interval, so that firings would overlap.
        Thread.Sleep(10000);
    }
}

public class TestDisallowConcurrentExectionScheduler
{
    public static async Task Test()
    {
        var scheduler = await StdSchedulerFactory.GetDefaultScheduler();
        await scheduler.Start();

        var job = JobBuilder.CreateForAsync<TestDisallowConcurrentExectionJob>()
            .WithIdentity("TestDisallowConcurrentExectionJob")
            .UsingJobData("myKey", "myValue")
            .Build();

        // Fire once per second, forever.
        var trigger = TriggerBuilder.Create()
            .WithSimpleSchedule(s => s.WithIntervalInSeconds(1).RepeatForever())
            .Build();

        await scheduler.ScheduleJob(job, trigger);
    }
}
```
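Result without the attribute
Result with the attribute
By default Quartz runs ten worker threads plus one scheduler thread. While one worker thread is still busy inside the job, other worker threads can pick up new firings of the same job. With the DisallowConcurrentExecution attribute applied, those firings all wait until the running execution has finished.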
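Schematic
Source code walkthrough
The Run method in QuartzSchedulerThread calls AcquireNextTriggers to get the next triggers to fire. The implementation below is the one in RAMJobStore.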
```csharp
public virtual Task<IReadOnlyCollection<IOperableTrigger>> AcquireNextTriggers(
    DateTimeOffset noLaterThan,
    int maxCount,
    TimeSpan timeWindow,
    CancellationToken cancellationToken = default)
{
    lock (lockObject)
    {
        var result = new List<IOperableTrigger>();

        // return empty list if store has no triggers.
        if (timeTriggers.Count == 0)
        {
            return Task.FromResult<IReadOnlyCollection<IOperableTrigger>>(result);
        }

        var acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
        var excludedTriggers = new HashSet<TriggerWrapper>();
        DateTimeOffset batchEnd = noLaterThan;

        while (true)
        {
            var tw = timeTriggers.FirstOrDefault();
            if (tw == null)
            {
                break;
            }
            if (!timeTriggers.Remove(tw))
            {
                break;
            }

            if (tw.Trigger.GetNextFireTimeUtc() == null)
            {
                continue;
            }

            if (ApplyMisfire(tw))
            {
                if (tw.Trigger.GetNextFireTimeUtc() != null)
                {
                    timeTriggers.Add(tw);
                }
                continue;
            }

            if (tw.Trigger.GetNextFireTimeUtc() > batchEnd)
            {
                timeTriggers.Add(tw);
                break;
            }

            // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
            // put it back into the timeTriggers set and continue to search for next trigger.
            JobKey jobKey = tw.Trigger.JobKey;
            IJobDetail job = jobsByKey[jobKey].JobDetail;
            if (job.ConcurrentExecutionDisallowed)
            {
                if (!acquiredJobKeysForNoConcurrentExec.Add(jobKey))
                {
                    excludedTriggers.Add(tw);
                    continue; // go to next trigger in store.
                }
            }

            tw.state = InternalTriggerState.Acquired;
            tw.Trigger.FireInstanceId = GetFiredTriggerRecordId();
            IOperableTrigger trig = (IOperableTrigger) tw.Trigger.Clone();

            if (result.Count == 0)
            {
                var now = SystemTime.UtcNow();
                var nextFireTime = tw.Trigger.GetNextFireTimeUtc().GetValueOrDefault(DateTimeOffset.MinValue);
                var max = now > nextFireTime ? now : nextFireTime;
                batchEnd = max + timeWindow;
            }

            result.Add(trig);

            if (result.Count == maxCount)
            {
                break;
            }
        }

        // If we excluded triggers to prevent the ACQUIRE state due to DisallowConcurrentExecution,
        // we need to add them back to the store.
        if (excludedTriggers.Count > 0)
        {
            foreach (var excludedTrigger in excludedTriggers)
            {
                timeTriggers.Add(excludedTrigger);
            }
        }

        return Task.FromResult<IReadOnlyCollection<IOperableTrigger>>(result);
    }
}
```
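The part of AcquireNextTriggers that matters here is the HashSet of job keys: `HashSet<T>.Add` returns false when the key is already present, which is how a second trigger of the same non-concurrent job is kept out of the batch. The sketch below isolates just that check. `MiniTrigger` and `AcquireSketch` are hypothetical names made up for illustration and are not Quartz types; misfire handling, time windows and `maxCount` are left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a trigger: a name, the key of the job it fires,
// and whether that job disallows concurrent execution.
public sealed class MiniTrigger
{
    public string Name { get; }
    public string JobKey { get; }
    public bool ConcurrentExecutionDisallowed { get; }

    public MiniTrigger(string name, string jobKey, bool concurrentExecutionDisallowed)
    {
        Name = name;
        JobKey = jobKey;
        ConcurrentExecutionDisallowed = concurrentExecutionDisallowed;
    }
}

public static class AcquireSketch
{
    // Returns the triggers that would end up in one acquired batch.
    public static List<MiniTrigger> Acquire(IEnumerable<MiniTrigger> candidates)
    {
        var acquiredJobKeys = new HashSet<string>();
        var result = new List<MiniTrigger>();

        foreach (var trigger in candidates)
        {
            // Add returns false if another trigger of the same job
            // is already in this batch, so this one is skipped.
            if (trigger.ConcurrentExecutionDisallowed && !acquiredJobKeys.Add(trigger.JobKey))
            {
                continue;
            }
            result.Add(trigger);
        }
        return result;
    }

    public static void Main()
    {
        var batch = Acquire(new[]
        {
            new MiniTrigger("t1", "jobA", true),
            new MiniTrigger("t2", "jobA", true),   // skipped: jobA is already in the batch
            new MiniTrigger("t3", "jobB", false),
            new MiniTrigger("t4", "jobB", false),  // kept: jobB allows concurrency
        });

        foreach (var trigger in batch)
        {
            Console.WriteLine(trigger.Name);       // prints t1, t3, t4
        }
    }
}
```

In the real method the skipped triggers are collected in `excludedTriggers` and put back into `timeTriggers` afterwards, so they are only delayed, not lost.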
The TriggersFired method in RAMJobStore
When the job carries the DisallowConcurrentExecution attribute:
first, the job's triggers are removed from timeTriggers;
then the job is added to blockedJobs.
```csharp
if (job.ConcurrentExecutionDisallowed)
{
    IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(job.Key);
    foreach (TriggerWrapper ttw in trigs)
    {
        if (ttw.state == InternalTriggerState.Waiting)
        {
            ttw.state = InternalTriggerState.Blocked;
        }
        if (ttw.state == InternalTriggerState.Paused)
        {
            ttw.state = InternalTriggerState.PausedAndBlocked;
        }
        timeTriggers.Remove(ttw);
    }
    blockedJobs.Add(job.Key);
}
```
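The TriggeredJobComplete method in RAMJobStore
When the job has finished executing,
and it carries the DisallowConcurrentExecution attribute:
first the job is removed from blockedJobs, then its blocked triggers are added back to timeTriggers.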
```csharp
if (jd.ConcurrentExecutionDisallowed)
{
    blockedJobs.Remove(jd.Key);
    IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(jd.Key);
    foreach (TriggerWrapper ttw in trigs)
    {
        if (ttw.state == InternalTriggerState.Blocked)
        {
            ttw.state = InternalTriggerState.Waiting;
            timeTriggers.Add(ttw);
        }
        if (ttw.state == InternalTriggerState.PausedAndBlocked)
        {
            ttw.state = InternalTriggerState.Paused;
        }
    }
    signaler.SignalSchedulingChange(null, cancellationToken);
}
```
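The two excerpts from TriggersFired and TriggeredJobComplete form a small state machine: Waiting becomes Blocked and Paused becomes PausedAndBlocked when the job fires, and both are reversed when it completes. The following is a minimal, Quartz-free sketch of that bookkeeping, assuming string job keys and a plain list in place of the sorted trigger set. `MiniStore`, `MiniTriggerWrapper` and `MiniState` are hypothetical names, not Quartz types.

```csharp
using System;
using System.Collections.Generic;

public enum MiniState { Waiting, Blocked, Paused, PausedAndBlocked }

// Hypothetical stand-in for RAMJobStore's TriggerWrapper.
public sealed class MiniTriggerWrapper
{
    public string JobKey = "";
    public MiniState State = MiniState.Waiting;
}

// Hypothetical, heavily reduced stand-in for RAMJobStore.
public sealed class MiniStore
{
    public readonly List<MiniTriggerWrapper> TimeTriggers = new List<MiniTriggerWrapper>();
    public readonly HashSet<string> BlockedJobs = new HashSet<string>();
    private readonly List<MiniTriggerWrapper> allTriggers = new List<MiniTriggerWrapper>();

    public MiniTriggerWrapper AddTrigger(string jobKey, MiniState state)
    {
        var tw = new MiniTriggerWrapper { JobKey = jobKey, State = state };
        allTriggers.Add(tw);
        if (state == MiniState.Waiting) TimeTriggers.Add(tw); // only waiting triggers are schedulable
        return tw;
    }

    // The job starts running: block all of its triggers.
    public void TriggersFired(string jobKey)
    {
        foreach (var tw in allTriggers)
        {
            if (tw.JobKey != jobKey) continue;
            if (tw.State == MiniState.Waiting) tw.State = MiniState.Blocked;
            else if (tw.State == MiniState.Paused) tw.State = MiniState.PausedAndBlocked;
            TimeTriggers.Remove(tw);
        }
        BlockedJobs.Add(jobKey);
    }

    // The job finished: unblock its triggers and make them schedulable again.
    public void TriggeredJobComplete(string jobKey)
    {
        BlockedJobs.Remove(jobKey);
        foreach (var tw in allTriggers)
        {
            if (tw.JobKey != jobKey) continue;
            if (tw.State == MiniState.Blocked)
            {
                tw.State = MiniState.Waiting;
                TimeTriggers.Add(tw);
            }
            else if (tw.State == MiniState.PausedAndBlocked)
            {
                tw.State = MiniState.Paused;
            }
        }
    }
}

public static class BlockingSketch
{
    public static void Main()
    {
        var store = new MiniStore();
        var waiting = store.AddTrigger("jobA", MiniState.Waiting);
        var paused = store.AddTrigger("jobA", MiniState.Paused);

        store.TriggersFired("jobA");
        Console.WriteLine($"{waiting.State}, {paused.State}, schedulable={store.TimeTriggers.Count}");
        // prints "Blocked, PausedAndBlocked, schedulable=0"

        store.TriggeredJobComplete("jobA");
        Console.WriteLine($"{waiting.State}, {paused.State}, schedulable={store.TimeTriggers.Count}");
        // prints "Waiting, Paused, schedulable=1"
    }
}
```

Because a blocked trigger is not in the schedulable set, AcquireNextTriggers cannot pick it up while the job is running, which is what serializes the executions.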
2.PersistJobDataAfterExecution
Literally, this means that the job data is persisted after execution.
A quick demo:
```csharp
using System;
using System.Threading.Tasks;
using Quartz;
using Quartz.Impl;

[PersistJobDataAfterExecution]
public class TestPersistJobDataAfterExecutionJob : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        await Task.Run(() =>
        {
            // Read the current value, print it, then append a random digit.
            var myValue = context.JobDetail.JobDataMap["myKey"];
            Console.WriteLine(myValue);
            context.JobDetail.JobDataMap["myKey"] = myValue + new Random().Next(1, 10).ToString();
        });
    }
}

public class TestPersistJobDataAfterExcutionScheduler
{
    public static async Task Test()
    {
        var scheduler = await StdSchedulerFactory.GetDefaultScheduler();
        await scheduler.Start();

        var job = JobBuilder.CreateForAsync<TestPersistJobDataAfterExecutionJob>()
            .WithIdentity("TestPersistJobDataAfterExcutionJob")
            .UsingJobData("myKey", "myValue")
            .Build();

        // Fire once per second, forever.
        var trigger = TriggerBuilder.Create()
            .WithSimpleSchedule(s => s.WithIntervalInSeconds(1).RepeatForever())
            .Build();

        await scheduler.ScheduleJob(job, trigger);
    }
}
```
Result without the attribute
Result with the attribute
Schematic
Source code walkthrough
The Run method in QuartzSchedulerThread
```csharp
JobRunShell shell;
try
{
    shell = qsRsrcs.JobRunShellFactory.CreateJobRunShell(bndle);
    await shell.Initialize(qs, CancellationToken.None).ConfigureAwait(false);
}
catch (SchedulerException)
{
    await qsRsrcs.JobStore.TriggeredJobComplete(trigger, bndle.JobDetail, SchedulerInstruction.SetAllJobTriggersError, CancellationToken.None).ConfigureAwait(false);
    continue;
}

var threadPoolRunResult = qsRsrcs.ThreadPool.RunInThread(() => shell.Run(CancellationToken.None));
if (threadPoolRunResult == false)
{
    // this case should never happen, as it is indicative of the
    // scheduler being shutdown or a bug in the thread pool or
    // a thread pool being used concurrently - which the docs
    // say not to do...
    Log.Error("ThreadPool.RunInThread() returned false!");
    await qsRsrcs.JobStore.TriggeredJobComplete(trigger, bndle.JobDetail, SchedulerInstruction.SetAllJobTriggersError, CancellationToken.None).ConfigureAwait(false);
}
```
The Initialize method of JobRunShell
```csharp
public virtual async Task Initialize(
    QuartzScheduler sched,
    CancellationToken cancellationToken = default)
{
    qs = sched;

    IJob job;
    IJobDetail jobDetail = firedTriggerBundle.JobDetail;

    try
    {
        job = sched.JobFactory.NewJob(firedTriggerBundle, scheduler);
    }
    catch (SchedulerException se)
    {
        await sched.NotifySchedulerListenersError($"An error occurred instantiating job to be executed. job= '{jobDetail.Key}'", se, cancellationToken).ConfigureAwait(false);
        throw;
    }
    catch (Exception e)
    {
        SchedulerException se = new SchedulerException($"Problem instantiating type '{jobDetail.JobType.FullName}'", e);
        await sched.NotifySchedulerListenersError($"An error occurred instantiating job to be executed. job= '{jobDetail.Key}'", se, cancellationToken).ConfigureAwait(false);
        throw se;
    }

    jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
}
```
The NewJob function in SimpleJobFactory shows that jobs are stateless: a fresh instance is created by reflection for every execution.
```csharp
public virtual IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
{
    IJobDetail jobDetail = bundle.JobDetail;
    Type jobType = jobDetail.JobType;
    try
    {
        if (log.IsDebugEnabled())
        {
            log.Debug($"Producing instance of Job '{jobDetail.Key}', class={jobType.FullName}");
        }

        return ObjectUtils.InstantiateType<IJob>(jobType);
    }
    catch (Exception e)
    {
        SchedulerException se = new SchedulerException($"Problem instantiating class '{jobDetail.JobType.FullName}'", e);
        throw se;
    }
}
```
The Run method in JobRunShell passes the JobExecutionContextImpl to the job's Execute method
```csharp
private JobExecutionContextImpl jec;

// Execute the job
try
{
    if (log.IsDebugEnabled())
    {
        log.Debug("Calling Execute on job " + jobDetail.Key);
    }

    await job.Execute(jec).ConfigureAwait(false);

    endTime = SystemTime.UtcNow();
}
// (excerpt: the catch blocks that follow are not shown)
```
The NotifyJobStoreJobComplete function called from the Run method in JobRunShell
```csharp
await qs.NotifyJobStoreJobComplete(trigger, jobDetail, instCode, cancellationToken).ConfigureAwait(false);
```
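NotifyJobStoreJobComplete is invoked on the scheduler (qs, a QuartzScheduler) rather than defined on JobRunShell itself, and it simply delegates to JobStore.TriggeredJobComplete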
```csharp
public virtual Task NotifyJobStoreJobComplete(
    IOperableTrigger trigger,
    IJobDetail detail,
    SchedulerInstruction instCode,
    CancellationToken cancellationToken = default)
{
    return resources.JobStore.TriggeredJobComplete(trigger, detail, instCode, cancellationToken);
}
```
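In Quartz.NET, StdSchedulerFactory falls back to RAMJobStore when no JobStore is configured.
So it is RAMJobStore's TriggeredJobComplete method that checks whether the job carries the PersistJobDataAfterExecution attribute.
If it does, the job's JobDataMap is cloned (via Clone, which is built on MemberwiseClone), and JobBuilder is used to build a new JobDetail from the cloned data, which replaces the stored one.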
```csharp
if (jobDetail.PersistJobDataAfterExecution)
{
    JobDataMap newData = jobDetail.JobDataMap;
    if (newData != null)
    {
        newData = (JobDataMap) newData.Clone();
        newData.ClearDirtyFlag();
    }
    jd = jd.GetJobBuilder().SetJobData(newData).Build();
    jw.JobDetail = jd;
}
if (jd.ConcurrentExecutionDisallowed)
{
    blockedJobs.Remove(jd.Key);
    IEnumerable<TriggerWrapper> trigs = GetTriggerWrappersForJob(jd.Key);
    foreach (TriggerWrapper ttw in trigs)
    {
        if (ttw.state == InternalTriggerState.Blocked)
        {
            ttw.state = InternalTriggerState.Waiting;
            timeTriggers.Add(ttw);
        }
        if (ttw.state == InternalTriggerState.PausedAndBlocked)
        {
            ttw.state = InternalTriggerState.Paused;
        }
    }
    signaler.SignalSchedulingChange(null, cancellationToken);
}
```
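The effect of that first `if` can be reproduced without Quartz. The sketch below rests on one assumption that is not shown in the excerpts: the executing job works on a copy of the stored job detail, so its edits to the data map are discarded unless the store writes them back. That matches the demo, where the value only grows when the attribute is present. `MiniJobDetail` and `PersistSketch` are hypothetical names, and a plain `Dictionary<string, string>` stands in for JobDataMap.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for IJobDetail: only the data map and the attribute flag.
public sealed class MiniJobDetail
{
    public Dictionary<string, string> JobDataMap { get; }
    public bool PersistJobDataAfterExecution { get; }

    public MiniJobDetail(Dictionary<string, string> data, bool persist)
    {
        JobDataMap = data;
        PersistJobDataAfterExecution = persist;
    }

    // Copy with its own dictionary, so edits to the copy do not touch the original.
    public MiniJobDetail Clone() =>
        new MiniJobDetail(new Dictionary<string, string>(JobDataMap), PersistJobDataAfterExecution);
}

public static class PersistSketch
{
    // Simulates one firing and returns the detail the store keeps afterwards.
    public static MiniJobDetail FireOnce(MiniJobDetail stored)
    {
        // Assumption: the executing job sees a copy of the stored detail.
        MiniJobDetail executing = stored.Clone();
        executing.JobDataMap["myKey"] += "+";   // what the job's Execute would do

        // Mirrors the check in TriggeredJobComplete: the modified data
        // replaces the stored data only when the flag is set.
        return executing.PersistJobDataAfterExecution ? executing.Clone() : stored;
    }

    public static string RunThreeTimes(bool persist)
    {
        var stored = new MiniJobDetail(
            new Dictionary<string, string> { ["myKey"] = "myValue" }, persist);
        for (int i = 0; i < 3; i++)
        {
            stored = FireOnce(stored);
        }
        return stored.JobDataMap["myKey"];
    }

    public static void Main()
    {
        Console.WriteLine(RunThreeTimes(persist: true));   // prints "myValue+++"
        Console.WriteLine(RunThreeTimes(persist: false));  // prints "myValue"
    }
}
```

Since each firing reads the data written by the previous one, PersistJobDataAfterExecution is usually combined with DisallowConcurrentExecution, so that two overlapping executions cannot overwrite each other's changes.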
Finally, the Run method in JobRunShell hands the job instance back to the job factory through ReturnJob
```csharp
qs.RemoveInternalSchedulerListener(this);
if (jec != null)
{
    if (jec.JobInstance != null)
    {
        qs.JobFactory.ReturnJob(jec.JobInstance);
    }

    jec.Dispose();
}
```
```csharp
public virtual void ReturnJob(IJob job)
{
    var disposable = job as IDisposable;
    disposable?.Dispose();
}
```
Quartz.Net Series (14): A Detailed Look at Two Job Attributes (DisallowConcurrentExecution, PersistJobDataAfterExecution)