Flink 的分布式执行过程包含两个重要的角色,master 和 worker,参与 Flink 程序执行的有多个进程,包括 Job Manager,Task Manager 以及 Job Client,下图展示了 Flink 程序的执行过程。
Flink 程序首先被提交到 Job Client 上,随后 Job Client 将它提交到 Job Manager 上,Job Manager 负责安排资源的分配和 job 的执行。首先是资源的分配,然后是将 job 划分为若干 task 后提交到对应的 Task Manager 上。Task Manager 在接收到 task 后,初始化一个线程并开始执行程序。执行过程中 Task Manager 持续地将状态的变化情况报告给 Job Manager,这些状态包括开始执行(starting the execution),正在执行(in progress)以及完成(finished)。一旦 job 的执行彻底完成,Job Manager 就将其结果发回 Job Client 端。
Job Manager
Job Manager 是执行过程中的 master 进程,负责协调和管理程序的执行,主要的内容包括调度任务(task),管理检查点(checkpoints)和故障恢复(failure recovery)等等。
可以并行地存在(running)多个 master 进程以共同承担这些职责,这对整个系统的高可用性有重要意义,其中有一个 master 是所谓的 leader 节点,如果它挂了,那么后备的 master 就会选出新的 leader 节点。
Job Manager 包括以下几个重要的组成部分,并发系统(Actor system),调度器(Scheduler)和检查点机制(Check pointing),其中 Flink 在 Job Manager 和 Task Manager 之间使用 Akka actor system 交互。
Actor system
所谓的 Actor system,就是一个包含承担各种角色(role)的 actor 的容器,它提供了诸如调度,配置和日志记录等等的服务。同时,它还控制着初始化所有这些 actor 的线程池(thread pool)。所有的 actor 都在一个层次系统下管理,每一个新创建的系统都会被赋予一个父节点(parent);Actor 之间使用消息系统通信,每个 actor 保持一个自己的邮箱(mailbox)并从其中阅读信息。在本地,信息通过共享内存来传递;在远程,信息使用 RPC 传递。
父节点需要管理它的子节点,子节点出问题的时候会向父节点发送信息,如果当前节点可以解决这个问题,那么它会通过重启子节点来解决,否则就将问题上报给自己的父节点。
Flink 中的 actor 是拥有自己状态(state)和行为(behavior)的容器,actor 逐条处理它从邮箱中收到的信息,根据收到的信息改变自己的状态和行为。
Scheduler
Flink 中任务的实际执行者(executor)被抽象为任务插槽(task slot),每个 Task Manager 都会管理若干个任务插槽。Flink 内部会通过 SlotSharingGroup 和 CoLocationGroup 决定哪些 task 需要共享插槽,哪些 task 需要在特殊的插槽上执行。
Check pointing
Check pointing 机制是 Flink 提供一致地故障处理机制的基石,它保持一致的分布式的数据流和执行者的状态的快照(snapshot)。这个机制受启发于 Chandy-Lamport 算法,不过也针对 Flink 自身的情况进行了定制,具体的快照实现在论文 Lightweight Asynchronous Snapshots for Distributed Dataflows 中。
这个独特的故障容忍机制使得 Flink 能够在数据流上创建轻量级的快照,一般而言,数据流的状态可被配置保存在 HDFS 一类的地方。
一旦出现故障,Flink 终止执行者的运行,将其状态重置到最近的快照上,并重启执行过程。
Flink 的快照机制的核心元素是流栅栏(stream barrier),这些栅栏在不影响流的情况下被插入到数据流中,将记录(record)收集起来形成快照。每一个栅栏都有一个唯一的 ID,下图是这一机制的概念图。
每一个快照的状态都会汇报给 Job Manager 上的检查点协调器,拍摄快照时,Flink 会对齐记录以避免因为故障而对同一条记录处理两次。对齐过程需要消耗若干毫秒,但对于无法忍受这种延迟的某些实时应用,Flink 也提供了关闭对齐的选项,此时快照仍然会在接收到栅栏时拍摄。默认情况下 Flink 开启对齐功能,这保证的 Excatly Once 的语义,关闭该功能时,只保证 At Least Once 的语义。
Task Manager
Task Manager 作为 worker 节点在 JVM 上运行,可以同时执行若干个线程以完成分配给它的 task,task 的并行度依赖于 Task Manager 上可用的任务插槽数量,每个 task 占据了分配给它的任务插槽的资源。例如,如果一个 Task Manager 拥有四个插槽,那么它大约会为每个插槽分配 25% 的内存。每个任务插槽上运行着若干个线程,同一个插槽上的线程共享同一个 JVM,同一个 JVM 上的任务共享 TCP 连接和心跳(heart beat)信息。
Job Client
Job Client 不是 Flink 任务执行过程的内部构件,而是执行过程的起始点。Job Client 负责接收用户提交的应用程序,创建对应的数据流,然后将数据流提交到 Job Manager 上执行。一旦执行完毕,Job Client 将执行结果发回给用户。
所谓的数据流(data flow)是一个执行的计划,Client 将接收的程序转换为对应的数据流的典型过程如下图所示。
Flink 数据流默认是并行地分布式地执行,因此实际的转化结果可能更像下面的图。
Flink 的分布式分发方式有 one-to-one 和 redistribute 两种。在上图中,从 Source 到 map 的过程即为 one-to-one 的方式,它保证原来的数据划分(partitioning)和排序(ordering)不会改变;从 map 到 keyBy/window 和 从 keyBy/window 到 Sink 的过程采用 redistribute 的分发方式,这种方式可能会打乱数据原有的划分情况和排序情况,对于 keyBy 来说,就是把相同的 key 的数据分发到同一个节点上,而对于最终的 Sink,由于并行执行,它收到的数据可能不是按照原有的排序情况到达的。