一. xxl-job 简介
- XXL-JOB 是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用
- 文档地址:https://www.xuxueli.com/xxl-job/
- git 地址:https://github.com/xuxueli/xxl-job
二. xxl-job 的架构体系
三. 调度中心
- 本篇使用的源码是:
2.3.0-SNAPSHOT
版本 - 调度中心源码分析入口是在
com.xxl.job.admin.core.conf.XxlJobAdminConfig
中,可以看到这个类实现了InitializingBean, DisposableBean
接口,这两个接口是 spring 的 bean 初始化完成和销毁是会触发afterPropertiesSet
和destroy
方法,这里不了解的可以自行百度哈 - 首先来看 bean 初始化完成,也就是调度中心启动的流程
- 进入
com.xxl.job.admin.core.scheduler.XxlJobScheduler#init
方法中,按顺序跟进代码
可以看到这个方法初始化了调度中心的所有流程,具体可以看上图的注释 (有不对的地方欢迎指正),对于initI18n()
方法就不看了,就是国际化的正常操作,接着看下一个方法 - 进入
JobTriggerPoolHelper.toStart()
,这个方法看代码其实就是初始化了两个线程池fastTriggerPool
和slowTriggerPool
,这两个线程池顾名思义就是 快 / 慢 线程池,其中两个线程池的最大线程数可配置,这两个线程池在执行线程调度的时候会进行选择,后面会讲到,这里了解这一步就是初始化了快慢线程池就行 - 接着进入到
JobRegistryHelper.getInstance().start()
里面,这一步初始化了执行器的注册、移除的线程池registryOrRemoveThreadPool
,这个线程池是在执行器相关的操作中处理注册执行的添加和移除的,后面讲注册器部分的时候在讲;在这一步中同时启动了一个线程registryMonitorThread
,这个线程作用是维护自动注册的执行的,由于篇幅较长,直接贴代码不截图了,根据代码大家可以自行理解逻辑
// for monitor
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// auto registry group
// 获取所有自动注册类型的 执行器
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
// 获取实时维护的执行器注册表,维护在线的执行器和调度中心机器地址信息 这里是获取超时的执行器(已死亡的->超过90秒未变更数据的)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
//如果有已死亡的执行器 直接从实时维护的表中删除掉
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<>();
//查询所有活跃的执行器注册表,就是update_time 90秒内有更新的
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
//执行器的名称 其实就是执行器端配置的xxl.job.executor.appname
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
//就是把同一个executor.appname的多个执行器 根据appname分组维护到这个map中
//一个executor.appname可能有多个地址的执行器 就是一个项目分布式的部署 但是executor.appname相同
appAddressMap.put(appname, registryList);
}
}
}
// fresh group address
for (XxlJobGroup group: groupList) {
//遍历所有自动注册类型的 执行器
//从上面 筛选的所有活跃的执行器Map中获取最新的执行器地址+端口号
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
//如果不是空的List 那么给这些执行器地址排个序
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
//按照逗号分割
for (String item:registryList) {
addressListSB.append(item).append(",");
}
//变成一个逗号分割排好序的地址列表
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
//更新到XxlJobGroup表中
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
//每30秒执行一次 上面的逻辑 进行自动注册的执行器地址更新
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
- 接着进入
JobFailMonitorHelper.getInstance().start();
这里初始化了一个线程monitorThread
,这个线程主要用来处理执行失败的任务重试和告警 - 接着进入
JobCompleteHelper.getInstance().start()
,这一步初始化了回调线程池callbackThreadPool
,用来处理执行器任务执行后的回调,同时初始化了monitorThread
线程,用来检测回调超时的任务并处理 - 接着进入
JobLogReportHelper.getInstance().start()
,这一步启动了logrThread
线程 对调度日志进行统计和清理,主要是统计xxl_job_log 表
3 天内的成功、运行中、失败的调度日志 并记录到xxl_job_log_report 表
中,同时根据对配置时间logretentiondays
的日志进行清理,主要作用就是维护日志,比较简单大家自己看看源码,我就不贴出来了 - 接下来进入重点:
JobScheduleHelper.getInstance().start()
,这里才是真正的对任务进行调度触发的地方,这里需要大家多看看代码,理解每一步的含义,我把我对这个流程的理解都在注释里面写出来了,有不对的欢饮大家指正,进入方法看:
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
//5秒后的整秒 当前毫秒%1000 是取余得到下次整秒的事件距离
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// preReadCount = (200 + 100) * 20 预读条数
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
//先获取到数据库连接
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
//设置不自动提交事务
conn.setAutoCommit(false);
//for update mysql锁 保证同时只有一个在执行
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
// 扫出来trigger_status(调度状态) = 1(运行) and trigger_next_time(下次调度时间) <= now + 5秒的任务
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring 遍历查询的任务
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump 若果当前时间 > 该任务下次执行时间 + 5秒 那说明这个任务调度超时了 当前时间超过了本该执行时间+5秒
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
//调度超时会查出来这个任务 因为查询条件查询的:trigger_next_time(下次调度时间) <= now + 5秒的任务
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match 如果查到了这个调度超时的任务 那么获取 调度过期策略 并进行调度处理
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
// 如果这个任务的超时处理策略是立即执行一次:FIRE_ONCE_NOW(立即执行一次),那么立即执行一次这个任务
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2、fresh next 刷新下次执行时间
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
//如果当前时间大于下次执行时间
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger 立即触发一次任务
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next 刷新下次执行时间
// 注意这里是在这个任务被查出来后第一次刷新下次时间
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
// 如果任务运行状态是:运行状态 并且 当前时间+5秒 > 下次执行(这个下次执行时间jobInfo.getTriggerNextTime() 是第一次被刷新后的时间)
// 那么把这个任务加入到时间轮中,留待时间轮线程消费
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second 获取这个任务下次的执行时间的整秒
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring 把算出来的整秒放进时间轮里面 每秒一个时间轮bucket
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next 刷新下次调度时间
// 在这个任务被加入时间轮前 这个任务的jobInfo.getTriggerNextTime()下次执行时间就已经是被刷新了的,也就是下一次的时间
// 这里再次刷新一次,这个下次执行时间(本轮调度后的第二次执行时间)会被记录到数据库
// 这样时间轮里面记录的是本轮调度后的下一次调度时间,更新到数据库的是时间轮调度后的下一次调度时间
// 就算下次循环数据库扫描出来这个任务,那也是时间轮调度后的下一次 不会存在重复执行的可能
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 如果不是以上两种情况,那这个任务应该是没到时间的 加入时间轮等待执行
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info 将执行完成的任务状态和下次时间等更新到数据库中
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
//如果本次没有扫到可以执行的任务 把preReadSuc改为false 留待后面的sleep使用
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
//提交锁
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
//恢复原来的提交机制
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
//关闭连接
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
//得到本次循环内调度任务耗费的时间
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
// 如果本次耗费时间小于1秒
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
// 如果preReadSuc为true则表示本次有任务调度,sleep 到下一次整秒
// 如果preReadSuc为false表示本次没有任务调度,sleep 到第5个整秒
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread
//处理时间轮中存放的定时任务
ringThread = new Thread(new Runnable() {
@Override
public void run() {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
// 开始while循环的秒
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
/**
* 对于这个线程的工作流程 可以查看下面这个栗子 对应 ringThread run方法中的注释,可以很清楚的看到每次循环取出的是哪几秒的数据
* 其实就是取当前秒的和上一秒的
*
* 栗子:(把这段代码逻辑拷出去输出一下秒数就知道了)
* 开始while循环的秒:27
* 每次循环开始---------------------------
* 当前的秒 === 27
* 取出的秒 ===27
* 取出的秒 ===26
* 每次循环开始---------------------------
* 当前的秒 === 28
* 取出的秒 ===28
* 取出的秒 ===27
* 每次循环开始---------------------------
* 当前的秒 === 29
* 取出的秒 ===29
* 取出的秒 ===28
* 每次循环开始---------------------------
* 当前的秒 === 30
* 取出的秒 ===30
* 取出的秒 ===29
*/
while (!ringThreadToStop) {
//每次循环开始---------------------------
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
// 避免处理耗时太长,跨过刻度,向前校验一个刻度;
// 获取当前的秒数
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 当前的秒 ===
//这个for循环取出的是在时间轮中当前秒的和前一秒的
//就是怕取出任务后调度的时间比较长 所以往前多取一秒 保证时间轮每个bucket都被处理
for (int i = 0; i < 2; i++) {
//取出的秒 ===
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
// next second, align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
四. 结尾
本篇主要靠大家看注释理解代码,我懒得画图去解释调度中心的调度流程,多看两遍代码就理解了,后面有空再完善图解流程和执行器部分以及调度 RPC 相关的源码解析,先写到这