Flink-状态管理
在 Flink 的框架中,进行有状态的计算是 Flink 最重要的特性之一。所谓的状态,其实指的是 Flink 程序的中间计算结果。Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器。
状态
我们在 Flink 的官方博客中找到这样一段话,可以认为这是对状态的定义:
When working with state, it might also be useful to read about Flink’s state backends. Flink provides different state backends that specify how and where state is stored. State can be located on Java’s heap or off-heap. Depending on your state backend, Flink can also manage the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large state. State backends can be configured without changing your application logic.
这段话告诉我们,所谓的状态指的是,在流处理过程中那些需要记住的数据,而这些数据既可以包括业务数据,也可以包括元数据。Flink 本身提供了不同的状态管理器来管理状态,并且这个状态可以非常大。
Flink 的状态数据可以存在 JVM 的堆内存或者堆外内存中,当然也可以借助第三方存储,例如 Flink 已经实现的对 RocksDB 支持。Flink 的官网同样给出了适用于状态计算的几种情况:
When an application searches for certain event patterns, the state will store the sequence of events encountered so far
When aggregating events per minute/hour/day, the state holds the pending aggregates
When training a machine learning model over a stream of data points, the state holds the current version of the model parameters
When historic data needs to be managed, the state allows efficient access to events that occurred in the past
以上四种情况分别是:复杂事件处理获取符合某一特定时间规则的事件、聚合计算、机器学习的模型训练、使用历史的数据进行计算。
Flink 状态分类和使用
我们在之前的课时中提到过 KeyedStream 的概念,并且介绍过 KeyBy 这个算子的使用。在 Flink 中,根据数据集是否按照某一个 Key 进行分区,将状态分为 Keyed State 和 Operator State(Non-Keyed State)两种类型
如上图所示,Keyed State 是经过分区后的流上状态,每个 Key 都有自己的状态,图中的八边形、圆形和三角形分别管理各自的状态,并且只有指定的 key 才能访问和更新自己对应的状态。
与 Keyed State 不同的是,Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。每个算子子任务上的数据共享自己的状态。
但是有一点需要说明的是,无论是 Keyed State 还是 Operator State,Flink 的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。
存储State的数据结构/API介绍
Flink为了方便不同分类的State的存储和管理,提供了如下的API/数据结构来存储State!
Keyed State 通过 RuntimeContext 访问,这需要 Operator 是一个RichFunction。
保存Keyed state的数据结构:
-
ValueState
:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值,如求按用户id统计用户交易总额 -
ListState
:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable 来遍历状态值,如统计按用户id统计用户经常登录的Ip -
ReducingState
:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值 -
MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素
需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄
Operator State 需要自己实现 CheckpointedFunction 或 ListCheckpointed 接口。
保存Operator state的数据结构:
-
ListState
-
BroadcastState<K,V>
State代码示例
Keyed State
/**
* @author WGR
* @create 2021/9/18 -- 10:32
*/
public class StateTest1_OperatorState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// socket文本流
DataStream<String> inputStream = env.socketTextStream("192.168.1.180", 9998);
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义一个有状态的map操作,统计当前分区数据个数
SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMapper());
resultStream.print();
env.execute();
}
// 自定义MapFunction
public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
private Integer count = 0;
@Override
public Integer map(SensorReading value) throws Exception {
count ++ ;
return count;
}
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(count);
}
@Override
public void restoreState(List<Integer> state) throws Exception {
for( Integer num: state )
count += num;
}
}
}
Operator State
/**
* @author WGR
* @create 2021/9/18 -- 10:56
*/
public class StateTest2_KeyedState {
@SneakyThrows
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// socket文本流
DataStream<String> inputStream = env.socketTextStream("192.168.1.180", 9998);
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义一个有状态的map操作,统计当前分区数据个数
SingleOutputStreamOperator<Integer> resultStream = dataStream.keyBy("id").map(new MyKeyCountMapper());
resultStream.print();
env.execute();
}
// 自定义RichMapFunction
public static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {
private ValueState<Integer> keyCountState;
@Override
public void open(Configuration parameters) throws Exception {
keyCountState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("key-count", Integer.class, 0));
}
@Override
public Integer map(SensorReading value) throws Exception {
Integer count = keyCountState.value();
count++;
keyCountState.update(count);
return count;
}
}
}
Watermark的并行度测试
/**
* @author WGR
* @create 2021/9/14 -- 11:04
*/
public class WindowTest4_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
// socket文本流
DataStream<String> inputStream = env.socketTextStream("192.168.1.180", 9998);
// 转换成SensorReading类型,分配时间戳和watermark
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
SensorReading sensorReading = new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
System.out.println(sensorReading.toString());
return sensorReading;
})
// 乱序数据设置时间戳和watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
};
// 基于事件时间的开窗聚合,统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(outputTag)
.minBy("temperature");
minTempStream.print("minTemp");
minTempStream.getSideOutput(outputTag).print("late");
env.execute();
}
}
SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
SensorReading{id='sensor_1', timestamp=1547718214, temperature=37.1}
SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
SensorReading{id='sensor_1', timestamp=1547718213, temperature=37.1}
SensorReading{id='sensor_1', timestamp=1547718215, temperature=37.3}
minTemp:2> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
minTemp:3> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
minTemp:4> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
minTemp:3> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.3}
minTemp:4> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
minTemp:3> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
minTemp:2> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
minTemp:3> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}