一、键控状态说明
参考官网说明,几个键控状态介绍如下:
- ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
- ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。
- ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
- AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
- MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。
注意点:
- 所有的类型都有clear(), 清空当前key的状态。
- 这些状态对象仅用于用户与状态进行交互。
- 从状态获取的值与输入元素的key相关(keyby动作)。
- 状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方。
状态后端目前有三种状态:
MemoryStateBackend:内存级别,一般测试环境使用
FsStateBackend:本地状态在JobManager内存, Checkpoint存储在文件系统中,可应用于生成
RocksDBStateBackend:将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储),使用超大状态作业,对读写状态性能要求不高的作业
状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。RichFunction 中 RuntimeContext 提供如下方法:
- ValueState getState(ValueStateDescriptor)
- ReducingState getReducingState(ReducingStateDescriptor)
- ListState getListState(ListStateDescriptor)
- AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
二、开发实例代码
基于流数据获取每个ID的平均水位
- 结果展示
- 代码部分
package com.test;
import bean.WaterSensor2;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.lang.reflect.Type;
/**
* @author: Rango
* @create: 2021-05-07 18:53
* @description:
**/
public class WaterMarkAvg {
public static void main(String[] args) throws Exception {
//前面常规操作,建立环境建立连接装换数据 分组
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> hadoop102 = env.socketTextStream("hadoop102", 9999);
SingleOutputStreamOperator<WaterSensor2> mapDS = hadoop102.map(new MapFunction<String, WaterSensor2>() {
@Override
public WaterSensor2 map(String value) throws Exception {
String[] split = value.split(",");
return new WaterSensor2(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
}
});
KeyedStream<WaterSensor2, String> keyedStream = mapDS.keyBy(WaterSensor2::getId);
//主要处理过程,ACC部分使用Tuple2来实现
SingleOutputStreamOperator<WaterSensor2> streamOperator = keyedStream.process(
new KeyedProcessFunction<String, WaterSensor2, WaterSensor2>() {
//<IN,OUT>
private AggregatingState<Double, Double> aggregatingState;
@Override
public void open(Configuration parameters) throws Exception {
aggregatingState = getRuntimeContext()
.getAggregatingState(new AggregatingStateDescriptor<Double, Tuple2<Double, Integer>, Double>(
"agg-state", new AggregateFunction<Double, Tuple2<Double, Integer>, Double>() {
@Override
public Tuple2<Double, Integer> createAccumulator() {
return Tuple2.of(0.0, 0);
}
@Override
public Tuple2<Double, Integer> add(Double value, Tuple2<Double, Integer> accumulator) {
return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Double, Integer> accumulator) {
return accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
}, Types.TUPLE(Types.DOUBLE, Types.INT)));
}
@Override
public void processElement(WaterSensor2 value, Context ctx, Collector<WaterSensor2> out) throws Exception {
aggregatingState.add(value.getVc());
out.collect(new WaterSensor2(value.getId(), value.getTs(), aggregatingState.get()));
}});
streamOperator.print();
env.execute();
}}
补充:AggregatingStateDescriptor<IN, ACC, OUT>,中间的ACC使用Tuple2作为累加器实现比较麻烦,可以使用自定义一个类来实现累加
//自定义一个bean类来作为累加器使用,使用lombok简化编写
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AvgVc {
Double vc;
Integer count;
}
主类实现部分可以修改如下:
public void open(Configuration parameters) throws Exception {
aggState = getRuntimeContext().getAggregatingState(
new AggregatingStateDescriptor<Double, AvgVc, Double>("state-agg",
new AggregateFunction<Double, AvgVc, Double>() {
@Override
public AvgVc createAccumulator() {
return new AvgVc(0.0, 0);
}
@Override
public AvgVc add(Double value, AvgVc accumulator) {
return new AvgVc(accumulator.getVc() + value,
accumulator.getCount() + 1);
}
@Override
public Double getResult(AvgVc accumulator) {
return accumulator.getVc() / accumulator.getCount();
}
@Override
public AvgVc merge(AvgVc a, AvgVc b) {
return new AvgVc(a.getVc() + b.getVc(), a.getCount() + b.getCount());
}
}, AvgVc.class)
);
}
学习交流,有任何问题还请随时评论指出交流。