Flink之state processor api读取checkpoint文件

什么是State Processor

API
官方文档说明:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/libs/state_processor_api.html

目的

使用 State Processor API 可以读取、写入和修改 savepoints 和 checkpoints,也可以转为 SQL 查询来分析和处理状态数据,从而定位作业中的问题。

使用方式介绍

引入pom

		<!--读checkpoint-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-state-processor-api_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>

读取keyed state时,使用 readKeyedState 指定uid和KeyedStateReaderFunction<KeyType, OutputType> 函数来获取对应的 state。(读哪个算子的状态就使用作业中算子的uid)

package com.d4t.dataplatform.runner;

import java.io.Serializable;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

import com.d4t.dataplatform.runner.functions.rulerunner.RuleCalculateProcessFunction;

/**
 * Batch job that loads a Flink checkpoint/savepoint, reads the keyed
 * {@code MapState} of one operator and writes the matching entries as text.
 *
 * <p>Command-line arguments (parsed with {@link ParameterTool}):
 * <ul>
 *   <li>{@code --checkpoint.path} (required) - path of the checkpoint to load.</li>
 *   <li>{@code --output.path} (optional) - where the text output is written;
 *       defaults to {@code hdfs:///flink/state/test}.</li>
 *   <li>{@code --job.name} (optional) - name passed to {@code env.execute};
 *       defaults to {@code "read the list state"}.</li>
 * </ul>
 *
 * @author sanhongbo
 * @date 2022/1/10
 **/
public class ReadCheckpoint {
    /** Output location used when {@code --output.path} is not supplied. */
    private static final String DEFAULT_OUTPUT_PATH = "hdfs:///flink/state/test";

    /** Job name used when {@code --job.name} is not supplied. */
    private static final String DEFAULT_JOB_NAME = "read the list state";

    /**
     * Entry point: wires the checkpoint source, the state reader and the text sink,
     * then runs the batch job.
     *
     * @param args command-line arguments, see the class-level description
     * @throws Exception if a required argument is missing or the job execution fails
     */
    public static void main(String[] args) throws Exception {
        // The uid of the operator whose state is read; here it is the simple class
        // name of the process function (assumes the original job assigned that uid).
        final String operatorUid = RuleCalculateProcessFunction.class.getSimpleName();

        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Fail fast with a clear message instead of handing a null path to Savepoint.load.
        final String checkpointPath = parameterTool.getRequired("checkpoint.path");
        final String outputPath = parameterTool.get("output.path", DEFAULT_OUTPUT_PATH);
        final String jobName = parameterTool.get("job.name", DEFAULT_JOB_NAME);

        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<KeyedMapState> keyedMapStateDataSet = Savepoint
                .load(env, checkpointPath, new MemoryStateBackend())
                .readKeyedState(operatorUid, new ReaderFunction());

        keyedMapStateDataSet.writeAsText(outputPath);

        // execute program
        env.execute(jobName);
    }

    /**
     * One emitted record: the operator key, a key inside the map state and the
     * value stored under that map key.
     */
    static class KeyedMapState implements Serializable {
        String key;
        String mapKey;
        Object value;

        @Override
        public String toString() {
            return "KeyedMapState{" +
                    key + ',' +
                    mapKey + ',' +
                    value +
                    '}';
        }
    }

    /**
     * Reads the map state of every key and emits one {@link KeyedMapState} per
     * map entry whose map key contains the filter substring.
     */
    static class ReaderFunction extends KeyedStateReaderFunction<String, KeyedMapState> {
        private transient MapState<String, Object> mapState;

        /**
         * Registers the map state descriptor and obtains the state handle.
         *
         * @param parameters configuration passed by the framework (unused here)
         */
        @Override
        public void open(Configuration parameters) {
            // State descriptor. "XXXXXXX" is a placeholder in this example; presumably it
            // has to be replaced by the state name used in the original job - TODO confirm.
            final MapStateDescriptor<String, Object> mapStateDescriptor =
                    new MapStateDescriptor<>("XXXXXXX", String.class, Object.class);

            mapState = getRuntimeContext().getMapState(mapStateDescriptor);
        }

        /**
         * Emits the filtered map entries stored for a single key.
         *
         * @param key the key whose state is being read
         * @param ctx reader context (unused here)
         * @param out collector receiving one record per matching map entry
         * @throws Exception if accessing the map state fails
         */
        @Override
        public void readKey(
                String key,
                Context ctx,
                Collector<KeyedMapState> out) throws Exception {

            for (String mapKey : mapState.keys()) {
                // Only map keys containing the (placeholder) substring are exported.
                if (mapKey.contains("XXXXXX")) {
                    KeyedMapState km = new KeyedMapState();
                    km.key = key;
                    km.mapKey = mapKey;
                    km.value = mapState.get(mapKey);
                    out.collect(km);
                }
            }
        }
    }

}

打包运行

# 并行度建议与生成该 state 的原作业保持一致,否则容易内存溢出
flink run -d -p 6 -t yarn-per-job -Dtaskmanager.memory.process.size=3072mb -Dtaskmanager.memory.managed.size=0 -ynm map-state -c com.d4t.dataplatform.runner.ReadCheckpoint runner-0.1-jar-with-dependencies.jar --job.name map-state --checkpoint.path hdfs:///flink/checkpoint/rule_runner_20220108/820008c1f70ed755109219f40fc4efb9/chk-558

执行完后,将文件拉到本地。合并文件

hdfs dfs -ls /flink/state/test/ | awk '{print $NF}' | xargs -I{} hdfs dfs -copyToLocal {}
cat * >> total
上一篇:关于flink中的OutputTag报错


下一篇:NC调用HTTPS接口请求实例