flink DataStream API(二)执行模型

文章目录

执行模式(批处理/流处理)

DataStream API 支持不同的运行时执行模式,您可以根据用例的要求和作业的特征从中进行选择。这是DataStream API的“经典”执行行为,我们称之为STRIMING执行模式。这应该用于连续增量处理并希望无限期保持在线的*作业。

此外,还有一种批处理式执行模式,我们称之为 BATCH 执行模式。可以让你联想到 MapReduce 等批处理框架的方式执行作业。这将用于具有已知固定输入并且不会连续运行的有界作业。

Apache Flink对流和批处理的统一方法意味着,不管配置的执行模式如何,在有界输入上执行的DataStream应用程序都会产生相同的最终结果。重要的是要注意这里的final含义:以STREAMING模式执行的作业可能会产生增量更新(想想数据库中的upserts),而BATCH作业最终只会产生一个最终结果。如果计算正确,最终结果将是相同的,但实现方法可能不同。

通过启用BATCH,我们允许flink应用额外的优化,只有当我们知道输入的是有界流,我们才可以做这样的优化,例如可以使用不同的join/aggregation策略,在此基础上还可以拥有不同的shuffer实现和更有效的任务调度和故障恢复行为。

什么时候可以/应该使用 BATCH 执行模式?

BATCH 执行模式只能用于有界的 Jobs/Flink 程序。有界性是数据源的一个属性,它告诉我们来自该数据源的所有输入在执行之前是否已知,或者是否会有新的数据出现(可能是无限期的)。反过来,如果一个job的所有源都是有界的,那么它就是有界的,否则就是*的。

另一方面,STREAMING 执行模式即可用于有界也可以用于*作业。

根据经验,当您的程序有界时,您应该使用BATCH执行模式,因为这将更高效。当你的程序是*的时候,你必须使用STREAMING执行模式,因为只有这种模式才可以处理连续的数据流。

配置BATCH执行模式

执行模式可以通过 execution.runtime-mode 设置进行配置。存在三个可能的值:

  • STREAMING: 标准的 DataStream 执行模式(默认)
  • BATCH: DataStream API上的批处理模式
  • AUTOMATIC: 让系统根据源的有界性来决定

这可以通过 bin/flink run ... 的命令行参数进行配置,或者在创建/配置 StreamExecutionEnvironment 时以编程方式进行配置。

以下是通过命令行配置执行模式的方法:

$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar

此示例显示如何在代码中配置执行模式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

我们建议用户不要在他们的程序中设置运行时模式,而是在提交应用程序时使用命令行设置它。不在应用代码中配置将获得更大的灵活性,因为相同的应用程序可以在任何执行模式下执行。

Execution Behavior(执行行为)

本节概述了 BATCH 执行模式的执行行为,并将其与 STREAMING 执行模式进行了对比。详情请参考介绍此功能的 FLIP-134 and FLIP-140

任务调度和网络shuffle

Flink作业由不同的操作组成,这些操作在数据流图中连接在一起。系统决定如何在不同的进程/机器(taskmanager)上调度这些操作的执行,以及数据如何在它们之间进行shuffle

可以使用称为chaining的功能将多个操作/操作符链接在一起。Flink将一个或多个(链式)操作符组成的组视为调度单元,称为Task。通常,term subTask是指在多个taskmanager上并行运行的单个任务实例,但我们在这里只使用 term task

对于 BATCHSTREAMING 执行模式,任务调度和网络shuffle的工作方式不同。主要是因为我们知道我们的输入数据在 BATCH 执行模式下是有界的,这允许 Flink 使用更高效的数据结构和算法。

我们将通过这个例子来解释任务调度和网络传输的区别:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> source = env.fromElements(...);

source.name("source")
	.map(...).name("map1")
	.map(...).name("map2")
	.rebalance()
	.map(...).name("map3")
	.map(...).name("map4")
	.keyBy((value) -> value)
	.map(...).name("map5")
	.map(...).name("map6")
	.sinkTo(...).name("sink");

在操作之间隐含一对一连接模式的操作,如map()、flatMap()filter(),可以直接将数据转发给下一个操作,因此允许将这些操作链接在一起。这意味着Flink通常不会在它们之间进行网络shuffle

另一方面,诸如 keyBy()rebalance() 之类的操作需要在不同的并行任务实例之间打乱数据。这会导致网络shuffle

对于上面的例子,Flink会像以下这样将操作分组为任务:

  • Task1: source, map1, and map2
  • Task2: map3, map4
  • Task3: map5, map6, and sink

我们在任务 1 和任务 2 以及任务 2 和 3 之间进行了网络shuffle。以下是该Job的直观表示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XL1lSVPt-1629284073593)(https://ci.apache.org/projects/flink/flink-docs-release-1.13/fig/datastream-example-job-graph.svg)]

流执行模式

STREAMING执行模式下,所有任务都需要一直运行。这允许Flink通过整个管道立即处理新记录,我们需要连续和低延迟的流处理。这也意味着分配给作业的taskmanager需要有足够的资源来同时运行所有任务。

网络shuffle是流水线式的,这意味着记录会在网络层进行一些缓冲并立即发送到下游任务。

批执行模式

在批处理执行模式下,作业的任务可以被划分为几个阶段,这些阶段可以一个接一个地执行。我们可以这样做,因为输入是有界的,因此Flink可以在进入下一个阶段之前将当前阶段全部处理完毕。在上面的示例中,作业将有三个阶段,对应于三个任务,这三个任务由shuffle分隔的。

与上面针对流模式所述的立即将记录发送到下游任务不同,分阶段处理需要Flink将任务的中间结果持久化到存储介质中,从而允许下游任务在上游任务已经离线后读取这些结果。这会增加处理的延迟,但会带来其他有趣的特性。首先,这允许Flink在发生故障时恢复到最新的可用结果,而不是重新启动整个作业。另一个好处用是批处理作业可以在更少的资源上执行(相对于TaskManager的可用Slot而言),因为系统可以依次执行任务。

只要下游任务没有消耗它们,TaskManagers 就会保留中间结果。 之后,只要空间允许,它们将被保留,以便在发生故障时回溯到更早的结果。

状态后端/状态

STREAMING 模式下,Flink 使用 StateBackend 来控制状态的存储方式以及检查点的工作方式。

BATCH模式下,配置的状态后端将会被忽略。相反,keyed操作的输入key分组(使用排序),然后我们依次处理key对应的所有记录。这允许同时只保留一个key的状态。当移动到下一个键key时,上一个给定key的状态将被丢弃。

有关这方面的相关信息,请参阅 FLIP-140

Order of Processing(处理顺序)

在操作符或用户定义函数(udf)中处理记录的顺序在BATCHSTREAMING执行之间可能不同。

STREAMING模式下,用户定义的函数不应对传入记录的顺序进行任何假设。数据一到达就被处理。

BATCH执行模式,有一些操作可以使记录在flink中保证序的。

我们可以区分三种一般类型的输入:

  • **broadcast input(广播输入)**通过广播流输入 (请查看 Broadcast State)
  • regular input(常规输入):既不是广播也不是键控的输入
  • keyed inpu(键控输入):来自 KeyedStream 的输入

使用多个输入类型的函数或操作符将按照以下顺序处理它们:

  • 首先处理广播输入
  • 第二个处理常规输入
  • 最后处理键控输入

对于使用多个常规或广播输入的函数(如CoProcessFunction函数),Flink有权以任何顺序处理该类型输入的数据。

对于使用多个keyed输入的函数(如KeyedCoProcessFunction),Flink会处理所有keyed输入中单个key的所有记录,然后再转到下一个key控输入。

Event Time / Watermarks(时间时间 / 水印)

基于 event time的flink的流式程序,在流运行时Flink对事件产生的顺序做一个悲观的假设,即时间戳为 t 的事件可能出现在时间戳为 t+1 的事件之后。因为系统永远无法确定小于给定的时间戳 T 的元素是否会再次出现。为了抵消这种无序性对最终结果的影响,同时是系统更加实用,在STRIMING模式下flink使用一种名为Watermarks的机制。

BATCH模式中,输入数据集是预先知道的,因此不需要这样的机制,至少,元素可以按时间戳排序,以便按照时间顺序处理它们。

Processing Time(处理时间)

Processing Time是处理记录的机器上的挂钟时间。根据这个定义,我们可以看到基于Processing Time的计算结果是不重复的。这是因为处理两次的相同记录将有两个不同的时间戳。

尽管如此,在STREAMING模式下使用处理时间还是很有用的。原因在于,流管道经常实时地接收它们的*输入,因此事件时间和处理时间之间存在相关性。此外,由于上述原因,在STREAMING模式下,事件时间的1h往往可以接近处理时间或挂钟时间的1h。因此,使用处理时间可以用于提前(不完全)触发计算,从而给出预期结果的提示。

这种相关性在输入数据集是静态且事先已知的批处理模式中不存在。鉴于此,在 BATCH 模式下,允许用户请求当前处理时间并注册处理时间计时器,但是,在event-time的情况下,所有计时器都将在输入结束时触发。

故障恢复

STREAMING执行模式下,Flink使用检查点进行故障恢复。查看checkpointing documentation 文档,了解有关此操作的文档以及如何配置它。还有一个关于通过状态快照进行容错的介绍部分,这将是从更高的层次来解释这些概念。

故障恢复检查点的特征之一是,在出现故障时,Flink将从检查点重新启动所有正在运行的任务。这可能比我们在BATCH模式下所做的工作成本更高(如下所述),这也是如果您的工作允许的话,您应该使用BATCH执行模式的原因之一。

BATCH执行模式下,Flink将尝试回溯到中间结果仍然可用的前一个处理阶段。与从检查点重新启动所有任务相比,可能只有失败的任务(或图中它们的前身)必须重新启动,这可以提高作业的处理效率和总体处理时间。

重要注意事项

与传统的STREAMING式执行模式相比,在BATCH模式中,有些东西可能无法按预期工作。有些特性的工作方式略有不同,而其他特性则不受支持。

BATCH 模式下的行为变化:

  • “滚动”操作,如 reduce() 或 sum() 操作, STREAMING 模式下flink会为到达的新记录发出增量更新。在 BATCH 模式下,这些操作不是“滚动”的。他们只发出最终结果。

BATCH 模式下不支持:

  • 检查点和任何依赖于检查点的操作都不起作用。
  • Iterations

检查点

如上所述,BATCH处理程序的故障恢复不使用检查点。

重要的是要记住,因为没有检查点,某些功能(例如 CheckpointListener 以及因此 Kafka 的 EXACTLY_ONCE 模式或 StreamingFileSinkOnCheckpointRollingPolicy 将不起作用)。如果您需要在批处理模式下使用事务DataSink,请确保使用FLIP-143中建议的统一DataSink API。

上一篇:安装CentOS版本的yum


下一篇:flink core 流处理,批处理