xxl-job源码解析

一. xxl-job 简介

二. xxl-job 的架构体系

xxl-job源码解析

三. 调度中心

  • 本篇使用的源码是:2.3.0-SNAPSHOT 版本
  • 调度中心源码分析入口是在 com.xxl.job.admin.core.conf.XxlJobAdminConfig 中,可以看到这个类实现了 InitializingBean, DisposableBean 接口,这两个接口是 spring 的 bean 初始化完成和销毁是会触发 afterPropertiesSetdestroy 方法,这里不了解的可以自行百度哈
  • 首先来看 bean 初始化完成,也就是调度中心启动的流程 xxl-job源码解析
  • 进入 com.xxl.job.admin.core.scheduler.XxlJobScheduler#init 方法中,按顺序跟进代码xxl-job源码解析
    可以看到这个方法初始化了调度中心的所有流程,具体可以看上图的注释 (有不对的地方欢迎指正),对于 initI18n() 方法就不看了,就是国际化的正常操作,接着看下一个方法
  • 进入JobTriggerPoolHelper.toStart(),这个方法看代码其实就是初始化了两个线程池 fastTriggerPoolslowTriggerPool,这两个线程池顾名思义就是 快 / 慢 线程池,其中两个线程池的最大线程数可配置,这两个线程池在执行线程调度的时候会进行选择,后面会讲到,这里了解这一步就是初始化了快慢线程池就行xxl-job源码解析
  • 接着进入到 JobRegistryHelper.getInstance().start() 里面,这一步初始化了执行器的注册、移除的线程池registryOrRemoveThreadPool,这个线程池是在执行器相关的操作中处理注册执行的添加和移除的,后面讲注册器部分的时候在讲;在这一步中同时启动了一个线程registryMonitorThread,这个线程作用是维护自动注册的执行的,由于篇幅较长,直接贴代码不截图了,根据代码大家可以自行理解逻辑
// for monitor
		registryMonitorThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// auto registry group
						// 获取所有自动注册类型的 执行器
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
						if (groupList!=null && !groupList.isEmpty()) {

							// remove dead address (admin/executor)
							// 获取实时维护的执行器注册表,维护在线的执行器和调度中心机器地址信息 这里是获取超时的执行器(已死亡的->超过90秒未变更数据的)
							List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (ids!=null && ids.size()>0) {
								//如果有已死亡的执行器  直接从实时维护的表中删除掉
								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
							}

							// fresh online address (admin/executor)
							HashMap<String, List<String>> appAddressMap = new HashMap<>();
							//查询所有活跃的执行器注册表,就是update_time 90秒内有更新的
							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (list != null) {
								for (XxlJobRegistry item: list) {
									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
										//执行器的名称 其实就是执行器端配置的xxl.job.executor.appname
										String appname = item.getRegistryKey();
										List<String> registryList = appAddressMap.get(appname);
										if (registryList == null) {
											registryList = new ArrayList<String>();
										}

										if (!registryList.contains(item.getRegistryValue())) {
											registryList.add(item.getRegistryValue());
										}
										//就是把同一个executor.appname的多个执行器 根据appname分组维护到这个map中
										//一个executor.appname可能有多个地址的执行器  就是一个项目分布式的部署 但是executor.appname相同
										appAddressMap.put(appname, registryList);
									}
								}
							}

							// fresh group address
							for (XxlJobGroup group: groupList) {
								//遍历所有自动注册类型的 执行器
								//从上面 筛选的所有活跃的执行器Map中获取最新的执行器地址+端口号
								List<String> registryList = appAddressMap.get(group.getAppname());
								String addressListStr = null;
								if (registryList!=null && !registryList.isEmpty()) {
									//如果不是空的List 那么给这些执行器地址排个序
									Collections.sort(registryList);
									StringBuilder addressListSB = new StringBuilder();
									//按照逗号分割
									for (String item:registryList) {
										addressListSB.append(item).append(",");
									}
									//变成一个逗号分割排好序的地址列表
									addressListStr = addressListSB.toString();
									addressListStr = addressListStr.substring(0, addressListStr.length()-1);
								}
								group.setAddressList(addressListStr);
								group.setUpdateTime(new Date());

								//更新到XxlJobGroup表中
								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
							}
						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
					try {
						//每30秒执行一次 上面的逻辑 进行自动注册的执行器地址更新
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
			}
		});
  • 接着进入 JobFailMonitorHelper.getInstance().start(); 这里初始化了一个线程monitorThread,这个线程主要用来处理执行失败的任务重试和告警xxl-job源码解析
  • 接着进入JobCompleteHelper.getInstance().start(),这一步初始化了回调线程池callbackThreadPool,用来处理执行器任务执行后的回调,同时初始化了 monitorThread 线程,用来检测回调超时的任务并处理xxl-job源码解析
  • 接着进入JobLogReportHelper.getInstance().start(),这一步启动了 logrThread 线程 对调度日志进行统计和清理,主要是统计 xxl_job_log 表 3 天内的成功、运行中、失败的调度日志 并记录到 xxl_job_log_report 表中,同时根据对配置时间 logretentiondays 的日志进行清理,主要作用就是维护日志,比较简单大家自己看看源码,我就不贴出来了
  • 接下来进入重点:JobScheduleHelper.getInstance().start(),这里才是真正的对任务进行调度触发的地方,这里需要大家多看看代码,理解每一步的含义,我把我对这个流程的理解都在注释里面写出来了,有不对的欢饮大家指正,进入方法看:
public void start(){

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    //5秒后的整秒   当前毫秒%1000 是取余得到下次整秒的事件距离
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                // preReadCount = (200 + 100) * 20  预读条数
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) {

                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {
                        //先获取到数据库连接
                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        //设置不自动提交事务
                        conn.setAutoCommit(false);

                        //for update mysql锁  保证同时只有一个在执行
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // tx start

                        // 1、pre read
                        long nowTime = System.currentTimeMillis();
                        // 扫出来trigger_status(调度状态) = 1(运行) and trigger_next_time(下次调度时间) <= now + 5秒的任务
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {
                            // 2、push time-ring  遍历查询的任务
                            for (XxlJobInfo jobInfo: scheduleList) {

                                // time-ring jump  若果当前时间 > 该任务下次执行时间 + 5秒  那说明这个任务调度超时了 当前时间超过了本该执行时间+5秒
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    //调度超时会查出来这个任务 因为查询条件查询的:trigger_next_time(下次调度时间) <= now + 5秒的任务
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1、misfire match  如果查到了这个调度超时的任务  那么获取 调度过期策略 并进行调度处理
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW 》 trigger
                                        // 如果这个任务的超时处理策略是立即执行一次:FIRE_ONCE_NOW(立即执行一次),那么立即执行一次这个任务
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    }

                                    // 2、fresh next  刷新下次执行时间
                                    refreshNextValidTime(jobInfo, new Date());

                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    //如果当前时间大于下次执行时间
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                    // 1、trigger  立即触发一次任务
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 2、fresh next 刷新下次执行时间
                                    // 注意这里是在这个任务被查出来后第一次刷新下次时间
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    // 如果任务运行状态是:运行状态 并且 当前时间+5秒 > 下次执行(这个下次执行时间jobInfo.getTriggerNextTime() 是第一次被刷新后的时间)
                                    // 那么把这个任务加入到时间轮中,留待时间轮线程消费
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                                        // 1、make ring second   获取这个任务下次的执行时间的整秒
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                        // 2、push time ring     把算出来的整秒放进时间轮里面 每秒一个时间轮bucket
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、fresh next  刷新下次调度时间
                                        // 在这个任务被加入时间轮前 这个任务的jobInfo.getTriggerNextTime()下次执行时间就已经是被刷新了的,也就是下一次的时间
                                        // 这里再次刷新一次,这个下次执行时间(本轮调度后的第二次执行时间)会被记录到数据库
                                        // 这样时间轮里面记录的是本轮调度后的下一次调度时间,更新到数据库的是时间轮调度后的下一次调度时间
                                        // 就算下次循环数据库扫描出来这个任务,那也是时间轮调度后的下一次 不会存在重复执行的可能
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }

                                } else {
                                    // 如果不是以上两种情况,那这个任务应该是没到时间的  加入时间轮等待执行
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            }

                            // 3、update trigger info  将执行完成的任务状态和下次时间等更新到数据库中
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            //如果本次没有扫到可以执行的任务 把preReadSuc改为false 留待后面的sleep使用
                            preReadSuc = false;
                        }

                        // tx stop


                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                //提交锁
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                //恢复原来的提交机制
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                //关闭连接
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    //得到本次循环内调度任务耗费的时间
                    long cost = System.currentTimeMillis()-start;


                    // Wait seconds, align second
                    // 如果本次耗费时间小于1秒
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            // 如果preReadSuc为true则表示本次有任务调度,sleep 到下一次整秒
                            // 如果preReadSuc为false表示本次没有任务调度,sleep 到第5个整秒
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();


        // ring thread
        //处理时间轮中存放的定时任务
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                // align second
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
                    // 开始while循环的秒
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                /**
                 * 对于这个线程的工作流程  可以查看下面这个栗子 对应 ringThread run方法中的注释,可以很清楚的看到每次循环取出的是哪几秒的数据
                 * 其实就是取当前秒的和上一秒的
                 *
                 * 栗子:(把这段代码逻辑拷出去输出一下秒数就知道了)
                 * 开始while循环的秒:27
                 * 每次循环开始---------------------------
                 * 当前的秒 === 27
                 * 取出的秒 ===27
                 * 取出的秒 ===26
                 * 每次循环开始---------------------------
                 * 当前的秒 === 28
                 * 取出的秒 ===28
                 * 取出的秒 ===27
                 * 每次循环开始---------------------------
                 * 当前的秒 === 29
                 * 取出的秒 ===29
                 * 取出的秒 ===28
                 * 每次循环开始---------------------------
                 * 当前的秒 === 30
                 * 取出的秒 ===30
                 * 取出的秒 ===29
                 */
                while (!ringThreadToStop) {
                    //每次循环开始---------------------------
                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        // 获取当前的秒数
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                        // 当前的秒 ===

                        //这个for循环取出的是在时间轮中当前秒的和前一秒的
                        //就是怕取出任务后调度的时间比较长  所以往前多取一秒 保证时间轮每个bucket都被处理
                        for (int i = 0; i < 2; i++) {
                            //取出的秒 ===
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }

                    // next second, align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

四. 结尾

本篇主要靠大家看注释理解代码,我懒得画图去解释调度中心的调度流程,多看两遍代码就理解了,后面有空再完善图解流程执行器部分以及调度 RPC 相关的源码解析,先写到这

上一篇:Flink Trigger


下一篇:python实现有限状态机