What is the State Processor API
Official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/libs/state_processor_api.html
Purpose
The State Processor API lets you read, write, and modify savepoints and checkpoints. The state data can also be turned into relational (SQL-style) queries for analysis and processing, which makes the API useful for locating problems in a job's state.
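As a quick illustration of the "write" side, here is a minimal sketch (not from the original job; the uid my-uid, state name count, sample data, and output path are all hypothetical) that bootstraps keyed state into a new savepoint:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class WriteSavepoint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical data to load into state: (key, count) pairs.
        DataSet<Tuple2<String, Long>> data = env.fromElements(
                Tuple2.of("a", 1L), Tuple2.of("b", 2L));

        BootstrapTransformation<Tuple2<String, Long>> transformation = OperatorTransformation
                .bootstrapWith(data)
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .transform(new KeyedStateBootstrapFunction<String, Tuple2<String, Long>>() {
                    private transient ValueState<Long> count;

                    @Override
                    public void open(Configuration parameters) {
                        count = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("count", Long.class));
                    }

                    @Override
                    public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
                        count.update(value.f1);
                    }
                });

        Savepoint
                .create(new MemoryStateBackend(), 128) // 128 = max parallelism of the new savepoint
                .withOperator("my-uid", transformation)
                .write("hdfs:///flink/savepoints/bootstrap");

        env.execute("write savepoint");
    }
}

A job started with --fromSavepoint pointing at the written path can then restore this state, provided its operator uid and state descriptor match.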
Usage
Add the Maven dependency:
<!-- read checkpoints -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.11</artifactId>
    <version>1.12.3</version>
</dependency>
To read keyed state, call readKeyedState with the operator's uid and a KeyedStateReaderFunction<KeyType, OutputType> that extracts the corresponding state. (The uid must be the one assigned in the original job to the operator whose state you want to read.)
package com.d4t.dataplatform.runner;

import java.io.Serializable;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

import com.d4t.dataplatform.runner.functions.rulerunner.RuleCalculateProcessFunction;

/**
 * @author sanhongbo
 * @date 2022/1/10
 * @description Reads a checkpoint
 **/
public class ReadCheckpoint {

    public static void main(String[] args) throws Exception {
        // uid of the operator whose state will be read (the job used the simple class name as uid).
        final String uid = RuleCalculateProcessFunction.class.getSimpleName();
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final String checkpointPath = parameterTool.get("checkpoint.path");

        // Set up the batch execution environment.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<KeyedMapState> keyedMapStateDataSet = Savepoint
                .load(env, checkpointPath, new MemoryStateBackend())
                .readKeyedState(uid, new ReaderFunction());

        keyedMapStateDataSet.writeAsText("hdfs:///flink/state/test");

        // Execute the program.
        env.execute("read the map state");
    }

    static class KeyedMapState implements Serializable {
        String key;
        String mapKey;
        Object value;

        @Override
        public String toString() {
            return "KeyedMapState{" + key + ',' + mapKey + ',' + value + '}';
        }
    }

    static class ReaderFunction extends KeyedStateReaderFunction<String, KeyedMapState> {

        private transient MapState<String, Object> mapState;

        @Override
        public void open(Configuration parameters) {
            // State descriptor: the name and types must match those used in the original job.
            final MapStateDescriptor<String, Object> mapStateDescriptor =
                    new MapStateDescriptor<>("XXXXXXX", String.class, Object.class);
            mapState = getRuntimeContext().getMapState(mapStateDescriptor);
        }

        @Override
        public void readKey(
                String key,
                Context ctx,
                Collector<KeyedMapState> out) throws Exception {
            // Emit one record per map entry whose map key matches the filter.
            for (String s : mapState.keys()) {
                if (s.contains("XXXXXX")) {
                    KeyedMapState km = new KeyedMapState();
                    km.key = key;
                    km.mapKey = s;
                    km.value = mapState.get(s);
                    out.collect(km);
                }
            }
        }
    }
}
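Besides keyed state, the ExistingSavepoint handle returned by Savepoint.load can also read non-keyed operator state via readListState, readUnionState, and readBroadcastState. A minimal sketch, assuming a hypothetical operator uid buffer-uid, state name buffered-elements, and checkpoint path:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.Savepoint;

public class ReadOperatorState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // uid, state name, and path are hypothetical placeholders.
        DataSet<String> bufferedElements = Savepoint
                .load(env, "hdfs:///flink/checkpoint/path/chk-1", new MemoryStateBackend())
                .readListState("buffer-uid", "buffered-elements", Types.STRING);

        bufferedElements.print(); // print() triggers execution in the DataSet API
    }
}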
Package and run
# Keep the parallelism in line with the job that wrote the state; otherwise you can easily run out of memory.
flink run -d -p 6 -t yarn-per-job -Dtaskmanager.memory.process.size=3072mb -Dtaskmanager.memory.managed.size=0 -Dyarn.application.name=map-state -c com.d4t.dataplatform.runner.ReadCheckpoint runner-0.1-jar-with-dependencies.jar --job.name map-state --checkpoint.path hdfs:///flink/checkpoint/rule_runner_20220108/820008c1f70ed755109219f40fc4efb9/chk-558
After the job finishes, pull the output files to the local machine and merge them (alternatively, hdfs dfs -getmerge /flink/state/test/ total does both steps in one command):
hdfs dfs -ls /flink/state/test/ | awk '{print $NF}' | xargs -I{} hdfs dfs -copyToLocal {}
cat * >> total