2021SC@SDUSC(dolphinscheduler- common2)

activeTaskNode是一个非常重要的对象,从上一篇文章的分析中,可以猜测,activeTaskNode是由submitPostNode间接生成赋值的,并通过while循环驱动了整个流程实例的执行。

private void submitPostNode(String parentNodeName){

    List<TaskInstance> submitTaskList = null;
    if(parentNodeName == null){
        submitTaskList = getStartSubmitTaskList();
    }else{
        submitTaskList = getPostTaskInstanceByNode(dag, parentNodeName);
    }
    // if previous node success , post node submit
    for(TaskInstance task : submitTaskList){
        if(readyToSubmitTaskList.containsKey(task.getName())){
            continue;
        }

        if(completeTaskList.containsKey(task.getName())){
            logger.info("task {} has already run success", task.getName());
            continue;
        }
        if(task.getState().typeIsPause() || task.getState().typeIsCancel()){
            logger.info("task {} stopped, the state is {}", task.getName(), task.getState().toString());
        }else{
            addTaskToStandByList(task);
        }
    }
}

submitPostNode的源码细节不再深入分析,大概就是从dag对象中找出入度为0的节点,放入到准备队列中。其实在runProcess方法中,还调用了submitStandByTask方法,该方法最终调起了可以执行的节点。从这点来看,整个流程实例由submitPostNode、submitStandByTask和while驱动。

那么问题来了,流程实例的任务具体是怎么调起来的呢?下面是submitStandByTask方法中调用的最重要的函数,也是由它调起来的。

/**
 * submit task to execute
 * @param taskInstance task instance
 * @return TaskInstance
 */
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
    MasterBaseTaskExecThread abstractExecThread = null;
    if(taskInstance.isSubProcess()){
        abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance);
    }else {
        abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance);
    }
    Future<Boolean> future = taskExecService.submit(abstractExecThread);
    activeTaskNode.putIfAbsent(abstractExecThread, future);
    return abstractExecThread.getTaskInstance();
}

逻辑也比较简单,就是把TaskInstance交给MasterTaskExecThread去执行;taskExecService提交之后,放到activeTaskNode列表,交由主逻辑判断任务是否完成。

MasterTaskExecThread

根据其定义,我们知道MasterTaskExecThread继承了MasterBaseTaskExecThread,且构造函数简单的调用了父类的构造函数。

public class MasterTaskExecThread extends MasterBaseTaskExecThread

MasterBaseTaskExecThread的构造函数也比较简单,给几个关键的字段赋初始值。

/**
 * constructor of MasterBaseTaskExecThread
 * @param taskInstance      task instance
 * @param processInstance   process instance
 */
public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
    this.processDao = BeanContext.getBean(ProcessDao.class);
    this.alertDao = BeanContext.getBean(AlertDao.class);
    this.processInstance = processInstance;
    this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
    this.cancel = false;
    this.taskInstance = taskInstance;
}

但processDao、alertDao居然是通过BeanContext.getBean获取到的!!!个人感觉这是一个非常恶心的设计。一个优秀的设计,应该是类的创建者负责子类的参数及其功能的边界。BeanContext.getBean扩展了所有类与SpringBoot的ApplicationContext间接打交道的能力,而且无法控制,因为只要调用BeanContext.getBean都可以获取到对应的bean进行操作。

MasterBaseTaskExecThread实现了Callable<Boolean>接口,call方法又调用了submitWaitComplete,MasterTaskExecThread类中对改方法进行了覆盖。

submitWaitComplete根据名称及其注释说明可以知道,它提交了一个任务实例,然后等待其完成。

/**
 * submit task instance and wait complete
 * @return true is task quit is true
 */
@Override
public Boolean submitWaitComplete() {
    Boolean result = false;
    this.taskInstance = submit();
    if(!this.taskInstance.getState().typeIsFinished()) {
        result = waitTaskQuit();
    }
    taskInstance.setEndTime(new Date());
    processDao.updateTaskInstance(taskInstance);
    logger.info("task :{} id:{}, process id:{}, exec thread completed ",
            this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
    return result;
}

该函数的逻辑简单来说就是,提交一个任务实例,等待任务完成,更新任务结束时间到数据。

我们可以看出,每个任务实例都可以更新数据库,加上其他线程,对数据库的压力可能很大。如果任务非常多,并发非常大的情况下,jdbc连接线程池需要适当调大。否则,数据库会成为系统瓶颈。如果worker节点个数过多,这种压力又会几何倍数的增长。

上一篇:MySQL 5.7-11.2.2 The DATE, DATETIME, and TIMESTAMP Types


下一篇:06