《Flink 原理与实现:详解 Flink 中的状态管理》

 

使用 Keyed State

首先看一下 Keyed State 下,我们可以用哪些原子状态:

  • ValueState:即类型为 T 的单值状态。这个状态与对应的 key 绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。
  • ListState:即 key 上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable<T>来遍历状态值。
  • ReducingState:这种状态通过用户传入的 reduceFunction,每次调用add方法添加值的时候,会调用 reduceFunction,最后合并到一个单一的状态值。
  • FoldingState:跟 ReducingState 有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态将会在 Flink 未来版本中被删除)。
  • MapState:即状态值为一个 map。用户通过putputAll方法添加元素。

以上所有的状态类型,都有一个clear方法,可以清除当前 key 对应的状态。

需要注意的是,以上所述的 State 对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄 (state handle)。

接下来看下,我们如何得到这个状态句柄。Flink 通过StateDescriptor来定义一个状态。这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息。与上面的状态对应,从StateDescriptor派生了ValueStateDescriptorListStateDescriptor等 descriptor。

具体如下:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

 


 

 

(4 条消息)Flink 原理与实现:详解 Flink 中的状态管理_xorxos 的专栏 - CSDN 博客_flink 状态管理

【本文转自 Flink 原理与实现:详解 Flink 中的状态管理

Flink 原理与实现系列文章 :

Flink 原理与实现:架构和拓扑概览
Flink 原理与实现:如何生成 StreamGraph
Flink 原理与实现:如何生成 JobGraph
Flink 原理与实现:如何生成 ExecutionGraph 及物理执行图
Flink 原理与实现:Operator Chain 原理

上面 Flink 原理与实现的文章中,有引用 word count 的例子,但是都没有包含状态管理。也就是说,如果一个 task 在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上 (at least once, exactly once),Flink 引入了 state 和 checkpoint。

首先区分一下两个概念,state 一般指一个具体的 task/operator 的状态。而 checkpoint 则表示了一个 Flink Job,在一个特定时刻的一份全局状态快照,即包含了所有 task/operator 的状态。

Flink 通过定期地做 checkpoint 来实现容错和恢复。

State

Keyed State 和 Operator State

Flink 中包含两种基础的状态:Keyed State 和 Operator State。

Keyed State

顾名思义,就是基于 KeyedStream 上的状态。这个状态是跟特定的 key 绑定的,对 KeyedStream 流上的每一个 key,可能都对应一个 state。

Operator State

与 Keyed State 不同,Operator State 跟一个特定 operator 的一个并发实例绑定,整个 operator 只对应一个 state。相比较而言,在一个 operator 上,可能会有很多个 key,从而对应多个 keyed state。

举例来说,Flink 中的 Kafka Connector,就使用了 operator state。它会在每个 connector 实例中,保存该实例中消费 topic 的所有 (partition, offset) 映射。

原始状态和 Flink 托管状态 (Raw and Managed State)

Keyed State 和 Operator State,可以以两种形式存在:原始状态和托管状态。

托管状态是由 Flink 框架管理的状态,如 ValueState, ListState, MapState 等。

下面是 Flink 整个状态框架的类图,还是比较复杂的,可以先扫一眼,看到后面再回过来看:

 

《Flink 原理与实现:详解 Flink 中的状态管理》

 

通过框架提供的接口,我们来更新和管理状态的值。

而 raw state 即原始状态,由用户自行管理状态具体的数据结构,框架在做 checkpoint 的时候,使用 byte[] 来读写状态内容,对其内部数据结构一无所知。

通常在 DataStream 上的状态推荐使用托管的状态,当实现一个用户自定义的 operator 时,会使用到原始状态。

下文中所提到的状态,如果没有特殊说明,均为托管状态。

使用 Keyed State

首先看一下 Keyed State 下,我们可以用哪些原子状态:

  • ValueState:即类型为 T 的单值状态。这个状态与对应的 key 绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。
  • ListState:即 key 上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable<T>来遍历状态值。
  • ReducingState:这种状态通过用户传入的 reduceFunction,每次调用add方法添加值的时候,会调用 reduceFunction,最后合并到一个单一的状态值。
  • FoldingState:跟 ReducingState 有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态将会在 Flink 未来版本中被删除)。
  • MapState:即状态值为一个 map。用户通过putputAll方法添加元素。

以上所有的状态类型,都有一个clear方法,可以清除当前 key 对应的状态。

需要注意的是,以上所述的 State 对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄 (state handle)。

接下来看下,我们如何得到这个状态句柄。Flink 通过StateDescriptor来定义一个状态。这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息。与上面的状态对应,从StateDescriptor派生了ValueStateDescriptorListStateDescriptor等 descriptor。

具体如下:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

接下来我们看一下创建和使用 ValueState 的例子:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
    /**
     * ValueState状态句柄. 第一个值为count,第二个值为sum。
     */
    private transient ValueState<Tuple2<Long, Long>> sum;
 
    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // 获取当前状态值
        Tuple2<Long, Long> currentSum = sum.value();
 
        // 更新
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;
 
        // 更新状态值
        sum.update(currentSum);
        
        // 如果count >=2 清空状态值,重新计算
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }
 
    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // 状态名称
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 状态类型
                        Tuple2.of(0L, 0L)); // 状态默认值
        sum = getRuntimeContext().getState(descriptor);
    }
}
 
// ...
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();
 
// the printed output will be (1,4) and (1,5)

由于状态需要从RuntimeContext中创建和获取,因此如果要使用状态,必须使用 RichFunction。普通的 Function 是无状态的。

KeyedStream 上的 scala api 则提供了一些语法糖,让创建和使用状态更加方便:

val stream: DataStream[(String, Int)] = ...
 
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

Inside Keyed State

上面以 Keyed State 为例讲了如何使用状态,接下来我们从代码层面分析一下,框架在内部做了什么事情。

先看下上面例子中open方法中获取状态句柄的代码:

    sum = getRuntimeContext().getState(descriptor);

它调用了RichFlatMapFunction.getRuntimeContext().getState方法,最终会调用StreamingRuntimeContext.getState方法:

    public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
        stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
        return keyedStateStore.getState(stateProperties);
    }

checkPreconditionsAndGetKeyedStateStore方法中:

    KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
    return keyedStateStore;

即返回了AbstractStreamOperator.keyedStateStore变量。这个变量的初始化在AbstractStreamOperator.initState方法中:

    private void initKeyedState() {
        try {
            TypeSerializer<Object> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
            // create a keyed state backend if there is keyed state, as indicated by the presence of a key serializer
            if (null != keySerializer) {
                KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                        container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
 
                long estimatedStateSizeInMB = config.getStateSize();
 
                this.keyedStateBackend = container.createKeyedStateBackend(
                        keySerializer,
                        // The maximum parallelism == number of key group
                        container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
                        subTaskKeyGroupRange,
                        estimatedStateSizeInMB);
 
                this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
            }
 
        // ...
    }

它先调用StreamTask.createKeyedStateBackend方法创建 stateBackend,然后将 stateBackend 传入 DefaultKeyedStateStore。

StreamTask.createKeyedStateBackend方法通过它内部的 stateBackend 来创建 keyed statebackend:

    backend = stateBackend.createKeyedStateBackend(
            getEnvironment(),
            getEnvironment().getJobID(),
            operatorIdentifier,
            keySerializer,
            numberOfKeyGroups,
            keyGroupRange,
            estimatedStateSizeInMB,
            getEnvironment().getTaskKvStateRegistry());

看一下 statebackend 的初始化,在StreamTask.createStateBackend方法中,这个方法会根据配置项state.backend的值创建 backend,其中内置的 backend 有jobmanagerfilesystemrocksdb

jobmanager的 state backend 会把状态存储在 job manager 的内存中。
filesystem会把状态存在文件系统中,有可能是本地文件系统,也有可能是 HDFS、S3 等分布式文件系统。
rocksdb会把状态存在 rocksdb 中。

所以可以看到,创建了 state backend 之后,创建 keyed stated backend,实际上就是调用具体的 state backend 来创建。我们以 filesystem 为例,实际就是FsStateBackend.createKeyedStateBackend方法,这个方法也很简单,直接返回了HeapKeyedStateBackend对象。

先不展开说HeapKeyedStateBackend类,我们返回去看创建 keyed state,最终返回的是DefaultKeyedStateStore对象,它的getStategetListStategetReducingState等方法,都是对底层 keyed state backend 的一层封装,keyedStateBackend.getPartitionedState来返回具体的 state handle(DefaultKeyedStateStore.getPartitionedState方法)。

这个方法实际调用了AbstractKeyedStateBackend.getPartitionedState方法,HeapKeyedStateBackendRocksDBKeyedStateBackend都从这个基类派生。

这个类有一个成员变量:

    private final HashMap<String, InternalKvState<?>> keyValueStatesByName;

它保存了的一个映射。map value 中的 InternalKvState,实际为创建的HeapValueStateHeapListStateRocksDBValueStateRocksDBListStat等实现。

回到上面AbstractKeyedStateBackend.getPartitionedState,正常的代码路径下,它会调用AbstractKeyedStateBackend.getOrCreateKeyedState方法来创建这个 InternalKvState,其方法如下:

        S state = stateDescriptor.bind(new StateBackend() {
            @Override
            public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
            }
 
            @Override
            public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
            }
        // ...

AbstractKeyedStateBackend.createValueStateAbstractKeyedStateBackend.createListState等方法是 AbstractKeyedStateBackend 的抽象方法,具体还是在 HeapKeyedStateBackend、RocksDBKeyedStateBackend 等类中实现的,所以这里创建的 state 只是一个代理,它 proxy 了具体的上层实现。在我们的例子中,最后绕了一个圈,调用的仍然是HeapKeyedStateBackend.createValueState方法,并将 state name 对应的 state handle 放入到 keyValueStatesByName 这个 map 中,保证在一个 task 中只有一个同名的 state handle。

回来看HeapKeyedStateBackend,这个类有一个成员变量:

    private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();

它的 key 为 state name, value 为 StateTable,用来存储这个 state name 下的状态值。它会将所有的状态值存储在内存中。

它的createValueState方法:

        StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
        return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);

即先注册 StateTable,然后返回一个 HeapValueState。

这里整理一下从应用层面创建一个 ValueState 的 state handle 的过程:

sum = getRuntimeContext().getState(descriptor) (app code)
  --> RichFlatMapFunction.getRuntimeContext().getState
  --> StreamingRuntimeContext.getState
    --> KeyedStateStore.getState(stateProperties)
    --> AbstractStreamOperator.keyedStateStore.getState
      --> DefaultKeyedStateStore.getState
      --> DefaultKeyedStateStore.getPartitionedState
      --> AbstractKeyedStateBackend.getPartitionedState
      --> AbstractKeyedStateBackend.getOrCreateKeyedState
        --> HeapKeyedStateBackend.createValueState
        --> HeapKeyedStateBackend.tryRegisterStateTable
        --> return new HeapValueState        

而从框架层面看,整个调用流程如下:

Task.run
  --> StreamTask.invoke
  --> StreamTask.initializeState
  --> StreamTask.initializeOperators
    --> AbstractStreamOperator.initializeState
    --> AbstractStreamOperator.initKeyedState
      --> StreamTask.createKeyedStateBackend
        --> MemoryStateBackend.createKeyedStateBackend
          --> HeapKeyedStateBackend.<init>

整体来看,创建一个 state handle 还是挺绕的,中间经过了多层封装和代理。

创建完了 state handle,接下来看看如何获取和更新状态值。

首先需要讲一下 HeapState 在内存中是如何组织的,还是以最简单的 HeapValueState 为例,
具体的数据结构,是在其基类AbstractHeapState中,以StateTable<K, N, SV> stateTable的形式存在的,其中 K 代表 Key 的类型,N 代表 state 的 namespace(这样属于不同 namespace 的 state 可以重名),SV 代表 state value 的类型。

StateTable类内部数据结构如下:

    protected final KeyGroupRange keyGroupRange;
    /** Map for holding the actual state objects. */
    private final List<Map<N, Map<K, ST>>> state;
    /** Combined meta information such as name and serializers for this state */
    protected RegisteredBackendStateMetaInfo<N, ST> metaInfo;

最核心的数据结构是state成员变量,它保存了一个 list,其值类型为Map<N, Map<K, ST>>,即按 namespace 和 key 分组的两级 map。那么它为什么是一个 list 呢,这里就要提到keyGroupRange成员变量了,它代表了当前 state 所包含的 key 的一个范围,这个范围根据当前的 sub task id 以及最大并发进行计算,在AbstractStreamOperator.initKeyedState方法中:

                KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                        container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());

举例来说,如果当前 task 的并发是 2,最大并发是 128,那么 task-1 所属的 state backend 的 keyGroupRange 为 [0,63],而 task-2 所属的 state backend 的 keyGroupRange 为 [64,127]。

这样,task-1 中的 StateTable.state 这个 list,最大 size 即为 64。获取特定 key 的 state value 时,会先计算 key 的 hash 值,然后用 hash 值 % 最大并发,这样会得到一个 [0,127] 之间的 keyGroup,到这个 list 中 get 到这个下标的Map<N, Map<K,V>>值,然后根据 namespace + key 二级获取到真正的 state value。

看到这里,有人可能会问,对于一个 key,如何保证在 task-1 中,它计算出来的 keyGroup 一定是在 [0,63] 之间,在 task-2 中一定是在 [64,127] 之间呢?

原因是,在 KeyedStream 中,使用了KeyGroupStreamPartitioner这种 partitioner 来向下游 task 分发 keys,而这个类重载的selectChannels方法如下:

        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
        return returnArray;

这里关键是KeyGroupRangeAssignment.assignKeyToParallelOperator方法,它中间调用了KeyGroupRangeAssignment.assignToKeyGroup方法来确定一个 key 所属的 keyGroup,这个跟 state backend 计算 keyGroup 是同一个方法。然后根据这个 keyGroup,它会计算出拥有这个 keyGroup 的 task,并将这个 key 发送到此 task。所以能够保证,从 KeyedStream 上 emit 到下游 task 的数据,它的 state 所属的 keyGroup 一定是在当前 task 的 keyGroupRange 中的。

上面已经提到了获取 ValueState 的值,这里贴一下代码,结合一下就很容易理解了:

        Map<N, Map<K, V>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
        if (namespaceMap == null) {
            return stateDesc.getDefaultValue();
        }
 
        Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
        if (keyedMap == null) {
            return stateDesc.getDefaultValue();
        }
 
        V result = keyedMap.get(backend.getCurrentKey());
        if (result == null) {
            return stateDesc.getDefaultValue();
        }
 
        return result;

而更新值则通过ValueState.update方法进行更新,这里就不贴代码了。

上面讲了最简单的 ValueState,其他类型的 state,其实也是基本一样的,只不过 stateTable 中状态值的类型不同而已。如 HeapListState,它的状态值类型为 ArrayList;HeapMapState,它的状态值类型为 HashMap。而值类型的不同,导致了在 State 上的接口也有所不同,如 ListState 会有add方法,MapState 有putget方法。在这里就不展开说了。

Checkpoint

到上面为止,都是简单的关于状态的读写,而且状态都还是只在 Task 本地,接下来就会涉及到 checkpoint。
所谓 checkpoint,就是在某一时刻,将所有 task 的状态做一个快照 (snapshot),然后存储到 memory/file system/rocksdb 等。

关于 Flink 的分布式快照,请参考 分布式 Snapshot 和 Flink Checkpointing 简介 及相关论文,这里不详述了。

Flink 的 checkpoint,是由CheckpointCoordinator来协调的,它位于 JobMaster 中。但是其实在 ExecutionGraph 中已经创建了,见ExecutionGraph.enableSnapshotCheckpointing方法。

当 Job 状态切换到 RUNNING 时,CheckpointCoordinatorDeActivator(从 JobStatusListener 派生)会触发回调coordinator.startCheckpointScheduler();,根据配置的 checkpoint interval 来定期触发 checkpoint。

每个 checkpoint 由 checkpoint ID 和 timestamp 来唯一标识,其中 checkpoint ID 可以是 standalone(基于内存)的,也可能是基于 ZK 的。
已经完成的 checkpoint,保存在 CompletedCheckpointStore 中,可以是 StandaloneCompletedCheckpointStore(保存在 JobMaster 内存中),也可以是 ZooKeeperCompletedCheckpointStore(保存在 ZK 中),甚至是自己实现的 store,比如基于 HDFS 的。

触发 checkpoint 的方法在 CheckpointCoordinator.ScheduledTrigger 中,只有一行:

    triggerCheckpoint(System.currentTimeMillis(), true);

这个方法比较长,它会先做一系列检查,如检查 coordinator 自身的状态(是否被 shutdown),还会检查与上次 checkpoint 的时间间隔、当前的并发 checkpoint 数是否超过限制,如果都没问题,再检查所有 task 的状态是否都为 RUNNING,都没问题之后,触发每个 Execution 的 checkpoint:

    for (Execution execution: executions) {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }

看下Execution.triggerCheckpoint方法:

    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final SimpleSlot slot = assignedResource;
 
        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }

很简单,通过 RPC 调用向 TaskManager 触发当前 JOB 的 checkpoint,然后一路调用下去:

RpcTaskManagerGateway.triggerCheckpoint
  --> TaskExecutorGateway.triggerCheckpoint
  --> TaskExecutor.triggerCheckpoint
    --> task.triggerCheckpointBarrier
    --> StatefulTask.triggerCheckpoint
    --> StreamTask.triggerCheckpoint
    --> StreamTask.performCheckpoint

具体做 checkpoint 的时候,会先向下游广播 checkpoint barrier,然后调用StreamTask.checkpointState方法做具体的 checkpoint,实际会调用到StreamTask.executeCheckpointing方法。

checkpoint 里,具体操作为,遍历每个 StreamTask 中的所有 operator:

  1. 调用 operator 的snapshotState(FSDataOutputStream out, long checkpointId, long timestamp)方法,存储 operator state,这个结果会返回 operator state handle,存储于nonPartitionedStates中。这里实际处理的时候,只有当 user function 实现了Checkpointed接口,才会做 snapshot。需要注意的是,此接口已经 deprecated,被CheckpointedFunction代替,而对CheckpointedFunction的 snapshot 会在下面的第 2 步中来做,因此这两个接口一般来说是 2 选 1 的。
  2. 调用 operator 的snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions)方法,返回OperatorSnapshotResult对象。注意虽然每个 snapshot 方法返回的都是一个 RunnableFuture,不过目前实际上还是同步做的 checkpoint(可以比较容易改成异步)。

    1. 这里会先调用AbstractStreamOperator.snapshotState方法,为 rich function 做 state snapshot
    2. 调用operatorStateBackend.snapshot方法,对 operator state 做 snapshot。
    3. 调用keyedStateBackend.snapshot方法,对 keyed state 做 snapshot。
    4. 调用timerServiceBackend.snapshot方法,对 processing time/event time window 中注册的 timer 回调做 snapshot(恢复状态的时候必须也要恢复 timer 回调)
  3. 调用StreamTask.runAsyncCheckpointingAndAcknowledge方法确认上面的 snapshot 是否都成功,如果成功,则会向 CheckpointCoordinator 发送 ack 消息。
  4. CheckpointCoordinator 收到 ack 消息后,会检查本地是否存在这个 pending 的 checkpoint,并且这个 checkpoint 是否超时,如果都 OK,则判断是否收到所有 task 的 ack 消息,如果是,则表示已经完成 checkpoint,会得到一个 CompletedCheckpoint 并加入到 completedCheckpointStore 中。

在上面的 checkpoint 过程中,如果 state backend 选择的是 jobmanager,那么最终返回的 state handle 为 ByteStreamStateHandle,这个 state handle 中包含了 snapshot 后的所有状态数据。而如果是 filesystem,则 state handle 只会包含数据的文件句柄,数据则在 filesystem 中,这个下面会再细说。

Filesystem State Backend

上面提到的都是比较简单的基于内存的 state backend,在实际生产中是不太可行的。因此一般会使用 filesystem 或者 rocksdb 的 state backend。我们先讲一下基于 filesystem 的 state backend。

基于内存的 state backend 实现为MemoryStateBackend,基于文件系统的 state backend 的实现为FsStateBackend。FsStateBackend 有一个策略,当状态的大小小于 1MB(可配置,最大 1MB)时,会把状态数据直接存储在 meta data file 中,避免出现很小的状态文件。

FsStateBackend 另外一个成员变量就是basePath,即 checkpoint 的路径。实际做 checkpoint 时,生成的路径为:<base-path>/<job-id>/chk-<checkpoint-id>/

而且 filesystem 推荐使用分布式文件系统,如 HDFS 等,这样在 fail over 时可以恢复,如果是本地的 filesystem,那恢复的时候是会有问题的。

回到 StreamTask,在做 checkpoint 的时候,是通过CheckpointStateOutputStream写状态的,FsStateBack 会使用FsCheckpointStreamFactory,然后通过FsCheckpointStateOutputStream去写具体的状态,这个实现也比较简单,就是一个带 buffer 的写文件系统操作。最后向上层返回的 StreamStateHandle,视状态的大小,如果状态特别小,则会直接返回带状态数据的ByteStreamStateHandle,否则会返回FileStateHandle,这个 state handle 包含了状态文件名和大小。

需要注意的是,虽然 checkpoint 是写入到文件系统中,但是基于 FsStateBackend 创建的 keyed state backend,仍然是HeapKeyedStateBackend,也就是说,keyed state 的读写仍然是会在内存中的,只有在做 checkpoint 的时候才会持久化到文件系统中。

RocksDB State Backend

RocksDB 跟上面的都略有不同,它会在本地文件系统中维护状态,KeyedStateBackend 等会直接写入本地 rocksdb 中。同时它需要配置一个远端的 filesystem uri(一般是 HDFS),在做 checkpoint 的时候,会把本地的数据直接复制到 filesystem 中。fail over 的时候从 filesystem 中恢复到本地。

从 RocksDBStateBackend 创建出来的 RocksDBKeyedStateBackend,更新的时候会直接以 key + namespace 作为 key,然后把具体的值更新到 rocksdb 中。

如果是 ReducingState,则在add的时候,会先从 rocksdb 中读取已有的值,然后根据用户的 reduce function 进行 reduce,再把新值写入 rocksdb。

做 checkpoint 的时候,会首先在本地对 rockdb 做 checkpoint(rocksdb 自带的 checkpoint 功能),这一步是同步的。然后将 checkpoint 异步复制到远程文件系统中。最后返回RocksDBStateHandle

RocksDB 克服了 HeapKeyedStateBackend 受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。

Queryable State

Queryable State,顾名思义,就是可查询的状态,表示这个状态,在流计算的过程中就可以被查询,而不像其他流计算框架,需要存储到外部系统中才能被查询。目前可查询的 state 主要针对 partitionable state,如 keyed state 等。

简单来说,当用户在 job 中定义了 queryable state 之后,就可以在外部,通过QueryableStateClient,通过 job id, state name, key 来查询所对应的状态的实时的值。

queryable state 目前支持两种方法来定义:

  • 通过KeyedStream.asQueryableState方法,生成一个 QueryableStream,需要注意的是,这个 stream 类似于一个 sink,是不能再做 transform 的。 实现上,生成 QueryableStream 就是为当前 stream 加上一个 operator:QueryableAppendingStateOperator,它的processElement方法,每来一个元素,就会调用state.add去更新状态。因此这种方式有一个限制,只能使用 ValueDescriptor, FoldingStateDescriptor 或者 ReducingStateDescriptor,而不能是 ListStateDescriptor,因为它可能会无限增长导致 OOM。此外,由于不能在 stream 后面再做 transform,也是有一些限制。
  • 通过 managed keyed state。

      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
    new ValueStateDescriptor<>(
          "average", // the state name
          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
          Tuple2.of(0L, 0L)); 
      descriptor.setQueryable("query-name"); // queryable state name

这个只需要将具体的 state descriptor 标识为 queryable 即可,这意味着可以将一个 pipeline 中间的 operator 的 state 标识为可查询的。

首先根据 state descriptor 的配置,会在具体的 TaskManager 中创建一个 KvStateServer,用于 state 查询,它就是一个简单的 netty server,通过KvStateServerHandler来处理请求,查询 state value 并返回。

但是一个 partitionable state,可能存在于多个 TaskManager 中,因此需要有一个路由机制,当 QueryableStateClient 给定一个 query name 和 key 时,要能够知道具体去哪个 TaskManager 中查询。

为了做到这点,在 Job 的 ExecutionGraph(JobMaster)上会有一个用于定位 KvStateServer 的 KvStateLocationRegistry,当在 TaskManager 中注册了一个 queryable KvStateServer 时,就会调用JobMaster.notifyKvStateRegistered,通知 JobMaster。

具体流程如下图:

 

《Flink 原理与实现:详解 Flink 中的状态管理》

 

这个设计看起来很美好,通过向流计算实时查询状态数据,免去了传统的存储等的开销。但实际上,除了上面提到的状态类型的限制之外,也会受 netty server 以及 state backend 本身的性能限制,因此并不适用于高并发的查询。

参考资料:

  1. Dynamic Scaling: Key Groups
  2. Stateful Stream Processing
  3. Working with State
  4. Scaling to large state
  5. Queryable state design doc

 

<style></style>
上一篇:RedoLog和Checkpointnotcomplete


下一篇:RedoLogCheckpoint和SCN关系