一、业务背景
实时统计每天考勤人数
使用MapState<String, Set>
key:日期字符串 -> yyyyMMdd
value:当天考勤员工ID,利用Set自动去重的特性统计当前考勤人数
状态里只需要存储当天的数据,之前的数据可以清理掉。设置状态过期时间24小时,距离数据上一次修改超过24小时,该数据会被清理。
// Configure state expiry (TTL): entries live for 24 hours.
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24))
// The 24-hour countdown restarts when an entry is created or written; reads do not extend it.
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// Entries whose TTL has elapsed are never handed back to the caller.
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// NOTE(review): per the Flink State TTL docs this strategy drops expired entries only when a
// full snapshot is taken; confirm that this is enough to keep only the current day's data.
.cleanupFullSnapshot()
.build();
// Map state: key = date string (yyyyMMdd), value = set of employee IDs that checked in that day.
// NOTE(review): Set is used as a raw type here; the element type is not visible in this snippet.
MapStateDescriptor<String, Set> mapStateDescriptor = new MapStateDescriptor<>("MapStateDescriptor", String.class, Set.class);
// Attach the TTL configuration to the state descriptor.
mapStateDescriptor.enableTimeToLive(ttlConfig);
attendanceUserIdState = getRuntimeContext().getMapState(mapStateDescriptor);
二、状态过期机制测试
每隔1秒发送一条数据,状态有效期设置为3秒。打印状态里的数据
/**
 * Demo job for observing Flink state TTL expiry.
 *
 * <p>A source emits the current timestamp string once per second. Each record is mapped to
 * {@code (timestamp, 1)} and keyed by the constant second field, so every record shares the
 * same key and therefore the same {@code MapState}. The keyed map function stores each
 * timestamp in a {@code MapState} whose entries have a 3-second TTL, then prints every key
 * that is still visible in the state.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<String> timestamps = env.addSource(new SourceFunction<String>() {
        // Cleared by cancel() so that run() can leave its loop; volatile because cancel()
        // may be invoked from a different thread than the one executing run().
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            while (running) {
                TimeUnit.SECONDS.sleep(1);
                sourceContext.collect(LocalDateTime.now().toString());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    });

    SingleOutputStreamOperator<Tuple2<String, Integer>> pairs =
            timestamps.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String s) {
                    // The second field is always 1, which puts every record under one key.
                    return Tuple2.of(s, 1);
                }
            });

    // Key by tuple position 1 (the constant), so all records share a single keyed state.
    // NOTE(review): positional keyBy(int) is deprecated in newer Flink releases; it is kept
    // here so the key type and the subtask assignment of the demo stay unchanged.
    KeyedStream<Tuple2<String, Integer>, Tuple> keyed = pairs.keyBy(1);

    SingleOutputStreamOperator<String> result =
            keyed.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
                // Created in open(); not part of the serialized function instance.
                private transient MapState<String, Object> state;

                @Override
                public void open(Configuration parameters) throws Exception {
                    // Entries expire after 3 seconds and are hidden once expired.
                    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(3))
                            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                            .build();
                    MapStateDescriptor<String, Object> mapStateDescriptor =
                            new MapStateDescriptor<>("mapStateDescriptor", String.class, Object.class);
                    mapStateDescriptor.enableTimeToLive(ttlConfig);
                    state = getRuntimeContext().getMapState(mapStateDescriptor);
                }

                @Override
                public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    // Record the incoming timestamp; the value is only a placeholder.
                    state.put(stringIntegerTuple2.f0, new Object());

                    // Print every key that the state still exposes.
                    Iterator<Map.Entry<String, Object>> iterator = state.iterator();
                    while (iterator.hasNext()) {
                        System.out.println(iterator.next().getKey());
                    }
                    return "====================" + LocalDateTime.now();
                }
            });

    result.print();
    env.execute("StateDemo");
}
运行结果
2021-08-16T19:52:29.126
9> ====================2021-08-16T19:52:29.183
2021-08-16T19:52:29.126
2021-08-16T19:52:30.136
9> ====================2021-08-16T19:52:30.211
2021-08-16T19:52:29.126
2021-08-16T19:52:30.136
2021-08-16T19:52:31.141
9> ====================2021-08-16T19:52:31.242
2021-08-16T19:52:30.136
2021-08-16T19:52:31.141
2021-08-16T19:52:32.144
9> ====================2021-08-16T19:52:32.330
2021-08-16T19:52:31.141
2021-08-16T19:52:33.148
2021-08-16T19:52:32.144
9> ====================2021-08-16T19:52:33.355