Flink运行时之合久必分的特定任务

合久必分的特定任务

前面我们谈到了TaskManager对每个Task实例会启动一个独立的线程来执行。在分析线程执行的核心代码时,我们看到最终执行的是AbstractInvokable这样执行体的invoke方法。所谓合久必分,鉴于流处理任务跟批处理任务执行模式上存在巨大的差异,在对AbstractInvokable的实现时,它们将会走向两个不同的分支。

流处理相关的任务

流处理所对应的任务的继承关系图如下:

Flink运行时之合久必分的特定任务

从上面的继承关系图可见,StreamTask是流处理任务的抽象。因为在DataStream API中允许算子链接(operator-chain),算子链中的第一个算子称之为“head”,根据“head”算子输入端个数以及角色的差别派生出三种不同类型的任务:

  • SourceStreamTask:输入源对应的流任务;
  • OneInputStreamTask:单输入端流任务;
  • TwoInputStreamTask:双输入端流任务;

在这三大类流任务之下又会有其他一些特殊的流任务被派生。但StreamTask为所有流任务的实现提供了基础。它在AbstracInvokable的invoke核心方法中规定了一套统一的执行流程,这个流程会被任何一个流处理任务在执行时所遵循。这套流程步骤如下:

  1. 创建基本的辅助部件并加载算子链;
  2. 启动算子;
  3. 特定任务相关的初始化;
  4. 打开算子;
  5. 执行run方法;
  6. 关闭算子;
  7. 销毁算子;
  8. 公共的清理操作;
  9. 特定任务相关的清理操作;

以上步骤中被标记为斜体字的步骤(3,5,9)都被定义为抽象方法让实现类填充逻辑。

除去SourceStreamTask之外,为什么要以输入端的个数来区分任务呢?这一点跟Flink的设计是分不开的,它从API层到算子再到任务的设计都以输入端的个数为中心。我们可以看一下流处理中的所有算子的继承关系图:

Flink运行时之合久必分的特定任务

可以看到除了StreamSource没有扩展OneInputStreamOperator和TwoInputStreamOperator。其他所有算子都无一例外的扩展自这两个接口。这一点跟在Task层面的继承关系是一致的。

批处理相关的任务

批处理对应的任务继承关系如下图:

Flink运行时之合久必分的特定任务

在批处理的任务设计中,将source和sink这两个角色对应的任务独立开,而其他承载业务逻辑的算子所对应的任务都由BatchTask来表示,迭代相关的任务也直接继承自BatchTask。

如同流处理中的StreamTask一样,BatchTask主要也是提供一套执行的流程并提供了基于驱动(Driver)的用户逻辑代码的抽象。

所谓Driver,它其实是衔接用户逻辑代码跟Flink运行时的任务(BatchTask)一个载体,这一点跟类似于计算机中介于软硬件之间的驱动程序。

殊途同归的UDF

其实不管Flink如何执行你的代码,对终端用户而言,它编写Flink程序时做得最多的一件事是什么?是覆写Flink提供的一些函数并在其中实现自己的业务逻辑,这些包含业务逻辑的类,我们通常就称之为UDF(user-defined function,也即用户定义的函数)。接下来,我们将关注点放在封装了UDF的执行体上,看看在流处理和批处理中它们都是如何被实现的。我们选择了一个很常见的函数Map,它在流处理中的实现为StreamMap,在批处理中的实现为MapDriver。

StreamMap在其processElement中的逻辑为:

public void processElement(StreamRecord<IN> element) throws Exception {
    output.collect(element.replace(userFunction.map(element.getValue())));
}

流处理算子的必须实现逐元素处理的processElement方法。

代码段中的userFunction是MapFunction接口的实例。

接下来再看批处理中的MapDriver中的run方法:

public void run() throws Exception {
    final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
    final MapFunction<IT, OT> function = this.taskContext.getStub();
    final Collector<OT> output = new CountingCollector<>(
        this.taskContext.getOutputCollector(), numRecordsOut);

    if (objectReuseEnabled) {
        IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();

    while (this.running && ((record = input.next(record)) != null)) {
        numRecordsIn.inc();
        output.collect(function.map(record));
    }
    }
    else {
        IT record = null;

        while (this.running && ((record = input.next()) != null)) {
            numRecordsIn.inc();
            output.collect(function.map(record));
        }
    }
}

从Collector的collect方法中可以看到同样会执行MapFunction的map方法。

两者的MapFunction是同一个接口,来自org.apache.flink.api.common.functions。

所以归根结底,不论流处理和批处理中它们的执行模式有何不同,对于相同语义的函数,UDF中用户逻辑的执行是殊途同归的。

关于更多用户逻辑的执行细节,我们后续会进行分析。


原文发布时间为:2017-01-26

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

上一篇:31 天重构学习笔记索引


下一篇:ASP.NET MVC Music Store教程(6):使用数据注释为模型进行验证