[源码分析] 定时任务调度框架 Quartz 之 故障切换
0x00 摘要
之前在 Celery 的故障切换之中[源码解析] 并行分布式框架 Celery 之 容错机制,提到了 Quartz 的故障切换策略,我们就顺便看看 Quartz 如何实现。
大家可以互相印证下,看看这些系统之间的异同和精华所在。
0x01 基础概念
1.1 分布式
考虑分布式,大致可以从两个方面考虑:功能方面与存储方面。
- 从功能方面上看,是集中式管理还是分布式管理?如果是分布式管理,怎么保证节点之间交互协调?
- 从存储方面上看,是集中存储还是分布式存储?如果是分布式存储,怎么可以保证全部加起来提供一个完整的存储镜像?
对于Quartz来说,功能方面是分布式管理,存储方面是集中存储。
1.1.1 功能方面
一个Quartz集群中的每个节点是一个独立的Quartz应用,每个节点都是独立的,彼此之间不交互,从理论上说,它是完全独立的。
但是为了应对集群,这种完全独立其实就意味着完全不独立,即每个节点都需要完成所有管理功能,每个节点都需要管理着其他的节点。于是变成了人人为我,我为人人。
或者说,绝对的*就意味着绝对的不*,看起来是独立的节点,但是其他每个节点都可以管理你。
1.1.2 存储方面
Quartz是采取了集中方式,把所有信息都放在数据库表中,由数据库表统一提供对外的逻辑。
而且,存储也起到了协助管理作用。独立的Quartz节点并不与另一其的节点或是管理节点通信,而是通过相同的数据库表来感知到另一Quartz应用的。我虽然不直接管理你,但是其他所有节点都可以通过数据库来暗自控制你。
1.2 基本概念
需要了解一些Quartz框架的基础概念:
Quartz任务调度的核心元素为:Scheduler——任务调度器、Trigger——触发器、Job——任务。其中trigger和job是任务调度的元数据,scheduler是实际执行调度的控制器。
Trigger 是用于定义调度时间的元素,即按照什么时间规则去执行任务。
Job 用于表示被调度的任务。
Quartz把触发job叫做fire;
Quartz在运行时,会起几类线程,其主要是:一类用于调度job的调度线程(单线程),一类是用于执行job具体业务的工作池;
-
Quartz自带的表里面,有几张表是和触发job直接相关:
- triggers表。triggers表里记录了某个 trigger 的 PREVFIRETIME(上次触发时间),NEXT_FIRETIME(下一次触发时间),TRIGGERSTATE(当前状态);
- locks表。Quartz支持分布式,也就是会存在多个线程同时抢占相同资源的情况,而Quartz正是依赖这张表处理这种状况;
- fired_triggers表。记录正在触发的triggers信息;
TRIGGER_STATE,也就是trigger的状态;
1.3 调度线程
Scheduler调度线程主要有两个:执行常规调度的线程,和执行misfiredtrigger的线程。
常规调度线程轮询存储的所有trigger,如果有需要触发的trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该trigger关联的任务。
Misfire线程是扫描所有的trigger,查看是否有misfiredtrigger,如果有的话根据misfire的策略分别处理(fire now OR wait for the next fire)。
0x02 故障切换
Quartz在集群模式下通过故障切换和任务负载均衡来实现任务的高可用(HA High Available)和伸缩性。
Quartz是基于调度记录表对应调度记录存在的情况下保证高可用。
从本质上来说,集群上每一个节点通过共享同一个数据库来工作(Quartz通过启动两个维护线程来维护数据库状态实现集群管理,一个是检测节点状态线程,一个是恢复任务线程)。
Quartz集群中的多个节点是不会同时工作的,只有一个节点是处于工作状态,其他节点属于待命状态,只有当工作节点挂了,其他节点中的一个才会自动升级为工作节点。
故障切换的发生是在当一个节点正在执行一个或者多个任务失败的时候。当一个节点失败了,其他的节点会检测到并且标 识在失败节点上正在进行的数据库中的任务。
任何被标记为可恢复(任务详细信息的"requests recovery"属性)的任务都会被其他的节点重新执行。没有标记可恢复的任务只会被释放出来,将会在下次相关触发器触发时执行。
因此,我们下面的思考重点就是:
- 如何发现故障节点;
- 如何转移失效任务;
0x03 总体思路
Fail-Over
机制工作在集群环境中,执行recovery工作的线程类叫做ClusterManager
,该线程类同样是在调度器初始化时就开启运行了。
这个线程类在运行期间每15s
进行一次check in
操作,所谓check in,就是在数据库的QRTZ2_SCHEDULER_STATE
表中更新该调度器对应的LAST_CHECKIN_TIME
字段为当前时间,并且查看其他调度器实例的该字段有没有发生停止更新的情况。
当其中一个节点在执行一个或多个作业期间失败时发生故障切换(Fail Over
)。当节点出现故障时,其他节点会检测到该状况并识别数据库中在故障节点内正在进行的作业。
如果检查到有调度器的check in time比当前时间要早约15s(视具体的执行预配置情况而定),那么就判定该调度实例需要recover
,随后会启动该调度器的recovery机制
,获取目标调度器实例正在触发的trigger
,并针对每一个trigger临时添加一个对应的仅执行一次的simpletrigger
。
等到调度流程扫描trigger时,这些trigger会被触发,这样就成功的把这些未完整执行的调度以一种特殊trigger的形式纳入了普通的调度流程中,只要调度流程在正常运行,这些被recover的trigger就会很快被触发并执行。
0x04 如何发现故障节点
对于故障节点的发现,大多都是使用定期心跳来检测。
一般来说,有两种,就是推拉模型。
推:定期心跳,每个节点给管理节点发送心跳;
拉:管理节点定期去每个节点拉取状态信息;
因为Quartz没有管理节点,所以必须采用推模式来模拟心跳。
4.1 数据库表
表 qrtz_scheduler_state
存储集群中node实例信息,quartz会定时读取该表的信息判断集群中每个实例的当前状态。
- instance_name:之前配置文件中org.quartz.scheduler.instanceId配置的名字,就会写入该字段,如果设置为AUTO,quartz会根据物理机名和当前时间产生一个名字;
- last_checkin_time:上次检查时间;
- checkin_interval:检查间隔时间;
具体表如下:
create table qrtz_scheduler_state
(
sched_name varchar(120) not null,
instance_name varchar(200) not null,
last_checkin_time longint not null,
checkin_interval longint not null,
primary key (sched_name,instance_name)
);
4.2 集群管理线程
集群管理线程ClusterManager
是由调度实例StdSchedulerFactory
开始启动调度start()
时创建,也是单独的线程实例。
- 集群管理线程如果是第一次CHECKIN,就看看有没有故障节点,如果发现故障节点就进行处理。
- 此后,集群管理线程休眠到下次检测周期(配置文件
org.quartz.jobStore.clusterCheckinInterval
,默认值是 15000 (即15 秒) )到来,检测CHECKIN数据库,遍历集群各兄弟节点的实例状态,检测集群各个兄弟节点的健康情况。 - 如果存在故障节点,则更新故障节点的触发器状态,并删除故障节点实例状态。这样集群节点间共享触发任务数据就可以进行故障切换,并信号通知调度线程。故障节点的任务的调度就交由调度处理线程处理了。
其缩减版代码如下,可以看出来其将定期做 doCheckin:
class ClusterManager extends Thread {
private volatile boolean shutdown = false;
private int numFails = 0;
private boolean manage() {
boolean res = false;
try {
res = doCheckin(); // 进行 checkin
numFails = 0;
} catch (Exception e) {
if(numFails % 4 == 0) {
getLog().error(
"ClusterManager: Error managing cluster: "
+ e.getMessage(), e);
}
numFails++;
}
return res;
}
@Override
public void run() {
while (!shutdown) {
if (!shutdown) {
long timeToSleep = getClusterCheckinInterval();
long transpiredTime = (System.currentTimeMillis() - lastCheckin);
timeToSleep = timeToSleep - transpiredTime;
if (timeToSleep <= 0) {
timeToSleep = 100L;
}
if(numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
Thread.sleep(timeToSleep);
}
if (!shutdown && this.manage()) { // 定期timer函数
signalSchedulingChangeImmediately(0L);
}
}//while !shutdown
}
}
此时逻辑如下:
+-----------------------------+ +---------------------------------+
| Node A | | Node B |
| | | |
| | | |
| ^ +--> ClusterManager +--> | | ^----> ClusterManager +----> |
| | | | Checkin +----+ Checkin | | | |
| | +---------> | DB | <----------+ | |
| | | | +----+ | | | |
| <-------------------------v | | <----------------------------v |
| timer | | timer |
| | | |
+-----------------------------+ +---------------------------------+
4.2.1 定期 Checkin
此方法是:
- 若不是第一次Checkin,则调用clusterCheckIn查找故障节点;
- 否则在获取到锁之后,再次调用 findFailedInstances 得到failedRecords(因为获取锁之后,情况会有所变化,所以需要再次查找故障节点);
- 若failedRecords大于0,则尝试进行clusterRecover;
其代码如下:
protected boolean doCheckin() throws JobPersistenceException {
boolean transOwner = false;
boolean transStateOwner = false;
boolean recovered = false;
Connection conn = getNonManagedTXConnection();
try {
// Other than the first time, always checkin first to make sure there is
// work to be done before we acquire the lock (since that is expensive,
// and is almost never necessary). This must be done in a separate
// transaction to prevent a deadlock under recovery conditions.
List<SchedulerStateRecord> failedRecords = null;
if (!firstCheckIn) { // 若不是第一次Checkin
failedRecords = clusterCheckIn(conn);
commitConnection(conn);
}
if (firstCheckIn || (failedRecords.size() > 0)) {
getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
transStateOwner = true;
// Now that we own the lock, make sure we still have work to do.
// The first time through, we also need to make sure we update/create our state record
// 否则在获取到锁之后,再次调用 findFailedInstances 得到failedRecords(因为获取锁之后,情况会有所变化,所以需要再次查找故障节点)
failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);
if (failedRecords.size() > 0) {
getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
//getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
transOwner = true;
clusterRecover(conn, failedRecords); // 尝试进行clusterRecover
recovered = true;
}
}
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
}
firstCheckIn = false;
return recovered;
}
4.2.2 侦测失败节点
当集群中一个节点的Scheduler实例执行CHECKIN时,它会查看是否有其他节点的Scheduler实例在到达它们所预期的时间还未CHECKIN,如果一个或多个节点到了预定时间还没有检入,那么运行中的Scheduler就假定它(们) 失败了。然后需获取实例状态访问行锁,进而更新触发器状态,删除故障节点实例状态等等。
查找集群兄弟节点存在故障节点的方法是
org.quartz.impl.jdbcjobstore.JobStoreSupport.findFailedInstances(Connection)
判断节点是否故障与节点Scheduler实例最后CHECKIN的时间有关,而判断条件是:
LAST_CHECKIN_TIME + Max(检测周期,检测节点现在距上次最后CHECKIN的时间) + 7500ms < currentTime。
逻辑是:
通过检查SCHEDULER_STATE表 中 某一条 Scheduler记录在 LAST_CHEDK_TIME列的值是否早于org.quartz.jobStore.clusterCheckinInterval
来确定::
- 读取 qrtz_scheduler_state 表中所有记录;
- 遍历记录,对于某一条记录:
- 若是本身节点且是第一次CheckIn,则放入错误节点列表;
- 若是其他节点且节点Scheduler实例最后CHECKIN的时间距离目前时间大于7500ms,则放入错误节点列表;
- 因为这个 间隔时间,就说明 从 上次checkin 时间 到 本次应该checkin 的时间差大于这个时间间隔,从而说明该列对应的节点没有按时checkin,该节点失效了;
具体代码为:
/**
* Get a list of all scheduler instances in the cluster that may have failed.
* This includes this scheduler if it is checking in for the first time.
*/
protected List<SchedulerStateRecord> findFailedInstances(Connection conn)
throws JobPersistenceException {
try {
List<SchedulerStateRecord> failedInstances = new LinkedList<SchedulerStateRecord>();
boolean foundThisScheduler = false;
long timeNow = System.currentTimeMillis();
// 从数据库读取记录
List<SchedulerStateRecord> states = getDelegate().selectSchedulerStateRecords(conn, null);
for(SchedulerStateRecord rec: states) {
// find own record...
if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
foundThisScheduler = true;
if (firstCheckIn) {
failedInstances.add(rec);
}
} else {
// find failed instances...
// 看看是不是过期了
if (calcFailedIfAfter(rec) < timeNow) {
failedInstances.add(rec);
}
}
}
// The first time through, also check for orphaned fired triggers.
if (firstCheckIn) {
failedInstances.addAll(findOrphanedFailedInstances(conn, states));
}
// If not the first time but we didn't find our own instance, then
// Someone must have done recovery for us.
if ((!foundThisScheduler) && (!firstCheckIn)) {
// FUTURE_TODO: revisit when handle self-failed-out impl'ed (see FUTURE_TODO in clusterCheckIn() below)
}
return failedInstances;
}
}
计算时间为:
protected long calcFailedIfAfter(SchedulerStateRecord rec) {
return rec.getCheckinTimestamp() +
Math.max(rec.getCheckinInterval(),
(System.currentTimeMillis() - lastCheckin)) +
7500L;
}
selectSchedulerStateRecords就是从数据库中读取记录:
public List<SchedulerStateRecord> selectSchedulerStateRecords(Connection conn, String theInstanceId)
throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
List<SchedulerStateRecord> lst = new LinkedList<SchedulerStateRecord>();
if (theInstanceId != null) {
ps = conn.prepareStatement(rtp(SELECT_SCHEDULER_STATE));
ps.setString(1, theInstanceId);
} else {
ps = conn.prepareStatement(rtp(SELECT_SCHEDULER_STATES));
}
rs = ps.executeQuery();
while (rs.next()) {
SchedulerStateRecord rec = new SchedulerStateRecord();
rec.setSchedulerInstanceId(rs.getString(COL_INSTANCE_NAME));
rec.setCheckinTimestamp(rs.getLong(COL_LAST_CHECKIN_TIME));
rec.setCheckinInterval(rs.getLong(COL_CHECKIN_INTERVAL));
lst.add(rec);
}
return lst;
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}
具体逻辑如下:
+--------------------------------+ +-----------------------------------------------------------+
| Node A | | DB |
| | | qrtz_scheduler_state |
| | | |
| ^ +--> ClusterManager +--v | | +----------------------------------------------------+ |
| | | | selectSchedulerStateRecords | | | |
| | +-------------------------------> | | | |
| | | | | | Node A, LAST_CHECKIN_TIME, CHECKIN_INTERVAL | |
| | | | | | | |
| | | | calcFailedIfAfter | | Node B, LAST_CHECKIN_TIME, CHECKIN_INTERVAL | |
| | | <----------------------------+ | | | |
| <----------------------- v | | | ...... | |
| timer | | | | |
| | | | Node Z, LAST_CHECKIN_TIME, CHECKIN_INTERVAL | |
+--------------------------------+ | | | |
| +----------------------------------------------------+ |
| |
+-----------------------------------------------------------+
手机如下:
0x05 转移失效任务
下面我们讲讲从故障实例中恢复Job。
当一个Sheduler实例在执行某个Job时失败了,有可能由另一正常工作的Scheduler实例接过这个Job重新运行。
5.1 请求恢复
要实现这种行为,配置给JobDetail对象的Job“请求恢复(requests recovery
)”属性必须设置为true(job.setRequestsRecovery(true))。
- 如果可恢复属性被设置为false,当某个Scheduler在运行该job失败时,它将不会重新运行;而是由另一个Scheduler实例在下一次相关的Triggers触发时简单地被释放以执行。
- 任何标记为恢复true的作业将被剩余的节点重新执行,从而 达到失效任务 转移的目的。
5.2 更新触发器状态
集群管理线程检测到故障节点,就会更新触发器状态,org.quartz.impl.jdbcjobstore.Constants
常量类定义了触发器的几种状态。
故障节点状态更新规则如下。
故障节点触发器更新前状态 | 更新后状态 |
---|---|
BLOCKED | WAITING |
PAUSED_BLOCKED | PAUSED |
ACQUIRED | WAITING |
COMPLETE | 无,删除Trigger |
集群管理线程 在数据库中(qrtz_scheduler_state
表)删除了 故障节点的实例状态,即重置了所有故障节点触发的任务。原先故障任务和正常任务一样就交由调度处理线程处理了。
5.3 恢复任务
任务恢复 具体由clusterRecover方法完成。
- 遍历每一个失效节点,对于每一个节点:
- 得到此节点已经得到的任务,遍历每一个任务
- 对于 blocked triggers,则release,修改其状态;
- release acquired triggers,修改其状态;
- 如果需要恢复任务,则进行处理,具体就是添加一个新的trigger:
- 设置其job各种信息;
- 设置其下一次运行时间
- 插入到数据库;
- 若此任务不允许并发执行,相应修改其状态;
- 得到此节点已经得到的任务,遍历每一个任务
具体代码如下:
@SuppressWarnings("ConstantConditions")
protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances) {
if (failedInstances.size() > 0) {
long recoverIds = System.currentTimeMillis();
try {
// 遍历每一个失效节点
for (SchedulerStateRecord rec : failedInstances) {
List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
.selectInstancesFiredTriggerRecords(conn,
rec.getSchedulerInstanceId());
Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();
// 对于失效节点已经得到的任务,遍历每一个任务
for (FiredTriggerRecord ftRec : firedTriggerRecs) {
TriggerKey tKey = ftRec.getTriggerKey();
JobKey jKey = ftRec.getJobKey();
triggerKeys.add(tKey);
// 对于 blocked triggers,则release,修改其状态
// release blocked triggers..
if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
getDelegate()
.updateTriggerStatesForJobFromOtherState(
conn, jKey,
STATE_WAITING, STATE_BLOCKED);
} else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
getDelegate()
.updateTriggerStatesForJobFromOtherState(
conn, jKey,
STATE_PAUSED, STATE_PAUSED_BLOCKED);
}
// release acquired triggers,修改其状态
// release acquired triggers..
if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
getDelegate().updateTriggerStateFromOtherState(
conn, tKey, STATE_WAITING,
STATE_ACQUIRED);
acquiredCount++;
} else if (ftRec.isJobRequestsRecovery()) {
// 如果需要恢复任务,则进行处理
// handle jobs marked for recovery that were not fully
// executed..
if (jobExists(conn, jKey)) {
@SuppressWarnings("deprecation")
SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
"recover_"
+ rec.getSchedulerInstanceId()
+ "_"
+ String.valueOf(recoverIds++),
Scheduler.DEFAULT_RECOVERY_GROUP,
new Date(ftRec.getScheduleTimestamp()));
rcvryTrig.setJobName(jKey.getName());
rcvryTrig.setJobGroup(jKey.getGroup());
rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
rcvryTrig.setPriority(ftRec.getPriority());
JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
rcvryTrig.setJobDataMap(jd);
rcvryTrig.computeFirstFireTime(null);
storeTrigger(conn, rcvryTrig, null, false,
STATE_WAITING, false, true);
recoveredCount++;
} else {
otherCount++;
}
} else {
otherCount++;
}
// free up stateful job's triggers
......
}
......
}
}
}
具体计算恢复节点的下一次触发时间代码如下:
/**
* <p>
* Called by the scheduler at the time a <code>Trigger</code> is first
* added to the scheduler, in order to have the <code>Trigger</code>
* compute its first fire time, based on any associated calendar.
* </p>
*
* <p>
* After this method has been called, <code>getNextFireTime()</code>
* should return a valid answer.
* </p>
*
* @return the first time at which the <code>Trigger</code> will be fired
* by the scheduler, which is also the same value <code>getNextFireTime()</code>
* will return (until after the first firing of the <code>Trigger</code>).
* </p>
*/
@Override
public Date computeFirstFireTime(Calendar calendar) {
nextFireTime = getStartTime();
while (nextFireTime != null && calendar != null
&& !calendar.isTimeIncluded(nextFireTime.getTime())) {
nextFireTime = getFireTimeAfter(nextFireTime);
if(nextFireTime == null)
break;
//avoid infinite loop
java.util.Calendar c = java.util.Calendar.getInstance();
c.setTime(nextFireTime);
if (c.get(java.util.Calendar.YEAR) > YEAR_TO_GIVEUP_SCHEDULING_AT) {
return null;
}
}
return nextFireTime;
}
至此,quartz 的故障切换分析完毕。
0xEE 个人信息
★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。