Xxl-Job调度器原理解析

项目解析源码地址: https://gitee.com/lidishan/xxl-job-code-analysis xxl-job版本:2.3.0 Xxl-Job分为执行器、调度器。而我们平时的客户端就属于一个执行器,执行器启动的时候会自动注册到调度器上,然后调度器进行远程调度。 Xxl-Job调度器原理解析   调度器初始化过程步骤如下 1 国际化相关 配置参数: xxl.job.i18n=zh_CN, 这里设置为中文简体   2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool 配置参数:xxl.job.triggerpool.fast.max=200, 这里设置为fastTriggerPool的最大线程数=200, 不能小于200          xxl.job.triggerpool.slow.max=100, 这里设置为slowTriggerPool的最大线程数=100, 不能小于100   3 启动注册监听线程 3.1 初始化registryOrRemoveThreadPool线程池:用于 注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销 3.2 启动监听注册的线程registryMonitorThread: 清除心跳超过90s的注册信息,并且刷新分组注册信息   4 启动失败任务监听线程(重试、告警) 配置参数:spring.mail.from=xxx@qq.com, 告警邮箱   5 启动监控线程 5.1 初始化callbackThreadPool线程池:用于callback 回调的线程池,客户端调用api/callback接口时会使用这个线程池 5.2 启动监控线monitorThread:调度记录停留在 "运行中" 状态 超过10min,且对应执行器心跳注册 失败不在线,则将本地调度主动标记失败   6 启动日志统计和清除线程logrThread -- 日志记录刷新,刷新 最近三天的日志Report(即统计每天的失败、成功、运行次数等) -- 每天清除一次 失效过期的日志数据 配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除     7 启动任务调度(很重要!!主要靠这两个线程进行塞数据到时间轮,然后时间轮取数调度任务) 7.1 scheduleThread线程-取待执行任务数据入时间轮(塞数据) -- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行 -- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据 ----  preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; -- 第三步:将当前时间与下次调度时间对比,有如下三种情况 ****  当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理 --------  1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次 --------  2、刷新上一次触发 和 下一次待触发时间 ****  当前时间 大于 任务的下一次触发时间 并且是没有过期的: --------  1、直接触发任务执行器 --------  2、刷新上一次触发 和 下一次待触发时间 --------  3、如果下一次触发在五秒内,直接放进时间轮里面待调度 ----------------  1、求当前任务下一次触发时间所处一分钟的第N秒 ----------------  2、将当前任务ID和ringSecond放进时间轮里面 ----------------  3、刷新上一次触发 和 下一次待触发时间 ****  当前时间 小于 下一次触发时间: --------  1、求当前任务下一次触发时间所处一分钟的第N秒 --------  2、将当前任务ID和ringSecond放进时间轮里面 --------  3、刷新上一次触发 和 下一次待触发时间 -- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time -- 第五步:提交数据库事务,释放数据库select for update排它锁   7.2 ringThread线程 -根据时间轮执行job任务 (取数据执行) 首先时间轮数据格式为:Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>() -- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了 -- 第二步:执行触发器 -- 第三步:清除当前刻度列表的数据 **** 执行的过程中还会选择对应的策略,如下: -------- 阻塞策略:串行、废弃后面、覆盖前面 -------- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询   初始化的入口代码为 XxlJobAdminConfig,代码如下: @Component public class XxlJobAdminConfig implements InitializingBean, DisposableBean {     private static XxlJobAdminConfig adminConfig = null;     public static XxlJobAdminConfig getAdminConfig () {         return adminConfig ;     }     // ---------------------- XxlJobScheduler ----------------------     private XxlJobScheduler xxlJobScheduler ;     @Override     public void afterPropertiesSet () throws Exception { // 生命周期中的属性注入来对xxlJobScheduler初始化         adminConfig = this;           // 初始化 xxl-job 定时任务         xxlJobScheduler = new XxlJobScheduler() ;         xxlJobScheduler .init() ;     }     @Override     public void destroy () throws Exception { // 生命周期中的销毁来对xxlJobScheduler销毁         xxlJobScheduler .destroy() ;     }     ..............省略.............. }     xxlJobScheduler.init()进行初始化会执行如下过程: public class XxlJobScheduler {     private static final Logger logger = LoggerFactory. getLogger (XxlJobScheduler. class ) ;     public void init () throws Exception {           // 1 国际化相关 init i18n           initI18n() ;           // 2 初始化快线程池 fastTriggerPool 、慢线程池 slowTriggerPool admin trigger pool start           JobTriggerPoolHelper. toStart () ;           // 3 启动注册监听线程 admin registry monitor run           JobRegistryHelper. getInstance ().start() ;           // 4 启动失败任务监听线程 ( 重试、告警 ) admin fail-monitor run           JobFailMonitorHelper. getInstance ().start() ;           // 5 启动监控线程(调度记录停留在 " 运行中 " 状态超过 10min ,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败) admin lose-monitor run ( depend on JobTriggerPoolHelper )           JobCompleteHelper. getInstance ().start() ;           // 6 启动日志统计和清除线程(日志记录刷新,刷新最近三天的日志 Report (即统计每天的失败、成功、运行次数等);每天清除一次失效过期的日志数据) admin log report start           JobLogReportHelper. getInstance ().start() ;           // 7 启动任务调度 (scheduleThread- 取待执行任务数据入时间轮; ringThread- 根据时间轮执行 job 任务 ) start-schedule ( depend on JobTriggerPoolHelper )           JobScheduleHelper. getInstance ().start() ;           logger .info( ">>>>>>>>> init xxl-job admin success." ) ;      }     ................省略........................ }  
上面初始化的7个步骤拆分如下============== 1 国际化相关 private void initI18n (){ // 根据环境设置title为中文、英文等     for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum. values ()) {         item.setTitle(I18nUtil. getString ( "jobconf_block_" .concat(item.name()))) ;     } }   2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool 这个步骤初始化了两个线程池 fastTriggerPool和 slowTriggerPool 在触发调度的时候会有一个选择快慢线程池的过程,如果job在一分钟内超过超过10次,就用slowTriggerPool来处理,如下: ThreadPoolExecutor triggerPool_ = fastTriggerPool ; AtomicInteger jobTimeoutCount = jobTimeoutCountMap .get(jobId) ; if (jobTimeoutCount!= null && jobTimeoutCount.get() > 10 ) { // job 在一分钟内超过超过 10 次,就用 slowTriggerPool 来处理 job-timeout 10 times in 1 min      triggerPool_ = slowTriggerPool ; } triggerPool_.execute( new Runnable() {.........省略............}   3 启动注册监听线程 3.1 初始化registryOrRemoveThreadPool线程池:用于 注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销 3.2 启动监听注册的线程registryMonitorThread: 清除心跳超过90s的注册信息,并且刷新分组注册信息 public void start(){     // 用于注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销    for registry or remove     registryOrRemoveThreadPool = new ThreadPoolExecutor(          2,          10,          30L,          TimeUnit.SECONDS,          new LinkedBlockingQueue<Runnable>( 2000),          new ThreadFactory() {             @Override             public Thread newThread(Runnable r) {                return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());             }          },          new RejectedExecutionHandler() {             @Override             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                r.run();                logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");             }          });       // 启动监听注册的线程      for monitor     registryMonitorThread = new Thread(new Runnable() {       @Override       public void run() {          while (!toStop) {             try {                 // 获取自动注册的执行器组(执行器地址类型:0=自动注册、1=手动录入) auto registry group                List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);                if (groupList!=null && !groupList.isEmpty()) {// group组集合不为空                    // 移除死掉的调用地址(心跳时间超过90秒,就当线程挂掉了。默认是30s做一次心跳)          remove dead address (admin/executor)                   List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());                   if (ids!=null && ids.size()>0) { // 移除挂掉的注册地址信息                      XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);                   }                       // fresh online address (admin/executor)                   HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();                    // 找出所有正常没死掉的注册地址                   List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());                   if (list != null) {                      for (XxlJobRegistry item: list) {                          // 确保是 EXECUTOR 执行器类型                         if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {                            String appname = item.getRegistryKey();                            List<String> registryList = appAddressMap.get(appname);                            if (registryList == null) {                               registryList = new ArrayList<String>();                            }                                if (!registryList.contains(item.getRegistryValue())) {                               registryList.add(item.getRegistryValue());                            }                            appAddressMap.put(appname, registryList);                         }                      }                   }                        // 刷新分组注册地址信息  fresh group address                   for (XxlJobGroup group: groupList) {                      List<String> registryList = appAddressMap.get(group.getAppname());                      String addressListStr = null;                      if (registryList!=null && !registryList.isEmpty()) {                         Collections.sort(registryList);                         StringBuilder addressListSB = new StringBuilder();                         for (String item:registryList) {                            addressListSB.append(item).append(",");                         }                         addressListStr = addressListSB.toString();                         addressListStr = addressListStr.substring(0, addressListStr.length()-1);                      }                      group.setAddressList(addressListStr);                      group.setUpdateTime(new Date());                          XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);                   }                }             } catch (Exception e) {                if (!toStop) {                   logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);                }             }             try {                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);             } catch (InterruptedException e) {                if (!toStop) {                   logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);                }             }          }          logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");       }         });    registryMonitorThread.setDaemon(true);    registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");    registryMonitorThread.start(); }   4 启动失败任务监听线程(重试、告警) 这部分逻辑比较简单,就是重试 + 告警,核心代码如下 // 获取执行失败的job信息 List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000); if (failLogIds!=null && !failLogIds.isEmpty()) {    for (long failLogId: failLogIds) {       // lock log       int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);       if (lockRet < 1) {          continue;       }       XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);       XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());          // 1、失败塞回重试       fail retry monitor       if (log.getExecutorFailRetryCount() > 0) {          JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);          String retryMsg = "<span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span>";          log.setTriggerMsg(log.getTriggerMsg() + retryMsg);          XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);       }          // 2、进行失败告警       fail alarm monitor       int newAlarmStatus = 0;        // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败       if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {          boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);          newAlarmStatus = alarmResult?2:3;       } else {          newAlarmStatus = 1;       }       XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);    } }   5 启动监控线程 5.1 初始化callbackThreadPool线程池:用于callback 回调的线程池,客户端调用api/callback接口时会使用这个线程池 5.2 启动监控线monitorThread:调度记录停留在 "运行中" 状态 超过10min,且对应执行器心跳注册 失败不在线,则将本地调度主动标记失败 逻辑较简单,如上两点   6 启动日志统计和清除线程logrThread -- 日志记录刷新,刷新 最近三天的日志Report(即统计每天的失败、成功、运行次数等) -- 每天清除一次 失效过期的日志数据 配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除 逻辑较简单,如上两点   7 启动任务调度 7.1 scheduleThread-取待执行任务数据入时间轮 -- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行 -- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据 ----  preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; -- 第三步:将当前时间与下次调度时间对比,有如下三种情况 ****  当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理 --------  1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次 --------  2、刷新上一次触发 和 下一次待触发时间 ****  当前时间 大于 任务的下一次触发时间 并且是没有过期的: --------  1、直接触发任务执行器 --------  2、刷新上一次触发 和 下一次待触发时间 --------  3、如果下一次触发在五秒内,直接放进时间轮里面待调度 ----------------  1、求当前任务下一次触发时间所处一分钟的第N秒 ----------------  2、将当前任务ID和ringSecond放进时间轮里面 ----------------  3、刷新上一次触发 和 下一次待触发时间 ****  当前时间 小于 下一次触发时间: --------  1、求当前任务下一次触发时间所处一分钟的第N秒 --------  2、将当前任务ID和ringSecond放进时间轮里面 --------  3、刷新上一次触发 和 下一次待触发时间 -- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time -- 第五步:提交数据库事务,释放数据库select for update排它锁   7.2 ringThread-根据时间轮执行job任务 首先时间轮数据格式为:Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>() -- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了 -- 第二步:执行触发器 -- 第三步:清除当前刻度列表的数据 **** 执行的过程中还会选择对应的策略,如下: -------- 阻塞策略:串行、废弃后面、覆盖前面 -------- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询 启动两个线程解析的核心源码如下: public void start (){        // 启动调度线程,这些线程是用来取数据的 schedule thread     scheduleThread = new Thread( new Runnable() {     @Override     public void run () {     try { // 不知道为啥要休眠 4-5 秒 时间,然后再启动         TimeUnit. MILLISECONDS .sleep( 5000 - System. currentTimeMillis ()% 1000 ) ;     } catch (InterruptedException e) {         if (! scheduleThreadToStop ) {             logger .error(e.getMessage() , e) ;         }     }     logger .info( ">>>>>>>>> init xxl-job admin scheduler success." ) ;        // 这里是预读数量 pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)     int preReadCount = (XxlJobAdminConfig. getAdminConfig ().getTriggerPoolFastMax() + XxlJobAdminConfig. getAdminConfig ().getTriggerPoolSlowMax()) * 20 ;       while (! scheduleThreadToStop ) {     // 扫描任务 Scan Job     long start = System. currentTimeMillis () ;     Connection conn = null;     Boolean connAutoCommit = null;     PreparedStatement preparedStatement = null     boolean preReadSuc = true;     try {         conn = XxlJobAdminConfig. getAdminConfig ().getDataSource().getConnection() ;           connAutoCommit = conn.getAutoCommit() ;           conn.setAutoCommit( false ) ;           // 采用 select for update ,是排它锁。说白了 xxl-job 用一张数据库表来当分布式锁了,确保多个 xxl-job admin 节点下,依旧只能同时执行一个调度线程任务         preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ) ;           preparedStatement.execute() ;             // tx start             // 1 、预读数据 pre read           long nowTime = System. currentTimeMillis () ;           // -- 从数据库中读取截止到五秒后未执行的 job ,并且读取 preReadCount=6000 条           List<XxlJobInfo> scheduleList = XxlJobAdminConfig. getAdminConfig ().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS , preReadCount) ;           if (scheduleList!= null && scheduleList.size()> 0 ) {               // 2 、 push 压进 时间轮 push time-ring               for (XxlJobInfo jobInfo: scheduleList) {                     // time-ring jump                     if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS ) {                         // 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS ( 5s )) , 可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理                         // 2.1 、 trigger-expire > 5s : pass && make next-trigger-time                         logger .warn( ">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()) ;                         // 1 、匹配过期失效的策略: DO_NOTHING= 过期啥也不干,废弃; FIRE_ONCE_NOW= 过期立即触发一次 misfire match                         MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum. match (jobInfo.getMisfireStrategy() , MisfireStrategyEnum. DO_NOTHING ) ;                       if (MisfireStrategyEnum. FIRE_ONCE_NOW == misfireStrategyEnum) {                             // FIRE_ONCE_NOW 》 trigger                               JobTriggerPoolHelper. trigger (jobInfo.getId() , TriggerTypeEnum. MISFIRE , - 1 , null, null, null ) ;                               logger .debug( ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ) ;                         }                         // 2 、刷新上一次触发 和 下一次待触发时间 fresh next                          refreshNextValidTime(jobInfo , new Date()) ;                     } else if (nowTime > jobInfo.getTriggerNextTime()) {                         // 当前时间 大于 任务的下一次触发时间 并且是没有过期的                     // 2.2 、 trigger-expire < 5s : direct-trigger && make next-trigger-time                         // 1 、直接触发任务执行器 trigger                         JobTriggerPoolHelper. trigger (jobInfo.getId() , TriggerTypeEnum. CRON , - 1 , null, null, null ) ;                         logger .debug( ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ) ;                         // 2 、刷新上一次触发 和 下一次待触发时间 fresh next                         refreshNextValidTime(jobInfo , new Date()) ;                           // 如果下一次触发在五秒内,直接放进时间轮里面待调度 next-trigger-time in 5s, pre-read again                         if (jobInfo.getTriggerStatus()== 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {                               // 1 、求当前任务下一次触发时间所处一分钟的第 N 秒 make ring second                               int ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 ) ;                               // 2 、将当前任务 ID 和 ringSecond 放进时间轮里面 push time ring                               pushTimeRing(ringSecond , jobInfo.getId()) ;                               // 3 、刷新上一次触发 和 下一次待触发时间 fresh next                               refreshNextValidTime(jobInfo , new Date(jobInfo.getTriggerNextTime())) ;                         }                       } else {                       // 当前时间 小于 下一次触发时间                         // 2.3 、 trigger-pre-read : time-ring trigger && make next-trigger-time                         // 1 、求当前任务下一次触发时间所处一分钟的第 N 秒 make ring second                         int ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 ) ;                         // 2 、将当前任务 ID 和 ringSecond 放进时间轮里面 push time ring                         pushTimeRing(ringSecond , jobInfo.getId()) ;                         // 3 、刷新上一次触发 和 下一次待触发时间 fresh next                         refreshNextValidTime(jobInfo , new Date(jobInfo.getTriggerNextTime())) ;                     }               }                 // 3 、更新数据库执行器信息,如 trigger_last_time 、 trigger_next_time update trigger info               for (XxlJobInfo jobInfo: scheduleList) {                     XxlJobAdminConfig. getAdminConfig ().getXxlJobInfoDao().scheduleUpdate(jobInfo) ;               }             } else {             preReadSuc = false;           }           // tx stop     } catch (Exception e) {           if (! scheduleThreadToStop ) {               logger .error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}" , e) ;           }     } finally {           // 提交事务,释放数据库 select for update 的锁 commit         .......................省略.............         }     long cost = System. currentTimeMillis ()-start ;        // 如果执行太快了,就稍微 sleep 等待一下 Wait seconds, align second     if (cost < 1000 ) { // scan-overtime, not wait         try {             // pre-read period: success > scan each second; fail > skip this period;             TimeUnit. MILLISECONDS .sleep((preReadSuc? 1000 : PRE_READ_MS ) - System. currentTimeMillis ()% 1000 ) ;         } catch (InterruptedException e) {             if (! scheduleThreadToStop ) {                 logger .error(e.getMessage() , e) ;             }         }     }) ;     scheduleThread .setDaemon( true ) ;     scheduleThread .setName( "xxl-job, admin JobScheduleHelper#scheduleThread" ) ;     scheduleThread .start() ;          // 时间轮线程,用于取出每秒的数据,然后处理 ring thread     ringThread = new Thread( new Runnable() {         @Override         public void run () {             while (! ringThreadToStop ) {                    // align second                    try {                        TimeUnit. MILLISECONDS .sleep( 1000 - System. currentTimeMillis () % 1000 ) ;                    } catch (InterruptedException e) {                     if (! ringThreadToStop ) {                         logger .error(e.getMessage() , e) ;                     }                 }                    try {                        // second data                        List<Integer> ringItemData = new ArrayList<>() ;                        // 获取当前所处的一分钟第几秒,然后 for 两次,第二次是为了重跑前面一个刻度没有被执行的的 job list ,避免前面的刻度遗漏了                     int nowSecond = Calendar. getInstance ().get(Calendar. SECOND ) ; // 避免处理耗时太长,跨过刻度,向前校验一个刻度;                     for ( int i = 0 ; i < 2 ; i++) {                         List<Integer> tmpData = ringData .remove( (nowSecond+ 60 -i)% 60 ) ;                         if (tmpData != null ) {                             ringItemData.addAll(tmpData) ;                         }                        }                          // ring trigger                        logger .debug( ">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays. asList (ringItemData) ) ;                        if (ringItemData.size() > 0 ) {                            // do trigger                               for ( int jobId: ringItemData) {                                   // 执行触发器 do trigger                                   JobTriggerPoolHelper. trigger (jobId , TriggerTypeEnum. CRON , - 1 , null, null, null ) ;                               }                               // 清除当前刻度列表的数据 clear                               ringItemData.clear() ;                        }                    } catch (Exception e) {                          if (! ringThreadToStop ) {                               logger .error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}" , e) ;                        }                    }                }             logger .info( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop" ) ;         }     }) ;     ringThread .setDaemon( true ) ;     ringThread .setName( "xxl-job, admin JobScheduleHelper#ringThread" ) ;     ringThread .start() ; }                      
上一篇:jQuery事件自动触发


下一篇:VSCode修改万能提示键