Flink State 和 Fault Tolerance

官网:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/#state-fault-tolerance

所谓的状态,其实指的是 Flink 程序的中间计算结果。Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器。

flink状态的应用,比如:

  • When an application searches for certain event patterns, the state will store the sequence of events encountered so far. --复杂事件处理获取符合某一特定时间规则的事件

  • When aggregating events per minute/hour/day, the state holds the pending aggregates. --聚合计算中

  • When training a machine learning model over a stream of data points, the state holds the current version of the model parameters. --机器学习的模型训练

  • When historic data needs to be managed, the state allows efficient access to events that occurred in the past.--使用历史的数据进行计算

当我们在使用state的时候,应该先熟悉一下flink的state backends,state backends指定了state应该怎么保存以及保存到哪里。(state可以保存到jvm 的堆内存中也可以保存到堆外内存。当然也可以借助第三方存储,例如 Flink 已经实现的对 RocksDB 支持)

使用Flink state

在 Flink 中,state分为 Keyed State 和 Operator State (or non-keyed state)两种类型。

Keyed State

基于KeyedStream使用,Keyed State 是经过分区后的流上状态,每个 Key 都有自己的状态并且只有指定的 key 才能访问和更新自己对应的状态。

Operator State

Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。每个算子子任务上的数据共享自己的状态。Kafka Connector 是一个flink中使用Operator State的很好的例子,每个Kafka consumer都包含了top partitions和offsets作为其Operator State

Operator State 的实际应用场景不如 Keyed State 多,一般来说它会被用在 Source 或 Sink 等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证 Flink 应用的 Exactly-Once 语义。

Raw and Managed State

Keyed State和Operator State可以以两种形式存在:分别是Raw State和Managed State

其中flink推荐使用Managed State,详细看官网把

 

Demo

    public static void main(String[] args) throws Exception {
​
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0)
                .flatMap(new CountWindowAverage())
                .print();
​
        env.execute("submit job");
​
    }
​
    public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        /**
         * The ValueState handle. The first field is the count, the second field a running sum.
         */
        private transient ValueState<Tuple2<Long, Long>> sum;
​
        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
​
            Tuple2<Long, Long> currentSum;
            // access the state value
            if (sum.value() == null) {
                currentSum = Tuple2.of(0L, 0L);
            } else {
                currentSum = sum.value();
            }
            // update the count
            currentSum.f0 += 1;
            // add the second field of the input value
            currentSum.f1 += input.f1;
            // update the state
            sum.update(currentSum);
            // if the count reaches 2, emit the average and clear the state
            if (currentSum.f0 >= 2) {
                out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
                sum.clear();
            }
        }
​
        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "average", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                            }));// type information
            sum = getRuntimeContext().getState(descriptor);
        }
    }

 

我们这里的输出条件为,每当第一个元素的和达到二,就把第二个元素的和与第一个元素的和相除,最后输出。我们直接运行,在控制台可以看到结果:

6> (1,4) 6> (1,5)

State Backends种类和配置

官网:https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#state-backends

关于Flink state数据怎么保存,Flink 提供了三种State Backends用于在不同情况下使用

  • MemoryStateBackend

  • FsStateBackend

  • RocksDBStateBackend

如果我们不做任何配置的话,系统会默认使用MemoryStateBackend

MemoryStateBackend

MemoryStateBackend 将 state 数据存储在内存中,我们在使用 MemoryStateBackend 时需要注意的一些点包括:

  • 每个独立的 state 默认限制大小为 5MB,可以通过构造函数增加容量

  • 状态的大小不能超过 akka 的 Framesize 大小

  • 聚合后的状态必须能够放进 JobManager 的内存中

使用MemoryStateBackend 可以在代码中指定:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend(DEFAULT_MAX_STATE_SIZE,false));
//false 代表关闭异步快照机制

MemoryStateBackend 适用于:

  • 本地调试使用

  • 一些状态很小的 Job 状态信息

FsStateBackend

FsStateBackend 会把 state 数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中,少量的元数据信息存储到 JobManager 的内存中。

使用 FsStateBackend 需要我们指定一个文件路径,一般来说是 HDFS 的路径,比如:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints", false));

FsStateBackend 适用于:

  • Jobs with large state, long windows, large key/value states.

  • All high-availability setups.

It is also recommended to set managed memory to zero. This will ensure that the maximum amount of memory is allocated for user code on the JVM.

RocksDBStateBackend

RocksDBStateBackend 和 FsStateBackend 有一些类似,首先它们都需要一个外部文件存储路径,比如 HDFS 路径,适用场景与FsStateBackend 也类似。

但是与 FsStateBackend 不同的是,RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 运行节点的数据目录下。

这意味着,RocksDBStateBackend 可以存储远超过 FsStateBackend 的状态,可以避免向 FsStateBackend 那样一旦出现状态暴增会导致 OOM,但是因为将状态数据保存在 RocksDB 数据库中,吞吐量会有所下降。

此外,需要注意的是,RocksDBStateBackend 是唯一支持增量快照的状态后端。

上一篇:Spark求平均值


下一篇:flink的状态后端,以及RocksDB StateBackend的配置