2021SC@SDUSC(dolphinscheduler- common)

executeProcess按顺序调用了prepareProcess、runProcess、endProcess三个方法,简单来说就是初始化、执行、释放资源。 prepareProcess又按顺序调用了initTaskQueue、buildFlowDag。

initTaskQueue就是一些资源的初始化操作,比如通过流程定义ID查询到当前的任务实例。下面是其核心逻辑,可以发现,就是查询了完成的任务列表,报错且不能重试的任务列表。

List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance task : taskInstanceList){
    if(task.isTaskComplete()){
        completeTaskList.put(task.getName(), task);
    }
    if(task.getState().typeIsFailure() && !task.taskCanRetry()){
        errorTaskList.put(task.getName(), task);
    }
}

buildFlowDag看名字应该是生成DAG实例的,代码虽短,但调用了好几个函数,我们只重点分析最后一个函数调用。

private void buildFlowDag() throws Exception {
    recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());

    forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
    // generate process to get DAG info
    List<String> recoveryNameList = getRecoveryNodeNameList();
    List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
    ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(),
            startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
    if(processDag == null){
        logger.error("processDag is null");
        return;
    }
    // generate process dag
    dag = DagHelper.buildDagGraph(processDag);
}

DagHelper.buildDagGraph生成了一个DAG对象实例,根据名字和注释猜测,这应该是对有向无环图的一个抽象。

/**
 * the object of DAG
 */
private DAG<String,TaskNode,TaskNodeRelation> dag;

来看下DAG类的定义

/**
 * analysis of DAG
 * Node: node
 * NodeInfo:node description information
 * EdgeInfo: edge description information
 */
public class DAG<Node, NodeInfo, EdgeInfo>

DAG有三个类型参数,分别代表节点key、节点信息、边信息。

下面是TaskNode的字段

2021SC@SDUSC(dolphinscheduler- common)

发现TaskNode的字段跟UI一一对应

2021SC@SDUSC(dolphinscheduler- common)

 

TaskNodeRelation代表边的信息,字段比较少,只有startNode、endNode两个String类型的字段。这其实是DAG类的第一个类型参数,节点的key。

public static DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {

    DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();

    /**
     * add vertex
     */
    if (CollectionUtils.isNotEmpty(processDag.getNodes())){
        for (TaskNode node : processDag.getNodes()){
            dag.addNode(node.getName(),node);
        }
    }

    /**
     * add edge
     */
    if (CollectionUtils.isNotEmpty(processDag.getEdges())){
        for (TaskNodeRelation edge : processDag.getEdges()){
            dag.addEdge(edge.getStartNode(),edge.getEndNode());
        }
    }
    return dag;
}

 

上面是buildDagGraph的源码。可以看出,增加节点时,第一个参数是TaskNode的getName。跟猜测的一样,DAG的第一个参数就是node的key,而key就是名称。

细心的读者一定发现,DAG对象是根据ProcessDag来创建的

2021SC@SDUSC(dolphinscheduler- common)

 

DAG是把节点、边的一个List转化成了一个Graph。

初始化完成之后,来看一下具体如何执行流程定义的。

2021SC@SDUSC(dolphinscheduler- common)

 

这个方法源码很长,我们首先从整体简要分析。

  1. submitPostNode(null)
  2. 起一个while循环,直至流程定义实例停止(成功、失败、取消、暂停、等待)
  3. 首先判断是否超时,超时则发送预警邮件
  4. 获取当前活动的任务节点的Map。key是MasterBaseTaskExecThread对象,value是Future<Boolean>。value其实是MasterBaseTaskExecThread线程的当前状态。
  5. 如果当前任务实例已经结束,则从Map中移除
  6. 如果当前任务实例成功,则put到completeTaskList且调用submitPostNode(task.getName())
  7. 如果当前任务实例失败,则重试;否则直接结束(比如手动停止或暂停)
  8. 更新当前流程定义实例的状态,进入下一个循环
上一篇:Spark综合学习笔记(二)


下一篇:LOJ #3165. 「CEOI2019」游乐园