Quartz学习笔记:集群部署&高可用
集群部署
一个Quartz集群中的每个节点是一个独立的Quartz应用,它又管理着其他的节点。这就意味着你必须对每个节点分别启动或停止。Quartz集群中,独立的Quartz节点并不与另一其的节点或是管理节点通信,而是通过同一个数据库表来感知到另一Quartz应用的。
因为Quartz集群依赖于数据库,所以必须首先创建Quartz数据库表,Quartz发布包中包括了所有被支持的数据库平台的SQL脚本。这些SQL脚本存放于<quartz_home>/docs/dbTables 目录下。不同版本,表个数可能不同。
集群配置
创建完表结构之后,我们需要配置Quartz,以让其感知自己是集群的一份子。定义quartz.properties
配置文件默认放在应用classpath路径下,其他路径只能自己手动加载properties。下面是集群配置参考:
#集群中应用采用相同的Scheduler实例
org.quartz.scheduler.instanceName: wenqyScheduler #集群节点的ID必须唯一,可由quartz自动生成
org.quartz.scheduler.instanceId: AUTO #通知Scheduler实例要它参与到一个集群当中
org.quartz.jobStore.isClustered: true #需持久化存储
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate #数据源
org.quartz.jobStore.dataSource=myDS #quartz表前缀
org.quartz.jobStore.tablePrefix=QRTZ_ #数据源配置
org.quartz.dataSource.myDS.driver: com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL: jdbc:mysql://localhost:3306/ncdb
org.quartz.dataSource.myDS.user: root
org.quartz.dataSource.myDS.password: 123456
org.quartz.dataSource.myDS.maxConnections: 5
org.quartz.dataSource.myDS.validationQuery: select 0
同一集群下,instanceName必须相同,instanceId可自动生成,isClustered为true,持久化存储,指定数据库类型对应的驱动类和数据源连接。
集群高可用
数据行锁避免重复执行
Quartz究竟是如何保证集群情况下trgger处理的信息同步?
下面跟着源码一步一步分析,QuartzSchedulerThread包含有决定何时下一个Job将被触发的处理循环,主要逻辑在其run()方法中:
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
...... int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { ...... //调度器在trigger队列中寻找30秒内一定数目的trigger(需要保证集群节点的系统时间一致)
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); ...... //触发trigger
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); ...... //释放trigger
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
} }
}
由此可知,QuartzScheduler调度线程不断获取trigger,触发trigger,释放trigger。下面分析trigger的获取过程,qsRsrcs.getJobStore()返回对象是JobStore,集群环境配置如下:
<!-- JobStore 配置 -->
<prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
JobStoreTX继承自JobStoreSupport,而JobStoreSupport的acquireNextTriggers、triggersFired、releaseAcquiredTrigger方法负责具体trigger相关操作,都必须获得TRIGGER_ACCESS锁。核心逻辑在executeInNonManagedTXLock方法中:
protected <T> T executeInNonManagedTXLock(
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
} //获取锁
transOwner = getLockHandler().obtainLock(conn, lockName);
} if (conn == null) {
conn = getNonManagedTXConnection();
} final T result = txCallback.execute(conn);
try {
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
@Override
public Boolean execute(Connection conn) throws JobPersistenceException {
return txValidator.validate(conn, result);
}
})) {
throw e;
}
} Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
if(sigTime != null && sigTime >= 0) {
signalSchedulingChangeImmediately(sigTime);
} return result;
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} catch (RuntimeException e) {
rollbackConnection(conn);
throw new JobPersistenceException("Unexpected runtime exception: "
+ e.getMessage(), e);
} finally {
try {
releaseLock(lockName, transOwner); //释放锁
} finally {
cleanupConnection(conn);
}
}
}
一个调度器实例在执行涉及到分布式问题的数据库操作前,首先要获取QUARTZ_LOCKS表中对应的行级锁,获取锁后即可执行其他表中的数据库操作,随着操作事务的提交,行级锁被释放,供其他调度实例获取。集群中的每一个调度器实例都遵循这样一种严格的操作规程。
getLockHandler()方法返回的对象类型是Semaphore,获取锁和释放锁的具体逻辑由该对象子类StdRowLockSemaphore 维护。从中很容易看出核心的锁机制是数据库锁。
public class StdRowLockSemaphore extends DBSemaphore {
public static final String SELECT_FOR_LOCK = "SELECT * FROM {0}LOCKS WHERE SCHED_NAME = {1} AND LOCK_NAME = ? FOR UPDATE";
public static final String INSERT_LOCK = "INSERT INTO {0}LOCKS(SCHED_NAME, LOCK_NAME) VALUES ({1}, ?)";
...
}
关于更多数据库锁的资料还请查看我得另一篇文章:https://www.cnblogs.com/MrSaver/p/11917345.html
故障切换
当集群一个节点在执行一个或多个作业期间失败时发生故障切换(Fail Over
)。当节点出现故障时,其他节点会检测到该状况并识别数据库中在故障节点内正在进行的作业。任何标记为恢复的作业(在JobDetail上都具有“请求恢复(requests recovery
)”属性)将被剩余的节点重新执行,已达到失效任务 转移。没有标记为恢复的作业将在下一次相关的Triggers触发时简单地被释放以执行。
1、每个节点Scheduler实例由集群管理线程ClusterManager周期性(配置文件中检测周期属性clusterCheckinInterval
默认值是 15000 (即15 秒))定时检测CHECKIN数据库,遍历集群各兄弟节点的实例状态,检测集群各个兄弟节点的健康情况。
2、当集群中一个节点的Scheduler实例执行CHECKIN时,它会查看是否有其他节点的Scheduler实例在到达它们所预期的时间还未CHECKIN。若检测到有节点在预期时间未CHECKIN,则认为该节点故障。判断节点是否故障与节点Scheduler实例最后CHECKIN的时间有关,而判断条件:
LAST_CHECKIN_TIME + Max(检测周期,检测节点现在距上次最后CHECKIN的时间) + 7500ms < currentTime。
3、集群管理线程检测到故障节点,就会更新触发器状态,状态更新如下:
4、集群管理线程删除故障节点的实例状态(qrtz_scheduler_state
表),即重置了所有故障节点触发任务一般。原先故障任务和正常任务一样就交由调度处理线程处理了。
负载均衡
负载平衡自动发生,群集的每个节点都尽可能快地触发Jobs。当Triggers的触发时间发生时,获取它的第一个节点(通过在其上放置一个锁定)是将触发它的节点。 哪个节点运行它或多或少是随机的。
集群下任务的调度存在一定的随机性,谁先拥有触发器行锁TRIGGER_ACCESS,谁就先可能触发任务。当某一个机子的调度线程拿到该锁(别的机子只能等待)时:
1、 acquireNextTriggers
获取待触发队列,查询Trigger表的判断条件:
NEXT_FIRE_TIME < now + idleWaitTime + timeWindow and TRIGGER_STATE = 'WAITING
然后更新触发器状态为ACQUIRE。
2、触发待触发队列,修改 Trigger 表中的 NEXT_FIRE_TIME 字段,也就是下次触发时间,计算下次触发时间的方法与具体的触发器实现有关,如Cron表达式触发器,计算触发时间与Cron表达式有关。触发待触发队列后及时释放触发器行锁。
3、这样,别的机子拿到该锁,也查询 Trigger 表,但是由于任务触发器的下次触发时间或者状态已经修改,所以不会被查找出来。这时拿到的任务就可能是别的触发任务。这样就实现了多个节点的应用在某一时刻对任务只进行一次调度。对于重复任务每次都不一定是相同的节点,它或多或少会随机节点运行它。
参考资料
- http://wenqy.com/2018/05/05/quartz%e7%ae%a1%e4%b8%ad%e7%aa%a5%e8%b1%b9%e4%b9%8b%e9%9b%86%e7%be%a4%e9%ab%98%e5%8f%af%e7%94%a8.html
- http://wenqy.com/2018/04/03/quartz%e7%ae%a1%e4%b8%ad%e7%aa%a5%e8%b1%b9%e4%b9%8b%e9%9b%86%e7%be%a4%e7%ae%a1%e7%90%86.html
- https://tech.meituan.com/2014/08/31/mt-crm-quartz.html