上面初始化的7个步骤拆分如下============== 1 国际化相关 private void initI18n (){ // 根据环境设置title为中文、英文等 for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum. values ()) { item.setTitle(I18nUtil. getString ( "jobconf_block_" .concat(item.name()))) ; } } 2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool 这个步骤初始化了两个线程池 fastTriggerPool和 slowTriggerPool 在触发调度的时候会有一个选择快慢线程池的过程,如果job在一分钟内超过超过10次,就用slowTriggerPool来处理,如下: ThreadPoolExecutor triggerPool_ = fastTriggerPool ; AtomicInteger jobTimeoutCount = jobTimeoutCountMap .get(jobId) ; if (jobTimeoutCount!= null && jobTimeoutCount.get() > 10 ) { // job 在一分钟内超过超过 10 次,就用 slowTriggerPool 来处理 job-timeout 10 times in 1 min triggerPool_ = slowTriggerPool ; } triggerPool_.execute( new Runnable() {.........省略............} 3 启动注册监听线程 3.1 初始化registryOrRemoveThreadPool线程池:用于 注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销 3.2 启动监听注册的线程registryMonitorThread: 清除心跳超过90s的注册信息,并且刷新分组注册信息 public void start(){ // 用于注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销 for registry or remove registryOrRemoveThreadPool = new ThreadPoolExecutor( 2, 10, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>( 2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { r.run(); logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now)."); } }); // 启动监听注册的线程 for monitor registryMonitorThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { try { // 获取自动注册的执行器组(执行器地址类型:0=自动注册、1=手动录入) auto registry group List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); if (groupList!=null && !groupList.isEmpty()) {// group组集合不为空 // 移除死掉的调用地址(心跳时间超过90秒,就当线程挂掉了。默认是30s做一次心跳) remove dead address (admin/executor) List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { // 移除挂掉的注册地址信息 XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); } // fresh online address (admin/executor) HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>(); // 找出所有正常没死掉的注册地址 List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) { for (XxlJobRegistry item: list) { // 确保是 EXECUTOR 执行器类型 if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { String appname = item.getRegistryKey(); List<String> registryList = appAddressMap.get(appname); if (registryList == null) { registryList = new ArrayList<String>(); } if (!registryList.contains(item.getRegistryValue())) { registryList.add(item.getRegistryValue()); } appAddressMap.put(appname, registryList); } } } // 刷新分组注册地址信息 fresh group address for (XxlJobGroup group: groupList) { List<String> registryList = appAddressMap.get(group.getAppname()); String addressListStr = null; if (registryList!=null && !registryList.isEmpty()) { Collections.sort(registryList); StringBuilder addressListSB = new StringBuilder(); for (String item:registryList) { addressListSB.append(item).append(","); } addressListStr = addressListSB.toString(); addressListStr = addressListStr.substring(0, addressListStr.length()-1); } group.setAddressList(addressListStr); group.setUpdateTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } } catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } } logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop"); } }); registryMonitorThread.setDaemon(true); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); registryMonitorThread.start(); } 4 启动失败任务监听线程(重试、告警) 这部分逻辑比较简单,就是重试 + 告警,核心代码如下 // 获取执行失败的job信息 List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000); if (failLogIds!=null && !failLogIds.isEmpty()) { for (long failLogId: failLogIds) { // lock log int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1); if (lockRet < 1) { continue; } XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId); XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId()); // 1、失败塞回重试 fail retry monitor if (log.getExecutorFailRetryCount() > 0) { JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null); String retryMsg = "<span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span>"; log.setTriggerMsg(log.getTriggerMsg() + retryMsg); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log); } // 2、进行失败告警 fail alarm monitor int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败 if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log); newAlarmStatus = alarmResult?2:3; } else { newAlarmStatus = 1; } XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus); } } 5 启动监控线程 5.1 初始化callbackThreadPool线程池:用于callback 回调的线程池,客户端调用api/callback接口时会使用这个线程池 5.2 启动监控线monitorThread:调度记录停留在 "运行中" 状态 超过10min,且对应执行器心跳注册 失败不在线,则将本地调度主动标记失败 逻辑较简单,如上两点 6 启动日志统计和清除线程logrThread -- 日志记录刷新,刷新 最近三天的日志Report(即统计每天的失败、成功、运行次数等) -- 每天清除一次 失效过期的日志数据 配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除 逻辑较简单,如上两点 7 启动任务调度 7.1 scheduleThread-取待执行任务数据入时间轮 -- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行 -- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据 ---- preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; -- 第三步:将当前时间与下次调度时间对比,有如下三种情况 **** 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理 -------- 1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次 -------- 2、刷新上一次触发 和 下一次待触发时间 **** 当前时间 大于 任务的下一次触发时间 并且是没有过期的: -------- 1、直接触发任务执行器 -------- 2、刷新上一次触发 和 下一次待触发时间 -------- 3、如果下一次触发在五秒内,直接放进时间轮里面待调度 ---------------- 1、求当前任务下一次触发时间所处一分钟的第N秒 ---------------- 2、将当前任务ID和ringSecond放进时间轮里面 ---------------- 3、刷新上一次触发 和 下一次待触发时间 **** 当前时间 小于 下一次触发时间: -------- 1、求当前任务下一次触发时间所处一分钟的第N秒 -------- 2、将当前任务ID和ringSecond放进时间轮里面 -------- 3、刷新上一次触发 和 下一次待触发时间 -- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time -- 第五步:提交数据库事务,释放数据库select for update排它锁 7.2 ringThread-根据时间轮执行job任务 首先时间轮数据格式为:Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>() -- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了 -- 第二步:执行触发器 -- 第三步:清除当前刻度列表的数据 **** 执行的过程中还会选择对应的策略,如下: -------- 阻塞策略:串行、废弃后面、覆盖前面 -------- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询 启动两个线程解析的核心源码如下: public void start (){ // 启动调度线程,这些线程是用来取数据的 schedule thread scheduleThread = new Thread( new Runnable() { @Override public void run () { try { // 不知道为啥要休眠 4-5 秒 时间,然后再启动 TimeUnit. MILLISECONDS .sleep( 5000 - System. currentTimeMillis ()% 1000 ) ; } catch (InterruptedException e) { if (! scheduleThreadToStop ) { logger .error(e.getMessage() , e) ; } } logger .info( ">>>>>>>>> init xxl-job admin scheduler success." ) ; // 这里是预读数量 pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) int preReadCount = (XxlJobAdminConfig. getAdminConfig ().getTriggerPoolFastMax() + XxlJobAdminConfig. getAdminConfig ().getTriggerPoolSlowMax()) * 20 ; while (! scheduleThreadToStop ) { // 扫描任务 Scan Job long start = System. currentTimeMillis () ; Connection conn = null; Boolean connAutoCommit = null; PreparedStatement preparedStatement = null boolean preReadSuc = true; try { conn = XxlJobAdminConfig. getAdminConfig ().getDataSource().getConnection() ; connAutoCommit = conn.getAutoCommit() ; conn.setAutoCommit( false ) ; // 采用 select for update ,是排它锁。说白了 xxl-job 用一张数据库表来当分布式锁了,确保多个 xxl-job admin 节点下,依旧只能同时执行一个调度线程任务 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ) ; preparedStatement.execute() ; // tx start // 1 、预读数据 pre read long nowTime = System. currentTimeMillis () ; // -- 从数据库中读取截止到五秒后未执行的 job ,并且读取 preReadCount=6000 条 List<XxlJobInfo> scheduleList = XxlJobAdminConfig. getAdminConfig ().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS , preReadCount) ; if (scheduleList!= null && scheduleList.size()> 0 ) { // 2 、 push 压进 时间轮 push time-ring for (XxlJobInfo jobInfo: scheduleList) { // time-ring jump if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS ) { // 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS ( 5s )) , 可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理 // 2.1 、 trigger-expire > 5s : pass && make next-trigger-time logger .warn( ">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()) ; // 1 、匹配过期失效的策略: DO_NOTHING= 过期啥也不干,废弃; FIRE_ONCE_NOW= 过期立即触发一次 misfire match MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum. match (jobInfo.getMisfireStrategy() , MisfireStrategyEnum. DO_NOTHING ) ; if (MisfireStrategyEnum. FIRE_ONCE_NOW == misfireStrategyEnum) { // FIRE_ONCE_NOW 》 trigger JobTriggerPoolHelper. trigger (jobInfo.getId() , TriggerTypeEnum. MISFIRE , - 1 , null, null, null ) ; logger .debug( ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ) ; } // 2 、刷新上一次触发 和 下一次待触发时间 fresh next refreshNextValidTime(jobInfo , new Date()) ; } else if (nowTime > jobInfo.getTriggerNextTime()) { // 当前时间 大于 任务的下一次触发时间 并且是没有过期的 // 2.2 、 trigger-expire < 5s : direct-trigger && make next-trigger-time // 1 、直接触发任务执行器 trigger JobTriggerPoolHelper. trigger (jobInfo.getId() , TriggerTypeEnum. CRON , - 1 , null, null, null ) ; logger .debug( ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ) ; // 2 、刷新上一次触发 和 下一次待触发时间 fresh next refreshNextValidTime(jobInfo , new Date()) ; // 如果下一次触发在五秒内,直接放进时间轮里面待调度 next-trigger-time in 5s, pre-read again if (jobInfo.getTriggerStatus()== 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1 、求当前任务下一次触发时间所处一分钟的第 N 秒 make ring second int ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 ) ; // 2 、将当前任务 ID 和 ringSecond 放进时间轮里面 push time ring pushTimeRing(ringSecond , jobInfo.getId()) ; // 3 、刷新上一次触发 和 下一次待触发时间 fresh next refreshNextValidTime(jobInfo , new Date(jobInfo.getTriggerNextTime())) ; } } else { // 当前时间 小于 下一次触发时间 // 2.3 、 trigger-pre-read : time-ring trigger && make next-trigger-time // 1 、求当前任务下一次触发时间所处一分钟的第 N 秒 make ring second int ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 ) ; // 2 、将当前任务 ID 和 ringSecond 放进时间轮里面 push time ring pushTimeRing(ringSecond , jobInfo.getId()) ; // 3 、刷新上一次触发 和 下一次待触发时间 fresh next refreshNextValidTime(jobInfo , new Date(jobInfo.getTriggerNextTime())) ; } } // 3 、更新数据库执行器信息,如 trigger_last_time 、 trigger_next_time update trigger info for (XxlJobInfo jobInfo: scheduleList) { XxlJobAdminConfig. getAdminConfig ().getXxlJobInfoDao().scheduleUpdate(jobInfo) ; } } else { preReadSuc = false; } // tx stop } catch (Exception e) { if (! scheduleThreadToStop ) { logger .error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}" , e) ; } } finally { // 提交事务,释放数据库 select for update 的锁 commit .......................省略............. } long cost = System. currentTimeMillis ()-start ; // 如果执行太快了,就稍微 sleep 等待一下 Wait seconds, align second if (cost < 1000 ) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; TimeUnit. MILLISECONDS .sleep((preReadSuc? 1000 : PRE_READ_MS ) - System. currentTimeMillis ()% 1000 ) ; } catch (InterruptedException e) { if (! scheduleThreadToStop ) { logger .error(e.getMessage() , e) ; } } }) ; scheduleThread .setDaemon( true ) ; scheduleThread .setName( "xxl-job, admin JobScheduleHelper#scheduleThread" ) ; scheduleThread .start() ; // 时间轮线程,用于取出每秒的数据,然后处理 ring thread ringThread = new Thread( new Runnable() { @Override public void run () { while (! ringThreadToStop ) { // align second try { TimeUnit. MILLISECONDS .sleep( 1000 - System. currentTimeMillis () % 1000 ) ; } catch (InterruptedException e) { if (! ringThreadToStop ) { logger .error(e.getMessage() , e) ; } } try { // second data List<Integer> ringItemData = new ArrayList<>() ; // 获取当前所处的一分钟第几秒,然后 for 两次,第二次是为了重跑前面一个刻度没有被执行的的 job list ,避免前面的刻度遗漏了 int nowSecond = Calendar. getInstance ().get(Calendar. SECOND ) ; // 避免处理耗时太长,跨过刻度,向前校验一个刻度; for ( int i = 0 ; i < 2 ; i++) { List<Integer> tmpData = ringData .remove( (nowSecond+ 60 -i)% 60 ) ; if (tmpData != null ) { ringItemData.addAll(tmpData) ; } } // ring trigger logger .debug( ">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays. asList (ringItemData) ) ; if (ringItemData.size() > 0 ) { // do trigger for ( int jobId: ringItemData) { // 执行触发器 do trigger JobTriggerPoolHelper. trigger (jobId , TriggerTypeEnum. CRON , - 1 , null, null, null ) ; } // 清除当前刻度列表的数据 clear ringItemData.clear() ; } } catch (Exception e) { if (! ringThreadToStop ) { logger .error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}" , e) ; } } } logger .info( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop" ) ; } }) ; ringThread .setDaemon( true ) ; ringThread .setName( "xxl-job, admin JobScheduleHelper#ringThread" ) ; ringThread .start() ; }
Xxl-Job调度器原理解析
上面初始化的7个步骤拆分如下============== 1 国际化相关 private void initI18n (){ // 根据环境设置title为中文、英文等 for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum. values ()) { item.setTitle(I18nUtil. getString ( "jobconf_block_" .concat(item.name()))) ; } } 2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool 这个步骤初始化了两个线程池 fastTriggerPool和 slowTriggerPool 在触发调度的时候会有一个选择快慢线程池的过程,如果job在一分钟内超过超过10次,就用slowTriggerPool来处理,如下: ThreadPoolExecutor triggerPool_ = fastTriggerPool ; AtomicInteger jobTimeoutCount = jobTimeoutCountMap .get(jobId) ; if (jobTimeoutCount!= null && jobTimeoutCount.get() > 10 ) { // job 在一分钟内超过超过 10 次,就用 slowTriggerPool 来处理 job-timeout 10 times in 1 min triggerPool_ = slowTriggerPool ; } triggerPool_.execute( new Runnable() {.........省略............} 3 启动注册监听线程 3.1 初始化registryOrRemoveThreadPool线程池:用于 注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销 3.2 启动监听注册的线程registryMonitorThread: 清除心跳超过90s的注册信息,并且刷新分组注册信息 public void start(){ // 用于注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销 for registry or remove registryOrRemoveThreadPool = new ThreadPoolExecutor( 2, 10, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>( 2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { r.run(); logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now)."); } }); // 启动监听注册的线程 for monitor registryMonitorThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { try { // 获取自动注册的执行器组(执行器地址类型:0=自动注册、1=手动录入) auto registry group List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); if (groupList!=null && !groupList.isEmpty()) {// group组集合不为空 // 移除死掉的调用地址(心跳时间超过90秒,就当线程挂掉了。默认是30s做一次心跳) remove dead address (admin/executor) List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { // 移除挂掉的注册地址信息 XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); } // fresh online address (admin/executor) HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>(); // 找出所有正常没死掉的注册地址 List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) { for (XxlJobRegistry item: list) { // 确保是 EXECUTOR 执行器类型 if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { String appname = item.getRegistryKey(); List<String> registryList = appAddressMap.get(appname); if (registryList == null) { registryList = new ArrayList<String>(); } if (!registryList.contains(item.getRegistryValue())) { registryList.add(item.getRegistryValue()); } appAddressMap.put(appname, registryList); } } } // 刷新分组注册地址信息 fresh group address for (XxlJobGroup group: groupList) { List<String> registryList = appAddressMap.get(group.getAppname()); String addressListStr = null; if (registryList!=null && !registryList.isEmpty()) { Collections.sort(registryList); StringBuilder addressListSB = new StringBuilder(); for (String item:registryList) { addressListSB.append(item).append(","); } addressListStr = addressListSB.toString(); addressListStr = addressListStr.substring(0, addressListStr.length()-1); } group.setAddressList(addressListStr); group.setUpdateTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } } catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } } logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop"); } }); registryMonitorThread.setDaemon(true); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); registryMonitorThread.start(); } 4 启动失败任务监听线程(重试、告警) 这部分逻辑比较简单,就是重试 + 告警,核心代码如下 // 获取执行失败的job信息 List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000); if (failLogIds!=null && !failLogIds.isEmpty()) { for (long failLogId: failLogIds) { // lock log int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1); if (lockRet < 1) { continue; } XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId); XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId()); // 1、失败塞回重试 fail retry monitor if (log.getExecutorFailRetryCount() > 0) { JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null); String retryMsg = "<span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span>"; log.setTriggerMsg(log.getTriggerMsg() + retryMsg); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log); } // 2、进行失败告警 fail alarm monitor int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败 if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log); newAlarmStatus = alarmResult?2:3; } else { newAlarmStatus = 1; } XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus); } } 5 启动监控线程 5.1 初始化callbackThreadPool线程池:用于callback 回调的线程池,客户端调用api/callback接口时会使用这个线程池 5.2 启动监控线monitorThread:调度记录停留在 "运行中" 状态 超过10min,且对应执行器心跳注册 失败不在线,则将本地调度主动标记失败 逻辑较简单,如上两点 6 启动日志统计和清除线程logrThread -- 日志记录刷新,刷新 最近三天的日志Report(即统计每天的失败、成功、运行次数等) -- 每天清除一次 失效过期的日志数据 配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除 逻辑较简单,如上两点 7 启动任务调度 7.1 scheduleThread-取待执行任务数据入时间轮 -- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行 -- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据 ---- preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; -- 第三步:将当前时间与下次调度时间对比,有如下三种情况 **** 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理 -------- 1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次 -------- 2、刷新上一次触发 和 下一次待触发时间 **** 当前时间 大于 任务的下一次触发时间 并且是没有过期的: -------- 1、直接触发任务执行器 -------- 2、刷新上一次触发 和 下一次待触发时间 -------- 3、如果下一次触发在五秒内,直接放进时间轮里面待调度 ---------------- 1、求当前任务下一次触发时间所处一分钟的第N秒 ---------------- 2、将当前任务ID和ringSecond放进时间轮里面 ---------------- 3、刷新上一次触发 和 下一次待触发时间 **** 当前时间 小于 下一次触发时间: -------- 1、求当前任务下一次触发时间所处一分钟的第N秒 -------- 2、将当前任务ID和ringSecond放进时间轮里面 -------- 3、刷新上一次触发 和 下一次待触发时间 -- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time -- 第五步:提交数据库事务,释放数据库select for update排它锁 7.2 ringThread-根据时间轮执行job任务 首先时间轮数据格式为:Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>() -- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了 -- 第二步:执行触发器 -- 第三步:清除当前刻度列表的数据 **** 执行的过程中还会选择对应的策略,如下: -------- 阻塞策略:串行、废弃后面、覆盖前面 -------- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询 启动两个线程解析的核心源码如下: public void start (){ // 启动调度线程,这些线程是用来取数据的 schedule thread scheduleThread = new Thread( new Runnable() { @Override public void run () { try { // 不知道为啥要休眠 4-5 秒 时间,然后再启动 TimeUnit. MILLISECONDS .sleep( 5000 - System. currentTimeMillis ()% 1000 ) ; } catch (InterruptedException e) { if (! scheduleThreadToStop ) { logger .error(e.getMessage() , e) ; } } logger .info( ">>>>>>>>> init xxl-job admin scheduler success." ) ; // 这里是预读数量 pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) int preReadCount = (XxlJobAdminConfig. getAdminConfig ().getTriggerPoolFastMax() + XxlJobAdminConfig. getAdminConfig ().getTriggerPoolSlowMax()) * 20 ; while (! scheduleThreadToStop ) { // 扫描任务 Scan Job long start = System. currentTimeMillis () ; Connection conn = null; Boolean connAutoCommit = null; PreparedStatement preparedStatement = null boolean preReadSuc = true; try { conn = XxlJobAdminConfig. getAdminConfig ().getDataSource().getConnection() ; connAutoCommit = conn.getAutoCommit() ; conn.setAutoCommit( false ) ; // 采用 select for update ,是排它锁。说白了 xxl-job 用一张数据库表来当分布式锁了,确保多个 xxl-job admin 节点下,依旧只能同时执行一个调度线程任务 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ) ; preparedStatement.execute() ; // tx start // 1 、预读数据 pre read long nowTime = System. currentTimeMillis () ; // -- 从数据库中读取截止到五秒后未执行的 job ,并且读取 preReadCount=6000 条 List<XxlJobInfo> scheduleList = XxlJobAdminConfig. getAdminConfig ().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS , preReadCount) ; if (scheduleList!= null && scheduleList.size()> 0 ) { // 2 、 push 压进 时间轮 push time-ring for (XxlJobInfo jobInfo: scheduleList) { // time-ring jump if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS ) { // 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS ( 5s )) , 可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理 // 2.1 、 trigger-expire > 5s : pass && make next-trigger-time logger .warn( ">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()) ; // 1 、匹配过期失效的策略: DO_NOTHING= 过期啥也不干,废弃; FIRE_ONCE_NOW= 过期立即触发一次 misfire match MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum. match (jobInfo.getMisfireStrategy() , MisfireStrategyEnum. DO_NOTHING ) ; if (MisfireStrategyEnum. FIRE_ONCE_NOW == misfireStrategyEnum) { // FIRE_ONCE_NOW 》 trigger JobTriggerPoolHelper. trigger (jobInfo.getId() , TriggerTypeEnum. MISFIRE , - 1 , null, null, null ) ; logger .debug( ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ) ; } // 2 、刷新上一次触发 和 下一次待触发时间 fresh next refreshNextValidTime(jobInfo , new Date()) ; } else if (nowTime > jobInfo.getTriggerNextTime()) { // 当前时间 大于 任务的下一次触发时间 并且是没有过期的 // 2.2 、 trigger-expire < 5s : direct-trigger && make next-trigger-time // 1 、直接触发任务执行器 trigger JobTriggerPoolHelper. trigger (jobInfo.getId() , TriggerTypeEnum. CRON , - 1 , null, null, null ) ; logger .debug( ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ) ; // 2 、刷新上一次触发 和 下一次待触发时间 fresh next refreshNextValidTime(jobInfo , new Date()) ; // 如果下一次触发在五秒内,直接放进时间轮里面待调度 next-trigger-time in 5s, pre-read again if (jobInfo.getTriggerStatus()== 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1 、求当前任务下一次触发时间所处一分钟的第 N 秒 make ring second int ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 ) ; // 2 、将当前任务 ID 和 ringSecond 放进时间轮里面 push time ring pushTimeRing(ringSecond , jobInfo.getId()) ; // 3 、刷新上一次触发 和 下一次待触发时间 fresh next refreshNextValidTime(jobInfo , new Date(jobInfo.getTriggerNextTime())) ; } } else { // 当前时间 小于 下一次触发时间 // 2.3 、 trigger-pre-read : time-ring trigger && make next-trigger-time // 1 、求当前任务下一次触发时间所处一分钟的第 N 秒 make ring second int ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 ) ; // 2 、将当前任务 ID 和 ringSecond 放进时间轮里面 push time ring pushTimeRing(ringSecond , jobInfo.getId()) ; // 3 、刷新上一次触发 和 下一次待触发时间 fresh next refreshNextValidTime(jobInfo , new Date(jobInfo.getTriggerNextTime())) ; } } // 3 、更新数据库执行器信息,如 trigger_last_time 、 trigger_next_time update trigger info for (XxlJobInfo jobInfo: scheduleList) { XxlJobAdminConfig. getAdminConfig ().getXxlJobInfoDao().scheduleUpdate(jobInfo) ; } } else { preReadSuc = false; } // tx stop } catch (Exception e) { if (! scheduleThreadToStop ) { logger .error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}" , e) ; } } finally { // 提交事务,释放数据库 select for update 的锁 commit .......................省略............. } long cost = System. currentTimeMillis ()-start ; // 如果执行太快了,就稍微 sleep 等待一下 Wait seconds, align second if (cost < 1000 ) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; TimeUnit. MILLISECONDS .sleep((preReadSuc? 1000 : PRE_READ_MS ) - System. currentTimeMillis ()% 1000 ) ; } catch (InterruptedException e) { if (! scheduleThreadToStop ) { logger .error(e.getMessage() , e) ; } } }) ; scheduleThread .setDaemon( true ) ; scheduleThread .setName( "xxl-job, admin JobScheduleHelper#scheduleThread" ) ; scheduleThread .start() ; // 时间轮线程,用于取出每秒的数据,然后处理 ring thread ringThread = new Thread( new Runnable() { @Override public void run () { while (! ringThreadToStop ) { // align second try { TimeUnit. MILLISECONDS .sleep( 1000 - System. currentTimeMillis () % 1000 ) ; } catch (InterruptedException e) { if (! ringThreadToStop ) { logger .error(e.getMessage() , e) ; } } try { // second data List<Integer> ringItemData = new ArrayList<>() ; // 获取当前所处的一分钟第几秒,然后 for 两次,第二次是为了重跑前面一个刻度没有被执行的的 job list ,避免前面的刻度遗漏了 int nowSecond = Calendar. getInstance ().get(Calendar. SECOND ) ; // 避免处理耗时太长,跨过刻度,向前校验一个刻度; for ( int i = 0 ; i < 2 ; i++) { List<Integer> tmpData = ringData .remove( (nowSecond+ 60 -i)% 60 ) ; if (tmpData != null ) { ringItemData.addAll(tmpData) ; } } // ring trigger logger .debug( ">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays. asList (ringItemData) ) ; if (ringItemData.size() > 0 ) { // do trigger for ( int jobId: ringItemData) { // 执行触发器 do trigger JobTriggerPoolHelper. trigger (jobId , TriggerTypeEnum. CRON , - 1 , null, null, null ) ; } // 清除当前刻度列表的数据 clear ringItemData.clear() ; } } catch (Exception e) { if (! ringThreadToStop ) { logger .error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}" , e) ; } } } logger .info( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop" ) ; } }) ; ringThread .setDaemon( true ) ; ringThread .setName( "xxl-job, admin JobScheduleHelper#ringThread" ) ; ringThread .start() ; }