State Processor API

Not long ago the Flink community released Flink 1.9, which includes an important new feature: the State Processor API. This library lets you operate on checkpoints and savepoints, including reading, modifying, and writing them. Below we walk through a concrete example of how to use it.

1. First, create a sample job to generate a savepoint
Main class code:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60 * 1000);
DataStream<Tuple2<Integer, Integer>> kafkaDataStream =
        env.addSource(new SourceFunction<Tuple2<Integer, Integer>>() {
            private boolean running = true;
            private int key;
            private int value;
            private final Random random = new Random();

            @Override
            public void run(SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
                while (running) {
                    key = random.nextInt(5);
                    sourceContext.collect(new Tuple2<>(key, value++));
                    Thread.sleep(100);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }).name("source").uid("source");

kafkaDataStream
        .keyBy(tuple -> tuple.f0)
        .map(new StateTest.StateMap()).name("map").uid("map")
        .print().name("print").uid("print");

// submit the job (the snippet omitted this call)
env.execute("sample-job");
The only thing to note in the code above is that the custom source emits Tuple2 messages. What actually matters for the savepoint is the state, which lives in the StateMap class:
public static class StateMap extends RichMapFunction<Tuple2<Integer, Integer>, String> {
    private transient ListState<Integer> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<Integer> lsd =
                new ListStateDescriptor<>("list", TypeInformation.of(Integer.class));
        listState = getRuntimeContext().getListState(lsd);
    }

    @Override
    public String map(Tuple2<Integer, Integer> value) throws Exception {
        listState.add(value.f1);
        return value.f0 + "-" + value.f1;
    }

    @Override
    public void close() throws Exception {
        listState.clear();
    }
}
In the map function above, open first declares a ListState, and the processing logic then simply appends the second field of each Tuple2 to that ListState. Submit the job, let it run for a while, trigger a savepoint, and record the savepoint's path. This completes the data preparation for validating the State Processor API.
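Conceptually, the keyed ListState that this job builds up is just a map from key to the list of values seen for that key. The following plain-Java sketch (no Flink dependency; the class name `KeyedStateSketch` and the sample tuples are made up for illustration) shows what the state held at savepoint time looks like:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedStateSketch {
    // Append each (key, value) pair to that key's list, mirroring the per-key ListState.
    static Map<Integer, List<Integer>> accumulate(int[][] tuples) {
        Map<Integer, List<Integer>> stateByKey = new HashMap<>();
        for (int[] t : tuples) {
            stateByKey.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t[1]);
        }
        return stateByKey;
    }

    public static void main(String[] args) {
        // key = random.nextInt(5), value = running counter, as in the source above
        int[][] tuples = {{4, 0}, {1, 1}, {4, 2}, {0, 3}, {1, 4}};
        System.out.println(accumulate(tuples)); // e.g. key 4 -> [0, 2]
    }
}
```

A savepoint of the sample job persists exactly this per-key list structure, which is what the reader in the next step extracts.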

2. Reading the savepoint with the State Processor API
This step simply verifies that the savepoint can be read correctly:
public class ReadListState {
    protected static final Logger logger = LoggerFactory.getLogger(ReadListState.class);

    public static void main(String[] args) throws Exception {
        final String operatorUid = "map";
        final String savepointPath =
                "hdfs://xxx/savepoint-41b05d-d517cafb61ba";

        final String checkpointPath = "hdfs://xxx/checkpoints";

        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        RocksDBStateBackend db = new RocksDBStateBackend(checkpointPath);
        DataSet<String> dataSet = Savepoint
                .load(env, savepointPath, db)
                .readKeyedState(operatorUid, new ReaderFunction())
                .flatMap(new FlatMapFunction<KeyedListState, String>() {
                    @Override
                    public void flatMap(KeyedListState keyedListState, Collector<String> collector) throws Exception {
                        keyedListState.value.forEach(
                                integer -> collector.collect(keyedListState.key + "-" + integer));
                    }
                });

        dataSet.writeAsText("hdfs://xxx/test/savepoint/bravo");

        // execute program
        env.execute("read the list state");
    }

    static class KeyedListState {
        Integer key;
        List<Integer> value;
    }

    static class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedListState> {
        private transient ListState<Integer> listState;

        @Override
        public void open(Configuration parameters) {
            ListStateDescriptor<Integer> lsd =
                    new ListStateDescriptor<>("list", TypeInformation.of(Integer.class));
            listState = getRuntimeContext().getListState(lsd);
        }

        @Override
        public void readKey(
                Integer key,
                Context ctx,
                Collector<KeyedListState> out) throws Exception {
            List<Integer> li = new ArrayList<>();
            listState.get().forEach(li::add);

            KeyedListState kl = new KeyedListState();
            kl.key = key;
            kl.value = li;

            out.collect(kl);
        }
    }
}
After reading the state from the savepoint, it is successfully written out as a file. Each line of the file is one key-value pair:

[Screenshot: sample lines of the output file]

3. Rewriting the savepoint with the State Processor API
A savepoint freezes a program's state at some point during its run, so that the program can resume from it when resubmitted. Sometimes, though, you need to rewrite the state inside a savepoint so that a job can start from a specific, modified state.
public class ReorganizeListState {
    protected static final Logger logger = LoggerFactory.getLogger(ReorganizeListState.class);

    public static void main(String[] args) throws Exception {
        final String operatorUid = "map";
        final String savepointPath =
                "hdfs://xxx/savepoint-41b05d-d517cafb61ba";

        final String checkpointPath = "hdfs://xxx/checkpoints";

        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        RocksDBStateBackend db = new RocksDBStateBackend(checkpointPath);
        DataSet<KeyedListState> dataSet = Savepoint
                .load(env, savepointPath, db)
                .readKeyedState(operatorUid, new ReaderFunction())
                .flatMap(new FlatMapFunction<KeyedListState, KeyedListState>() {
                    @Override
                    public void flatMap(KeyedListState keyedListState, Collector<KeyedListState> collector) throws Exception {
                        KeyedListState newState = new KeyedListState();
                        newState.value = keyedListState.value.stream()
                                .map(x -> x + 10000).collect(Collectors.toList());
                        newState.key = keyedListState.key;
                        collector.collect(newState);
                    }
                });

        BootstrapTransformation<KeyedListState> transformation = OperatorTransformation
                .bootstrapWith(dataSet)
                .keyBy(acc -> acc.key)
                .transform(new KeyedListStateBootstrapper());

        Savepoint.create(db, 128)
                .withOperator(operatorUid, transformation)
                .write("hdfs://xxx/test/savepoint/");

        // execute program
        env.execute("rewrite the list state");
    }

    static class KeyedListState {
        Integer key;
        List<Integer> value;
    }

    static class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedListState> {
        private transient ListState<Integer> listState;

        @Override
        public void open(Configuration parameters) {
            ListStateDescriptor<Integer> lsd =
                    new ListStateDescriptor<>("list", TypeInformation.of(Integer.class));
            listState = getRuntimeContext().getListState(lsd);
        }

        @Override
        public void readKey(
                Integer key,
                Context ctx,
                Collector<KeyedListState> out) throws Exception {
            List<Integer> li = new ArrayList<>();
            listState.get().forEach(li::add);

            KeyedListState kl = new KeyedListState();
            kl.key = key;
            kl.value = li;

            out.collect(kl);
        }
    }

    static class KeyedListStateBootstrapper extends KeyedStateBootstrapFunction<Integer, KeyedListState> {
        private transient ListState<Integer> listState;

        @Override
        public void open(Configuration parameters) {
            ListStateDescriptor<Integer> lsd =
                    new ListStateDescriptor<>("list", TypeInformation.of(Integer.class));
            listState = getRuntimeContext().getListState(lsd);
        }

        @Override
        public void processElement(KeyedListState value, Context ctx) throws Exception {
            listState.addAll(value.value);
        }
    }
}
The key steps: take the DataSet read in the previous step and, during the transformation, add 10000 to every value; use the resulting DataSet as input to build a BootstrapTransformation; then create an empty savepoint and write the state for the given operatorUid into it. The write succeeds, yielding a new savepoint whose state values differ from the originals.
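To make the rewrite concrete without a Flink dependency, here is a plain-Java sketch (the class name `ShiftValues` is made up for illustration) of the same value shift that the flatMap applies to each key's list:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ShiftValues {
    // Same transformation as the flatMap above: add 10000 to every element of a key's list.
    static List<Integer> shift(List<Integer> values) {
        return values.stream().map(x -> x + 10000).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(shift(Arrays.asList(0, 2, 7))); // [10000, 10002, 10007]
    }
}
```

The bootstrapper then writes each shifted list back into ListState under the same key, so the operator's state layout is unchanged; only the values move.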

4. Verifying that the newly produced savepoint is usable
The state used for this validation is a ListState, in other words a KeyedState. KeyedState is managed by Flink itself, meaning Flink controls the logic for saving and restoring it. So, to verify that the job really started from the new savepoint, the earlier StateMap is rewritten as follows:
public static class StateMap extends RichMapFunction<Tuple2<Integer, Integer>, String> {
    private static final Logger logger = LoggerFactory.getLogger(StateMap.class);
    private transient ListState<Integer> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<Integer> lsd =
                new ListStateDescriptor<>("list", TypeInformation.of(Integer.class));
        listState = getRuntimeContext().getListState(lsd);
    }

    @Override
    public String map(Tuple2<Integer, Integer> value) throws Exception {
        listState.add(value.f1);
        logger.info("get value:{}-{}", value.f0, value.f1);
        StringBuilder sb = new StringBuilder();
        listState.get().forEach(integer -> sb.append(integer).append(";"));
        logger.info("***********************taskNameAndSubTask:{},restored value:{}",
                getRuntimeContext().getTaskNameWithSubtasks(), sb.toString());
        return value.f0 + "-" + value.f1;
    }

    @Override
    public void close() throws Exception {
        listState.clear();
    }
}
Since the restored data cannot be inspected the moment the state is recovered, the workaround is to print the state's contents each time a message arrives, as an indirect check on whether the restore succeeded. The result:

[Screenshot: task logs showing the restored values]

Comparing the output for key 4 in the screenshot, the printed values are exactly the modified ones, so the verification succeeds.
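The log line for a given key can be reproduced without Flink: the restored list starts with the rewritten (+10000) values and grows with newly arrived ones. A small sketch (the class name `RestoredStateFormat` is made up for illustration) of the same semicolon-joined formatting the modified StateMap logs:

```java
public class RestoredStateFormat {
    // Join the list the way the modified StateMap logs it: each value followed by ';'.
    static String format(Iterable<Integer> restored) {
        StringBuilder sb = new StringBuilder();
        for (Integer i : restored) {
            sb.append(i).append(";");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Rewritten values carry the +10000 offset; values added after restore do not.
        System.out.println(format(java.util.Arrays.asList(10000, 10002, 3)));
        // prints "10000;10002;3;"
    }
}
```

Seeing the +10000 offset at the head of each key's list in the logs is what confirms the job resumed from the rewritten savepoint.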
5. Conclusion
Flink state comes in three kinds, KeyedState, OperatorState, and BroadcastState, and the State Processor API provides corresponding interfaces for all of them. For keyed state, open questions remain: what happens if the job's parallelism changes, or if the keys themselves change? Both deserve further investigation.

Official documentation:
https://flink.apache.org/feature/2019/09/13/state-processor-api.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
