datax源码阅读四:TaskGroupContainer

根据前面源码阅读可以知道,JobContainer将所有的task分配到TaskGroup中执行,TaskGroup启动5个线程去消费所有的task的,具体实现为

public void start() {
    try {
        /**
         * 状态check时间间隔,较短,可以把任务及时分发到对应channel中
         */
        int sleepIntervalInMillSec = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100);
        /**
         * 状态汇报时间间隔,稍长,避免大量汇报
         */
        long reportIntervalInMillSec = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL,
                10000);
        /**
         * 2分钟汇报一次性能统计
         */
        // 获取channel数目
        int channelNumber = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
        int taskMaxRetryTimes = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1);
        long taskRetryIntervalInMsec = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);
        long taskMaxWaitInMsec = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);
        List<Configuration> taskConfigs = this.configuration
                .getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
        if(LOG.isDebugEnabled()) {
            LOG.debug("taskGroup[{}]'s task configs[{}]", this.taskGroupId,
                    JSON.toJSONString(taskConfigs));
        }
        int taskCountInThisTaskGroup = taskConfigs.size();
        /*LOG.info(String.format(
                "taskGroupId=[%d] start [%d] channels for [%d] tasks.",
                this.taskGroupId, channelNumber, taskCountInThisTaskGroup));*/
        this.containerCommunicator.registerCommunication(taskConfigs);
        Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs); //taskId与task配置
        List<Configuration> taskQueue = buildRemainTasks(taskConfigs); //待运行task列表
        Map<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<Integer, TaskExecutor>(); //taskId与上次失败实例
        List<TaskExecutor> runTasks = new ArrayList<TaskExecutor>(channelNumber); //正在运行task
        Map<Integer, Long> taskStartTimeMap = new HashMap<Integer, Long>(); //任务开始时间
        long lastReportTimeStamp = 0;
        Communication lastTaskGroupContainerCommunication = new Communication();
        while (true) {
            //1.判断task状态
            boolean failedOrKilled = false;
            Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
            for(Map.Entry<Integer, Communication> entry : communicationMap.entrySet()){
                Integer taskId = entry.getKey();
                Communication taskCommunication = entry.getValue();
                if(!taskCommunication.isFinished()){
                    continue;
                }
                TaskExecutor taskExecutor = removeTask(runTasks, taskId);
                //上面从runTasks里移除了,因此对应在monitor里移除
                taskMonitor.removeTask(taskId);
                //失败,看task是否支持failover,重试次数未超过最大限制
                if(taskCommunication.getState() == State.FAILED){
                    taskFailedExecutorMap.put(taskId, taskExecutor);
                    if(taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes){
                        taskExecutor.shutdown(); //关闭老的executor
                        containerCommunicator.resetCommunication(taskId); //将task的状态重置
                        Configuration taskConfig = taskConfigMap.get(taskId);
                        taskQueue.add(taskConfig); //重新加入任务列表
                    }else{
                        failedOrKilled = true;
                        break;
                    }
                }else if(taskCommunication.getState() == State.KILLED){
                    failedOrKilled = true;
                    break;
                }else if(taskCommunication.getState() == State.SUCCEEDED){
                    Long taskStartTime = taskStartTimeMap.get(taskId);
                    if(taskStartTime != null){
                        Long usedTime = System.currentTimeMillis() - taskStartTime;
                        /*LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
                                this.taskGroupId, taskId, usedTime);*/
                        //usedTime*1000*1000 转换成PerfRecord记录的ns,这里主要是简单登记,进行最长任务的打印。因此增加特定静态方法
                        PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL,taskStartTime, usedTime * 1000L * 1000L);
                        taskStartTimeMap.remove(taskId);
                        taskConfigMap.remove(taskId);
                    }
                }
            }
            // 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误
            if (failedOrKilled) {
                lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                        lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                throw DataXException.asDataXException(
                        FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
            }
            //3.有任务未执行,且正在运行的任务数小于最大通道限制
            Iterator<Configuration> iterator = taskQueue.iterator();
            while(iterator.hasNext() && runTasks.size() < channelNumber){
                Configuration taskConfig = iterator.next();
                Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
                int attemptCount = 1;
                TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
                if(lastExecutor!=null){
                    attemptCount = lastExecutor.getAttemptCount() + 1;
                    long now = System.currentTimeMillis();
                    long failedTime = lastExecutor.getTimeStamp();
                    if(now - failedTime < taskRetryIntervalInMsec){  //未到等待时间,继续留在队列
                        continue;
                    }
                    if(!lastExecutor.isShutdown()){ //上次失败的task仍未结束
                        if(now - failedTime > taskMaxWaitInMsec){
                            markCommunicationFailed(taskId);
                            reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                            throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "task failover等待超时");
                        }else{
                            lastExecutor.shutdown(); //再次尝试关闭
                            continue;
                        }
                    }else{
                        /*LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
                                this.taskGroupId, taskId, lastExecutor.getAttemptCount());*/
                    }
                }
                Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
                TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                taskStartTimeMap.put(taskId, System.currentTimeMillis());
                taskExecutor.doStart();
                iterator.remove();
                runTasks.add(taskExecutor);
                //上面,增加task到runTasks列表,因此在monitor里注册。
                taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
                taskFailedExecutorMap.remove(taskId);
                /*LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
                        this.taskGroupId, taskId, attemptCount);*/
            }
            //4.任务列表为空,executor已结束, 搜集状态为success--->成功
            if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
                // 成功的情况下,也需要汇报一次。否则在任务结束非常快的情况下,采集的信息将会不准确
                lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                        lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                /*LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);*/
                break;
            }
            // 5.如果当前时间已经超出汇报时间的interval,那么我们需要马上汇报
            long now = System.currentTimeMillis();
            if (now - lastReportTimeStamp > reportIntervalInMillSec) {
                lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                        lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                lastReportTimeStamp = now;
                //taskMonitor对于正在运行的task,每reportIntervalInMillSec进行检查
                for(TaskExecutor taskExecutor:runTasks){
                  taskMonitor.report(taskExecutor.getTaskId(),this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
                    if(DATX_LOG_ENABLE){
                        LOG.info("Running queue capacity is :[{}], current length is:[{}]", taskExecutor.channel.getCapacity(), taskExecutor.channel.size());
                    }
                }
            }
            Thread.sleep(sleepIntervalInMillSec);
        }
        //6.最后还要汇报一次
        reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
    } catch (Throwable e) {
        Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
        if (nowTaskGroupContainerCommunication.getThrowable() == null) {
            nowTaskGroupContainerCommunication.setThrowable(e);
        }
        nowTaskGroupContainerCommunication.setState(State.FAILED);
        this.containerCommunicator.report(nowTaskGroupContainerCommunication);
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    }finally {
        if(!PerfTrace.getInstance().isJob()){
            //最后打印cpu的平均消耗,GC的统计
            VMInfo vmInfo = VMInfo.getVmInfo();
            if (vmInfo != null) {
                vmInfo.getDelta(false);
                LOG.info(vmInfo.totalString());
            }
            LOG.info(PerfTrace.getInstance().summarizeNoException());
        }
    }
}

上述实现主要分为以下几个步骤:
  1、初始化task执行相关的状态信息,分别是taskId->Congifuration的map、待运行的任务队列taskQueue、运行失败任务taskFailedExecutorMap、运行中的任务runTasks、任务开始时间taskStartTimeMap
  2、循环检测所有任务的执行状态
    1)判断是否有失败的task,如果有则放入失败对立中,并查看当前的执行是否支持重跑和failOver,如果支持则重新放回执行队列中;如果没有失败,则标记任务执行成功,并从状态轮询map中移除
    2)如果发现有失败的任务,则汇报当前TaskGroup的状态,并抛出异常
    3)查看当前执行队列的长度,如果发现执行队列还有通道,则构建TaskExecutor加入执行队列,并从待运行移除
    4)检查执行队列和所有的任务状态,如果所有的任务都执行成功,则汇报taskGroup的状态并从循环中退出
    5)检查当前时间是否超过汇报时间检测,如果是,则汇报当前状态
    6)当所有的执行完成从while中退出之后,再次全局汇报当前的任务状态

至此,taskGroup中的所有执行完成,上述taskGroup的运行队列只是将负责对task任务进行调度,具体的执行还是TaskExecutor负责实现,下面看看TaskExecutor的执行,代码实现如下

  public TaskExecutor(Configuration taskConf, int attemptCount) {
        // 获取该taskExecutor的配置
        this.taskConfig = taskConf;
        Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER)
                        && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER),
                "[reader|writer]的插件参数不能为空!");
        // 得到taskId
        this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
        this.attemptCount = attemptCount;
        /**
         * 由taskId得到该taskExecutor的Communication
         * 要传给readerRunner和writerRunner,同时要传给channel作统计用
         */
        this.taskCommunication = containerCommunicator
                .getCommunication(taskId);
        Validate.notNull(this.taskCommunication,
                String.format("taskId[%d]的Communication没有注册过", taskId));
        this.channel = ClassUtil.instantiate(channelClazz,
                Channel.class, configuration);
        this.channel.setCommunication(this.taskCommunication);

        /**
         * 获取transformer的参数
         */

        List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);

        /**
         * 生成writerThread
         */
        writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
        this.writerThread = new Thread(writerRunner,
                String.format("%d-%d-%d-writer",
                        jobId, taskGroupId, this.taskId));
        //通过设置thread的contextClassLoader,即可实现同步和主程序不通的加载器
        this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
                PluginType.WRITER, this.taskConfig.getString(
                        CoreConstant.JOB_WRITER_NAME)));

        /**
         * 生成readerThread
         */
        readerRunner = (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs);
        this.readerThread = new Thread(readerRunner,
                String.format("%d-%d-%d-reader",
                        jobId, taskGroupId, this.taskId));
        /**
         * 通过设置thread的contextClassLoader,即可实现同步和主程序不通的加载器
         */
        this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.taskConfig.getString(
                        CoreConstant.JOB_READER_NAME)));
    }

    public void doStart() {
        this.writerThread.start();
        // reader没有起来,writer不可能结束
        if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }
        this.readerThread.start();
        // 这里reader可能很快结束
        if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
            // 这里有可能出现Reader线上启动即挂情况 对于这类情况 需要立刻抛出异常
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }
    }
    

TaskExecutor构建的时候,生成一个reader、channel和writer,并启动两个线程,reader生产数据写入channel,writer从channel中读数据,任务执行完毕时,通过wirter将任务状态置为成功

上一篇:datax常见问题


下一篇:datax源码阅读三:JobContainer