源码解析 xxl-job 如何进行稳定性保障

 本文是基于上篇 xxl-job 源码运行解析 进行解析的

1. 调度中心如何进行故障转移(Failover)?

  故障转移就是在调度阶段,如果某一台执行器发生故障支持自动切换到一台正常的执行器机器并且完成调度请求流程

首先在新增一个调度任务的时候,路由策略选择故障转移:

源码解析 xxl-job 如何进行稳定性保障

当源码执行到  XxlJobTrigger 的 processTrigger方法时,如下图:

源码解析 xxl-job 如何进行稳定性保障

源码解析 xxl-job 如何进行稳定性保障

我们可以看到有多种路由策略,故障转移对应的是上图红框中的 FAILOVER。

现在跟着processTrigger方法的源码继续往下走:

源码解析 xxl-job 如何进行稳定性保障

我们可以看到它会根据不同的路由策略返回对应的路由结果,这里我们会走到 ExecutorRouteFailover 的 route 方法:

源码解析 xxl-job 如何进行稳定性保障

在上述代码中会根据  addressList 地址列表做一个心跳通讯,addressList 是 执行器在启动的时候自动向调度中心注册的 ip 地址:源码解析 xxl-job 如何进行稳定性保障

现在我们假设有2个执行器 ip1,ip2.。所以在上述循环中会根据ip1和ip2 做2次心跳检测, 心跳检测 根据上文源码运行解析可以知道  其实就是调度中心发送rpc请求 去执行器中执行  ExecutorBizImpl 的  beat 方法:

源码解析 xxl-job 如何进行稳定性保障

若 任何一个ip地址心跳检测成功则会返回  ReturnT.SUCCESS,然后将当前的 ip 地址返回 ,通过该ip地址会执行正常的调度请求。 若所有注册的ip 地址都心跳检测失败呢,我们继续往下看  processTrigger 方法:

源码解析 xxl-job 如何进行稳定性保障

源码解析 xxl-job 如何进行稳定性保障

当所有注册的ip 地址都心跳检测失败的时候 address 为空,然后返回一个失败结果给 triggerResult ,将对应的执行结果更新到 XxlJobLog中。

 

2. 调度中心执行任务调度失败的结果处理?

源码解析 xxl-job 如何进行稳定性保障

 在调度中心启动的时候会启动一个守护线程去监听任务调度失败的日志

       2.1 失败重试

      我们继续往下看是如何进行失败重试的

源码解析 xxl-job 如何进行稳定性保障

源码解析 xxl-job 如何进行稳定性保障

可以看到它首先会加载所有调度失败的任务日志id,然后遍历,其中若失败重试次数大于0,会去重新进行调度。

重试次数是在新增任务时进行设置的:

源码解析 xxl-job 如何进行稳定性保障重试时我们会看到 log.getExecutorFailRetryCount()-1 这个参数。这是重试时若调度失败,会一直进行重试直到重试次数小于0,这样就完成了调度失败重试的功能

2.2 邮件告警

继续往下看邮件告警的功能

源码解析 xxl-job 如何进行稳定性保障

源码解析 xxl-job 如何进行稳定性保障

可以看到若你配置了邮件的发送者和接受者就能进行邮件告警了,支持多个邮件接受者。发送者的配置如下:

源码解析 xxl-job 如何进行稳定性保障

接受者在新增任务时可以配置:

源码解析 xxl-job 如何进行稳定性保障

邮件告警完成后,会将告警的结果更新到调度日志中,这样就完成了邮件告警的功能

 

3. 执行器执行任务超时或异常如何处理?

       3.1 执行器执行任务超时处理机制

由上文可知,调度任务最后会放进  triggerQueue 触发队列,交给 JobThread 线程处理:

源码解析 xxl-job 如何进行稳定性保障

继续看 JobThread  是如何处理的:

源码解析 xxl-job 如何进行稳定性保障

线程中会从 triggerQueue 取出任务处理,若在新增任务时设置了超时时间大于0:

源码解析 xxl-job 如何进行稳定性保障

则在任务执行的时候会采用 FutureTask 在超时时间内异步获取执行结果,若获取结果超时会抛出TimeoutException

,然后给 executeResult 赋值为一个执行超时的结果,最后中断异步获取执行结果的 futureThread 线程。

3.2 任务执行抛出异常

源码解析 xxl-job 如何进行稳定性保障

上图知道,任务 handler.execute 执行时,若抛出异常会赋值一个失败的结果给 executeResult

3.3 执行结果的回调处理

继续看任务执行完后,finally 中的处理:

源码解析 xxl-job 如何进行稳定性保障

源码解析 xxl-job 如何进行稳定性保障

这里会将执行结果以及相关的参数放进 callBackQueue 回调队列中,回调队列会在执行器启动的时候启动一个守护线程消费处理:

源码解析 xxl-job 如何进行稳定性保障

继续往下看,回调线程是如何处理的:

源码解析 xxl-job 如何进行稳定性保障

这里会从 callBackQueue 回调队列中批量取出任务进行处理,继续往下看:

源码解析 xxl-job 如何进行稳定性保障

这里会去执行  adminBiz.callback 方法,我们看一下  adminBiz的初始化过程:

源码解析 xxl-job 如何进行稳定性保障

源码解析 xxl-job 如何进行稳定性保障

源码解析 xxl-job 如何进行稳定性保障

看到上图中的  getObject 方法中我们就懂了(ps : 不懂的,可以去看我的上篇 xxl-job 源码运行解析),这又是一次rpc请求,不同的是这次是 执行器 → 调度器 发送的请求,所以  adminBiz.callback 会去执行 调度中心里面的  AdminBizImpl 的 callback方法 :

源码解析 xxl-job 如何进行稳定性保障

继续往下看 callback  详细的执行过程:

源码解析 xxl-job 如何进行稳定性保障

可以看到若调度任务执行成功且有配置子任务ID则会去执行相应的子任务(ps : 子任务ID 也是在新增任务的时候配置的),若调度任务执行失败(也就是超时或异常)呢,继续往下看:

源码解析 xxl-job 如何进行稳定性保障

源码解析 xxl-job 如何进行稳定性保障

源码解析 xxl-job 如何进行稳定性保障

可以看到若任务执行失败,会将对应的执行结果的失败码,失败信息更新至 xxl_job_log 表中,然后由上文中的失败任务监听器去执行相应的重试或告警。至此,一次任务调度的结果就处理完成

 

总结:

总的来说 xxl-job 就是通过将异常的执行结果放进 xxl_job_log  中,然后 JobFailMonitorHelper启动守护线程通过重试和失败告警的方式去处理异常的调度任务结果

 

上一篇:【Distributed】分布式任务调度平台


下一篇:xxl-job 使用相关