azkaban整体工作流程

1、工作流程


WEB:   ExecutorServlet web端执行一个流的入口   1、ajaxExecuteFlow执行这个方法     1、getProjectAjaxByPermission执行这个方法,判断用户是否有权限执行这个工程     2、final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);获取一个ExecutableFlow对象     3、executorManagerAdapter.submitExecutableFlow(exflow, user.getUserId()); 提交这个flow ExecutorManager  flow的管理类,上面把flow提交到了这个类   1、submitExecutableFlow     1、exflow.isLocked(),判断flow的状态     2、this.queuedFlows,判断放flow的队列是不是满了,满了就报错     3、this.queuedFlows.enqueue(exflow, reference);把flow放到队列中   2、QueueProcessorThread这个线程负责消费queuedFlows队列里面的flow     1、if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow             || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {  根据当前时间和最后一次更新executor的时间与更新时间窗口做对比,已提交的flow跟允许提交的flow数量做对比,满足其一,向exector发送请求,获取exector的状态,包括cpu,mem,上次是否提交flow,以及运行的flow总数     2、if (exflow.getUpdateTime() > lastExecutorRefreshTime) 这块用来判断flow的最后更新时间,跟最后一次刷新executor的时间做对比,如果更新时间晚的话,则sleep到刷新executor状态的时候再进行提交     3、selectExecutorAndDispatchFlow(reference, exflow); 选择executor并且提交flow       1、selectExecutor(exflow, remainingExecutors);          1、getUserSpecifiedExecutor(exflow.getExecutionOptions(),               exflow.getExecutionId() 指定executorId,直接去数据库里面查找wxwcutor是否存在          2、final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList,             ExecutorManager.this.comparatorWeightsMap); 上述方式没有找到executor的情况下,创建一个ExecutorSelector选择器,          3、choosenExecutor = selector.getBest(availableExecutors, exflow); 根据executor的状态,上述刷新executor状态时候获取到的各项指标,进行对比,选择一个合适的executor       2、dispatch(reference, exflow, selectedExecutor); 找到executor以后进行flow的分配,发送到相应的executor     EXEC:   ExecutorServlet exec端执行一个流的入口   1、handleAjaxExecute     1、this.flowRunnerManager.submitFlow(execId); 具体执行的那个流的id,后面通过这个id去数据库获取flow的具体配置信息。 FlowRunnerManager   1、submitFlow     1、isAlreadyRunning(execId) 判断这个flow是否正在运行     2、final FlowRunner runner = createFlowRunner(execId); 通过execId创建一个FlowRunner对象     3、final Future<?> future = this.executorService.submit(runner); 往线程池里面提交这个FlowRunner对象     4、submitFlowRunner(runner); 提交这个flow去运行   2、submitFlowRunner     1、this.submittedFlows.put(future, runner.getExecutionId()); 往submittedFlows队列里面提交这个flow,submittedFlows只是为了记录已经提交的flow FlowRunner   1、run(),直接运行run方法     1、setupFlowExecution 添加配置信息     2、runFlow(); 真正的运行这个flow       1、runReadyJob 判断flow里面node的状态,也就是job的状态,从第一个job开始运行       2、runExecutableNode 运行具体的job         1、 prepareJobProperties(node); 准备job的配置文件         2、final JobRunner runner = createJobRunner(node); 创建一个JobRunner对象,         3、this.executorService.submit(runner); executorService是一个指定线程数量的线程池         this.executorService = Executors.newFixedThreadPool(this.numJobThreads,             new ThreadFactoryBuilder().setNameFormat("azk-job-pool-%d").build());         4、this.activeJobRunners.add(runner); activeJobRunners是一个记录正在运行job的队列 JobRunner 提交到executorService线程池以后开始运行   1、run(),直接运行run方法     1、doRun();       1、createAttachmentFile(); 创建job的工作目录       2、createLogger(); 创建一个job运行日志的追加器       3、uploadExecutableNode(); 往数据库中插入正在运行的job       4、prepareJob() 判断job是否准备好         1、finalStatus = runJob(); 运行这个job   2、runJob()     1、this.job.run();       1、执行的是ProcessJob的实现类 ProcessJob   1、run() 执行job的方法,     1、resolveProps(); 解析配置文件     2、this.process.run(); 执行的方法   2、public void run()  一直堵塞直到job执行完成     1、ProcessBuilder 使用java自带的实现来执行cmd     2、LogGobbler 一个获取job日志的方法      判断任务是否执行完成 JobRunner对象生成的时候有一个FlowWatcher对象,监听job的状态 FlowRunner在判断到执行成功的job时,会执行这个方法,finalizeFlow(),如果这个job是这个flow的最下游job的话,那么就把这个flow的状态改成执行成功,如果不是,就拿到job的下游job,继续执行。       

 

上一篇:2019-07-20


下一篇:ZooKeeper, HBase, Azkaban