1、工作流程
WEB: ExecutorServlet web端执行一个流的入口 1、ajaxExecuteFlow执行这个方法 1、getProjectAjaxByPermission执行这个方法,判断用户是否有权限执行这个工程 2、final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);获取一个ExecutableFlow对象 3、executorManagerAdapter.submitExecutableFlow(exflow, user.getUserId()); 提交这个flow ExecutorManager flow的管理类,上面把flow提交到了这个类 1、submitExecutableFlow 1、exflow.isLocked(),判断flow的状态 2、this.queuedFlows,判断放flow的队列是不是满了,满了就报错 3、this.queuedFlows.enqueue(exflow, reference);把flow放到队列中 2、QueueProcessorThread这个线程负责消费queuedFlows队列里面的flow 1、if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) { 根据当前时间和最后一次更新executor的时间与更新时间窗口做对比,已提交的flow跟允许提交的flow数量做对比,满足其一,向exector发送请求,获取exector的状态,包括cpu,mem,上次是否提交flow,以及运行的flow总数 2、if (exflow.getUpdateTime() > lastExecutorRefreshTime) 这块用来判断flow的最后更新时间,跟最后一次刷新executor的时间做对比,如果更新时间晚的话,则sleep到刷新executor状态的时候再进行提交 3、selectExecutorAndDispatchFlow(reference, exflow); 选择executor并且提交flow 1、selectExecutor(exflow, remainingExecutors); 1、getUserSpecifiedExecutor(exflow.getExecutionOptions(), exflow.getExecutionId() 指定executorId,直接去数据库里面查找wxwcutor是否存在 2、final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList, ExecutorManager.this.comparatorWeightsMap); 上述方式没有找到executor的情况下,创建一个ExecutorSelector选择器, 3、choosenExecutor = selector.getBest(availableExecutors, exflow); 根据executor的状态,上述刷新executor状态时候获取到的各项指标,进行对比,选择一个合适的executor 2、dispatch(reference, exflow, selectedExecutor); 找到executor以后进行flow的分配,发送到相应的executor EXEC: ExecutorServlet exec端执行一个流的入口 1、handleAjaxExecute 1、this.flowRunnerManager.submitFlow(execId); 具体执行的那个流的id,后面通过这个id去数据库获取flow的具体配置信息。 FlowRunnerManager 1、submitFlow 1、isAlreadyRunning(execId) 判断这个flow是否正在运行 2、final FlowRunner runner = createFlowRunner(execId); 通过execId创建一个FlowRunner对象 3、final Future<?> future = this.executorService.submit(runner); 往线程池里面提交这个FlowRunner对象 4、submitFlowRunner(runner); 提交这个flow去运行 2、submitFlowRunner 1、this.submittedFlows.put(future, runner.getExecutionId()); 往submittedFlows队列里面提交这个flow,submittedFlows只是为了记录已经提交的flow FlowRunner 1、run(),直接运行run方法 1、setupFlowExecution 添加配置信息 2、runFlow(); 真正的运行这个flow 1、runReadyJob 判断flow里面node的状态,也就是job的状态,从第一个job开始运行 2、runExecutableNode 运行具体的job 1、 prepareJobProperties(node); 准备job的配置文件 2、final JobRunner runner = createJobRunner(node); 创建一个JobRunner对象, 3、this.executorService.submit(runner); executorService是一个指定线程数量的线程池 this.executorService = Executors.newFixedThreadPool(this.numJobThreads, new ThreadFactoryBuilder().setNameFormat("azk-job-pool-%d").build()); 4、this.activeJobRunners.add(runner); activeJobRunners是一个记录正在运行job的队列 JobRunner 提交到executorService线程池以后开始运行 1、run(),直接运行run方法 1、doRun(); 1、createAttachmentFile(); 创建job的工作目录 2、createLogger(); 创建一个job运行日志的追加器 3、uploadExecutableNode(); 往数据库中插入正在运行的job 4、prepareJob() 判断job是否准备好 1、finalStatus = runJob(); 运行这个job 2、runJob() 1、this.job.run(); 1、执行的是ProcessJob的实现类 ProcessJob 1、run() 执行job的方法, 1、resolveProps(); 解析配置文件 2、this.process.run(); 执行的方法 2、public void run() 一直堵塞直到job执行完成 1、ProcessBuilder 使用java自带的实现来执行cmd 2、LogGobbler 一个获取job日志的方法 判断任务是否执行完成 JobRunner对象生成的时候有一个FlowWatcher对象,监听job的状态 FlowRunner在判断到执行成功的job时,会执行这个方法,finalizeFlow(),如果这个job是这个flow的最下游job的话,那么就把这个flow的状态改成执行成功,如果不是,就拿到job的下游job,继续执行。