Flink MapState TTL (Expiration Time) Configuration

1. Business Background

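Count, in real time, how many employees clock in each day.

Use MapState<String, Set>:

key: the date as a string in yyyyMMdd format

value: the IDs of the employees who clocked in that day; because a Set removes duplicates automatically, its size is the current attendance count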

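The state only needs to hold the current day's data; earlier days can be cleaned up. The state TTL is therefore set to 24 hours: an entry that has not been modified for more than 24 hours is treated as expired and cleaned up. For MapState the TTL applies to each map entry (here, each date key) independently.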
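The counting logic itself can be sketched in plain Java, without Flink. This is a minimal sketch under stated assumptions: each event carries an employee ID and an epoch-millisecond clock-in time, and the date key is computed in the Asia/Shanghai time zone. The class name `DailyAttendanceCounter`, the method `clockIn`, and the time zone are hypothetical. The date key is formatted as yyyyMMdd and the Set absorbs repeated clock-ins:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical plain-Java sketch (no Flink): date key (yyyyMMdd) -> set of employee IDs.
public class DailyAttendanceCounter {
    private static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");

    private final ZoneId zone; // assumption: the business day is defined in this zone
    private final Map<String, Set<String>> usersByDay = new HashMap<>();

    public DailyAttendanceCounter(ZoneId zone) {
        this.zone = zone;
    }

    /** Records one clock-in and returns the number of distinct employees for that day. */
    public int clockIn(String employeeId, long epochMillis) {
        String day = DAY_FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
        Set<String> users = usersByDay.computeIfAbsent(day, d -> new HashSet<>());
        users.add(employeeId); // a repeated clock-in by the same employee is ignored by the Set
        return users.size();
    }

    public static void main(String[] args) {
        DailyAttendanceCounter counter = new DailyAttendanceCounter(ZoneId.of("Asia/Shanghai"));
        long nineAm = Instant.parse("2021-08-16T01:00:00Z").toEpochMilli(); // 09:00 in Asia/Shanghai
        System.out.println(counter.clockIn("e1", nineAm));           // 1
        System.out.println(counter.clockIn("e2", nineAm + 60_000));  // 2
        System.out.println(counter.clockIn("e1", nineAm + 120_000)); // 2 (duplicate)
    }
}
```

In the Flink job the same steps would run against attendanceUserIdState: get the Set for the date key (creating it if absent), add the ID, and put the Set back. The put matters: with the RocksDB state backend an in-place change to the returned Set is not persisted, and with UpdateType.OnCreateAndWrite it is the write that refreshes the entry's TTL timestamp.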

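// Runs inside open() of a rich function; attendanceUserIdState is a MapState<String, Set> field
// Configure state expiration (TTL)
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(24))
        // Set an entry's timestamp when it is created and refresh it on every write
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        // Never return an expired entry, even if it has not been physically removed yet
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        // Leave expired entries out when a full snapshot is taken
        .cleanupFullSnapshot()
        .build();
MapStateDescriptor<String, Set> mapStateDescriptor = new MapStateDescriptor<>("MapStateDescriptor", String.class, Set.class);
// Bind the TTL configuration to the state
mapStateDescriptor.enableTimeToLive(ttlConfig);
attendanceUserIdState = getRuntimeContext().getMapState(mapStateDescriptor);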
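To see what the two TTL settings above mean for the daily keys, here is a small plain-Java model. It is not Flink's implementation: the hypothetical `TtlMapModel` stores a last-write timestamp per entry, refreshes it only on put (as with OnCreateAndWrite), and hides an entry once `now - lastWrite >= ttl` (as with NeverReturnExpired). The clock is injected so the timeline is deterministic; the times and values in main are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical model of per-entry TTL (NOT Flink's TtlMapState):
// write refreshes the timestamp, read does not, expired entries are never returned.
public class TtlMapModel<K, V> {
    private static final class Entry<T> {
        final T value;
        final long lastWriteMillis;

        Entry(T value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injected clock for deterministic examples

    public TtlMapModel(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Like UpdateType.OnCreateAndWrite: every put (re)sets the entry's timestamp. */
    public void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    /** Like StateVisibility.NeverReturnExpired: returns null for missing or expired entries. */
    public V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (clock.getAsLong() - e.lastWriteMillis >= ttlMillis) {
            entries.remove(key); // drop the expired entry when it is read
            return null;
        }
        return e.value; // reading does not refresh the timestamp
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long[] now = {0};
        TtlMapModel<String, Integer> state = new TtlMapModel<>(24 * hour, () -> now[0]);
        now[0] = 23 * hour;            // last write for 20210815 at 23:00 on the 15th
        state.put("20210815", 42);
        now[0] = 33 * hour;            // 09:00 on the 16th
        state.put("20210816", 7);
        now[0] = 47 * hour + hour / 2; // 23:30 on the 16th, 24.5 h after the 15th's last write
        System.out.println(state.get("20210815")); // null
        System.out.println(state.get("20210816")); // 7
    }
}
```

The model shows why a 24-hour TTL keyed on last modification is enough here: once a date stops receiving writes, its entry disappears within a day of its last write.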

2. Testing the State Expiration Mechanism

Emit one record per second, set the state TTL to 3 seconds, and print the contents of the state after each record.

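import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class StateDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: emit the current time as a string once per second until cancelled
        DataStreamSource<String> stringDataStreamSource = env.addSource(new SourceFunction<String>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                while (running) {
                    TimeUnit.SECONDS.sleep(1);
                    sourceContext.collect(LocalDateTime.now().toString());
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // Pair every timestamp with the constant 1
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = stringDataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) {
                return Tuple2.of(s, 1);
            }
        });

        // Key by the constant field f1, so all records share one key and therefore one MapState
        KeyedStream<Tuple2<String, Integer>, Integer> keyedStream = tupleStream.keyBy(value -> value.f1);

        SingleOutputStreamOperator<String> map = keyedStream.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
            private transient MapState<String, Object> state;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 3-second TTL for each map entry
                StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(3))
                        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();

                MapStateDescriptor<String, Object> mapStateDescriptor = new MapStateDescriptor<>("mapStateDescriptor", String.class, Object.class);
                mapStateDescriptor.enableTimeToLive(ttlConfig);
                state = getRuntimeContext().getMapState(mapStateDescriptor);
            }

            @Override
            public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                // Store the record's timestamp as a map key
                state.put(stringIntegerTuple2.f0, new Object());
                // Print every key that is still visible (not expired) in the state
                Iterator<Map.Entry<String, Object>> iterator = state.iterator();
                while (iterator.hasNext()) {
                    System.out.println(iterator.next().getKey());
                }
                return "====================" + LocalDateTime.now();
            }
        });
        map.print();

        env.execute("StateDemo");
    }
}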

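Output. Each `9>` line is the string returned by the map operator (the prefix identifies the parallel subtask that printed it), and the timestamps printed just before it are the keys still visible in the state at that moment. A key stops appearing roughly 3 seconds after it was written: 2021-08-16T19:52:29.126, for example, is still listed at 19:52:31.242 but gone at 19:52:32.330.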

2021-08-16T19:52:29.126
9> ====================2021-08-16T19:52:29.183
2021-08-16T19:52:29.126
2021-08-16T19:52:30.136
9> ====================2021-08-16T19:52:30.211
2021-08-16T19:52:29.126
2021-08-16T19:52:30.136
2021-08-16T19:52:31.141
9> ====================2021-08-16T19:52:31.242
2021-08-16T19:52:30.136
2021-08-16T19:52:31.141
2021-08-16T19:52:32.144
9> ====================2021-08-16T19:52:32.330
2021-08-16T19:52:31.141
2021-08-16T19:52:33.148
2021-08-16T19:52:32.144
9> ====================2021-08-16T19:52:33.355
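The printed keys can be checked against the 3 s TTL with a little arithmetic. The sketch below (hypothetical class `TtlOutputCheck`) treats each key's own timestamp as its write time, which is an approximation because the put happens a few milliseconds later in map(), and keeps a key only if it was already written and less than 3 s has elapsed at the print time:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical check: which keys should still be visible at a given print time under a TTL?
// Assumption: a key's timestamp approximates the time it was written to the state.
public class TtlOutputCheck {
    public static List<String> expectedVisible(List<String> writtenKeys, String printTime, Duration ttl) {
        LocalDateTime now = LocalDateTime.parse(printTime);
        List<String> visible = new ArrayList<>();
        for (String key : writtenKeys) {
            LocalDateTime written = LocalDateTime.parse(key);
            // Skip keys not yet written; keep keys younger than the TTL
            if (!written.isAfter(now) && Duration.between(written, now).compareTo(ttl) < 0) {
                visible.add(key);
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<String> keys = List.of(
                "2021-08-16T19:52:29.126", "2021-08-16T19:52:30.136", "2021-08-16T19:52:31.141",
                "2021-08-16T19:52:32.144", "2021-08-16T19:52:33.148");
        Duration ttl = Duration.ofSeconds(3);
        // Print times taken from the "9> ====" lines of the output above
        System.out.println(expectedVisible(keys, "2021-08-16T19:52:32.330", ttl));
        // [2021-08-16T19:52:30.136, 2021-08-16T19:52:31.141, 2021-08-16T19:52:32.144]
        System.out.println(expectedVisible(keys, "2021-08-16T19:52:33.355", ttl));
        // [2021-08-16T19:52:31.141, 2021-08-16T19:52:32.144, 2021-08-16T19:52:33.148]
    }
}
```

Both results contain the same keys as the matching blocks of the output; only the order differs, since MapState does not promise to iterate in insertion order.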
