datax源码阅读三:JobContainer

前面介绍的python文件和Engine都是用于做初始化准备,真正的执行都是在这里完成,start代码如下:

/**
 * jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、
 * post以及destroy和statistics
 */
@Override
public void start() {
    LOG.info("DataX jobContainer starts job.");

    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if(isDryRun) {
            LOG.info("jobContainer starts to do preCheck ...");
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobContainer starts to do preHandle ...");
            this.preHandle();

            LOG.debug("jobContainer starts to do init ...");
            this.init();
            LOG.info("jobContainer starts to do prepare ...");
            this.prepare();
            LOG.info("jobContainer starts to do split ...");
            this.totalStage = this.split();
            LOG.info("jobContainer starts to do schedule ...");
            this.schedule();
            LOG.debug("jobContainer starts to do post ...");
            this.post();

            LOG.debug("jobContainer starts to do postHandle ...");
            this.postHandle();
            LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

            this.invokeHooks();
        }
    } catch (Throwable e) {
        LOG.error("Exception when job run", e);

        hasException = true;

        if (e instanceof OutOfMemoryError) {
            this.destroy();
            System.gc();
        }
        if (super.getContainerCommunicator() == null) {
            // 由于 containerCollector 是在 scheduler() 中初始化的,所以当在 scheduler() 之前出现异常时,需要在此处对 containerCollector 进行初始化

            AbstractContainerCommunicator tempContainerCollector;
            // standalone
            tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);

            super.setContainerCommunicator(tempContainerCollector);
        }

        Communication communication = super.getContainerCommunicator().collect();
        // 汇报前的状态,不需要手动进行设置
        // communication.setState(State.FAILED);
        communication.setThrowable(e);
        communication.setTimestamp(this.endTimeStamp);

        Communication tempComm = new Communication();
        tempComm.setTimestamp(this.startTransferTimeStamp);

        Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
        super.getContainerCommunicator().report(reportCommunication);

        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
        if(!isDryRun) {
            this.destroy();
            this.endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                //最后打印cpu的平均消耗,GC的统计
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                    LogUtil.logVmInfo(vmInfo);
                }

                LOG.info(PerfTrace.getInstance().summarizeNoException());
                this.logStatistics();
            }
        }
    }
}

主要执行流程为:

1、preHandle():job前置操作
2、init():初始化reader和writer
3、prepare():执行插件的prepare操作
4、split():切分任务
5、schedule():执行任务
6、post():执行插件的post操作
7、postHandle():job后置操作
8、invokeHooks():调用hook
9、输出统计结果

上述任务流是顺序执行的,第5步会将切分的task分配到多个taskGroup中并发执行,其中preHandle、postHandle、invokeHooks不影响整体执行,可以先忽略,下面主要介绍关键步骤
1)init()关键代码主要是初始化reader和writer插件

/**
 * reader和writer的初始化
 */
private void init() {
    this.jobReader = this.initJobReader(jobPluginCollector);
    this.jobWriter = this.initJobWriter(jobPluginCollector);
}
/**
 * reader job的初始化,返回Reader.Job
 *
 * @return
 */
private Reader.Job initJobReader(
        JobPluginCollector jobPluginCollector) {
    this.readerPluginName = this.configuration.getString(
            CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.READER, this.readerPluginName));

    Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
            PluginType.READER, this.readerPluginName);

    // 设置reader的jobConfig
    jobReader.setPluginJobConf(this.configuration.getConfiguration(
            CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

    // 设置reader的readerConfig
    jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
            CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

    jobReader.setJobPluginCollector(jobPluginCollector);
    jobReader.init();

    classLoaderSwapper.restoreCurrentThreadClassLoader();
    return jobReader;
}

reader插件初始化的时候使用了自定义classLoader,这样可以做到插件级别的隔离,插件初始化完成之后调用了job的init函数,用于初始化插件的Job,writer插件初始化同理
2)prepare():prepare操作比较简单,分别执行reader和writer插件Job中的prepare函数即可,同样,每次执行前都会先加载对应的classLoader用于隔离

private void prepare() {
    this.prepareJobReader();
    this.prepareJobWriter();
}

3)split():切分任务task

/**
 * 执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,
 * 达到切分后数目相等,才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起,
 * 然后,为避免顺序给读写端带来长尾影响,将整合的结果shuffler掉
 */
private int split() {
    this.adjustChannelNumber();
    if (this.needChannelNumber <= 0) {
        this.needChannelNumber = 1;
    }
    List<Configuration> readerTaskConfigs = this
            .doReaderSplit(this.needChannelNumber);
    int taskNumber = readerTaskConfigs.size();
    List<Configuration> writerTaskConfigs = this
            .doWriterSplit(taskNumber);
    List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
    LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
    /**
     * 输入是reader和writer的parameter list,输出是content下面元素的list
     */
    List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
            readerTaskConfigs, writerTaskConfigs, transformerList);
    LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig));
    this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);
    return contentConfig.size();
}

上面函数主要分为3步:

1、计算限速和并发,即实际的channel数和每个channel的限速,主要在adjustChannelNumber()中,这里不做过多说明
2、根据实际的channel数,切分reader端,具体的切分逻辑reader插件可以自行实现
3、根据reader端切分的数目切分writer端,达到reader:writer=1:1,这样每个task中都包含一个reader和一个writer

4)schedule():执行切分出来的task

/**
 * schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中,
 * 同时不同的执行模式调用不同的调度策略,将所有任务调度起来
 */
private void schedule() {
    /**
     * 这里的全局speed和每个channel的速度设置为B/s
     */
    int channelsPerTaskGroup = this.configuration.getInt(
            CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
    int taskNumber = this.configuration.getList(
            CoreConstant.DATAX_JOB_CONTENT).size();
    this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
    PerfTrace.getInstance().setChannelNumber(needChannelNumber);
    /**
     * 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务
     */
    List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
            this.needChannelNumber, channelsPerTaskGroup);
    LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());
    ExecuteMode executeMode = null;
    AbstractScheduler scheduler;
    try {
        executeMode = ExecuteMode.STANDALONE;
        scheduler = initStandaloneScheduler(this.configuration);
        //设置 executeMode
        for (Configuration taskGroupConfig : taskGroupConfigs) {
            taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
        }
        if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
            if (this.jobId <= 0) {
                throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                        "在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
            }
        }
        LOG.info("Running by {} Mode.", executeMode);
        this.startTransferTimeStamp = System.currentTimeMillis();
        scheduler.schedule(taskGroupConfigs);
        this.endTransferTimeStamp = System.currentTimeMillis();
    } catch (Exception e) {
        LOG.error("运行scheduler 模式[{}]出错.", executeMode);
        this.endTransferTimeStamp = System.currentTimeMillis();
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    }
    /**
     * 检查任务执行情况
     */
    this.checkLimit();
}

schedule执行过程主要分为以下几步:

1、计算taskGroup个数
2、将切分的task分配到taskGroup中
3、启动线程池执行taskGroup,具体代码流程为scheduler.schedule(taskGroupConfigs) -> AbstractScheduler.schedule -> startAllTaskGroup -> ProcessInnerScheduler.startAllTaskGroup -> this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner) -> TaskGroupContainerRunner.run() -> this.taskGroupContainer.start()
4、收集taskGroup汇报的信息

5)post():执行插件的post操作

private void post() {
    this.postJobWriter();
    this.postJobReader();
}
上一篇:datax源码阅读四:TaskGroupContainer


下一篇:阿里云推出国内首个社保云系统