Flink键控状态AggregatingState开发实例

一、键控状态说明

参考官网说明,几个键控状态介绍如下:

  • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
  • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。
  • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
  • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
  • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

注意点:

  • 所有的类型都有clear(), 清空当前key的状态。
  • 这些状态对象仅用于用户与状态进行交互。
  • 从状态获取的值与输入元素的key相关(keyby动作)。
  • 状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方。

状态后端目前有三种状态:
MemoryStateBackend:内存级别,一般测试环境使用
FsStateBackend:本地状态在JobManager内存, Checkpoint存储在文件系统中,可应用于生成
RocksDBStateBackend:将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储),使用超大状态作业,对读写状态性能要求不高的作业

状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。RichFunction 中 RuntimeContext 提供如下方法:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

二、开发实例代码

基于流数据获取每个ID的平均水位

  1. 结果展示
    Flink键控状态AggregatingState开发实例
  2. 代码部分
package com.test;
import bean.WaterSensor2;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.lang.reflect.Type;
/**
 * @author: Rango
 * @create: 2021-05-07 18:53
 * @description:
 **/
public class WaterMarkAvg {
    public static void main(String[] args) throws Exception {
         //前面常规操作,建立环境建立连接装换数据 分组
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> hadoop102 = env.socketTextStream("hadoop102", 9999);
        SingleOutputStreamOperator<WaterSensor2> mapDS = hadoop102.map(new MapFunction<String, WaterSensor2>() {
            @Override
            public WaterSensor2 map(String value) throws Exception {
                String[] split = value.split(",");
                return new WaterSensor2(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
            }
        });
        KeyedStream<WaterSensor2, String> keyedStream = mapDS.keyBy(WaterSensor2::getId);
       //主要处理过程,ACC部分使用Tuple2来实现
        SingleOutputStreamOperator<WaterSensor2> streamOperator = keyedStream.process(
                new KeyedProcessFunction<String, WaterSensor2, WaterSensor2>() {
                    //<IN,OUT>
                    private AggregatingState<Double, Double> aggregatingState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        aggregatingState = getRuntimeContext()
                                .getAggregatingState(new AggregatingStateDescriptor<Double, Tuple2<Double, Integer>, Double>(
                                        "agg-state", new AggregateFunction<Double, Tuple2<Double, Integer>, Double>() {
                                    @Override
                                    public Tuple2<Double, Integer> createAccumulator() {
                                        return Tuple2.of(0.0, 0);
                                    }
                                    @Override
                                    public Tuple2<Double, Integer> add(Double value, Tuple2<Double, Integer> accumulator) {
                                        return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                                    }
                                    @Override
                                    public Double getResult(Tuple2<Double, Integer> accumulator) {
                                        return accumulator.f0 / accumulator.f1;
                                    }
                                    @Override
                                    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
                                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                                    }
                                }, Types.TUPLE(Types.DOUBLE, Types.INT)));
                    }
                    @Override
                    public void processElement(WaterSensor2 value, Context ctx, Collector<WaterSensor2> out) throws Exception {
                        aggregatingState.add(value.getVc());
                        out.collect(new WaterSensor2(value.getId(), value.getTs(), aggregatingState.get()));
                    }});
        streamOperator.print();
        env.execute();
    }}

补充:AggregatingStateDescriptor<IN, ACC, OUT>,中间的ACC使用Tuple2作为累加器实现比较麻烦,可以使用自定义一个类来实现累加

//自定义一个bean类来作为累加器使用,使用lombok简化编写
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AvgVc {
    Double vc;
    Integer count;
}

主类实现部分可以修改如下:

public void open(Configuration parameters) throws Exception {
	aggState = getRuntimeContext().getAggregatingState(
		new AggregatingStateDescriptor<Double, AvgVc, Double>("state-agg",
			new AggregateFunction<Double, AvgVc, Double>() {
				@Override
				public AvgVc createAccumulator() {
					return new AvgVc(0.0, 0);
				}
				@Override
				public AvgVc add(Double value, AvgVc accumulator) {
					return new AvgVc(accumulator.getVc() + value,
							accumulator.getCount() + 1);
				}
				@Override
				public Double getResult(AvgVc accumulator) {
					return accumulator.getVc() / accumulator.getCount();
				}
				@Override
				public AvgVc merge(AvgVc a, AvgVc b) {
					return new AvgVc(a.getVc() + b.getVc(), a.getCount() + b.getCount());
				}
			}, AvgVc.class)
	);
}

学习交流,有任何问题还请随时评论指出交流。

上一篇:Flink的window机制


下一篇:Flink从入门到放弃之入门篇(四)-剖析窗口生命周期