Flink Stream Processing - Task: BaseTask

BaseTask

package pers.aishuang.flink.streaming.task;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.io.IOException;
import java.util.Properties;

import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;

/**
 * Flink reads data from Kafka and writes it to HDFS.
 * Development steps:
 * 1. Read the local configuration file as key-value pairs
 * 2. Abstract out a method that builds the stream execution environment
 * 3. Abstract out a method that reads data from Kafka as a stream
 * 4. Abstract out a method that builds the HDFS StreamingFileSink
 */
public abstract class BaseTask {
    //1. Read the local configuration file as key-value pairs
    //-- Load parameters from conf.properties on the classpath
    static ParameterTool parameterTool = null;

    static {
        try {
            parameterTool = ParameterTool.fromPropertiesFile(
                    BaseTask.class.getClassLoader().getResourceAsStream("conf.properties")
            );
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
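
    // A minimal sketch of the conf.properties this class expects on the classpath.
    // The keys below are the ones read elsewhere in this file; the values are
    // placeholders and must be adapted to the actual cluster:
    //   bootstrap.servers=node1:9092,node2:9092,node3:9092
    //   key.partition.discovery.interval.millis=30000
    //   kafka.topic=<your-topic>
    //   hdfsUri=hdfs://node1:8020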


    /**
     * Build and configure the stream execution environment for the current task.
     * @param taskName task name, used to namespace the checkpoint directory on HDFS
     * @return the configured StreamExecutionEnvironment
     */
    protected static StreamExecutionEnvironment getEnv(String taskName) {
        //1. Impersonate the root (or hdfs) user so the job can read and write the HDFS cluster
        System.setProperty("HADOOP_USER_NAME", "root");

        //2. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //-- Make the configuration parameters globally visible to the job
        env.getConfig().setGlobalJobParameters(parameterTool);
        //-- Set the parallelism
        env.setParallelism(1);
        //-- Use event time as the time characteristic of the stream
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //3. Enable and configure checkpointing (every 30 seconds)
        env.enableCheckpointing(30*1000L);
        //-- Checkpointing mode: exactly once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //-- Number of tolerable consecutive checkpoint failures
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
        //-- At most one checkpoint in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //-- Minimum pause between two checkpoints: 5 seconds
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        //-- Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60*1000L);
        //-- Retain checkpoints when the job is cancelled (they are deleted by default)
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        //-- Keep the job running when a checkpoint fails (deprecated; superseded by setTolerableCheckpointFailureNumber above)
        //env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        //-- State backend: RocksDBStateBackend is an embedded store that keeps state locally
        //   and asynchronously snapshots it to HDFS (incremental checkpoints enabled)
        try {
            env.setStateBackend(new RocksDBStateBackend(
                    parameterTool.get("hdfsUri") + "/flink-checkpoints/" + taskName,
                    true
            ));
        } catch (IOException e) {
            e.printStackTrace();
        }

        //4. Restart strategy: at most 3 restart attempts, 10 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,
                Time.seconds(10)
        ));

        //5. Return the configured execution environment
        return env;
    }


    /**
     * Read data from Kafka and expose it as a data stream.
     * Builds a DataStreamSource from the stream execution environment,
     * the consumer group id and a DeserializationSchema class.
     */
    protected static <T> DataStreamSource<T> getKafkaStream(StreamExecutionEnvironment env,
                                                            String groupid,
                                                            Class<? extends DeserializationSchema> clazz){


        //1. Create the consumer configuration
        Properties props = new Properties();
        //-- Kafka brokers to connect to
        props.setProperty(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                parameterTool.getRequired("bootstrap.servers")
        );
        //-- Consumer group id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        //-- Automatic partition discovery interval (milliseconds)
        props.setProperty(
                KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                parameterTool.getRequired("key.partition.discovery.interval.millis")
        );

        //2. Create a FlinkKafkaConsumer that reads from Kafka
        FlinkKafkaConsumer<T> consumer = null;
        try {
            consumer = new FlinkKafkaConsumer<T>(
                    parameterTool.getRequired("kafka.topic"),
                    //-- Instantiate the deserializer reflectively; the schema class needs a no-arg constructor
                    clazz.newInstance(),
                    props
            );
        } catch (InstantiationException | IllegalAccessException e) {
            e.printStackTrace();
        }
        //-- Commit Kafka offsets back to the brokers when Flink checkpoints complete
        consumer.setCommitOffsetsOnCheckpoints(true);
        //-- Start consuming from the earliest offset of each partition
        consumer.setStartFromEarliest();

        //3. Register the Kafka consumer as the data source
        DataStreamSource<T> source = env.addSource(consumer);

        //4. Return the data stream
        return source;
    }


    /**
     * Build a StreamingFileSink that writes the data stream to HDFS.
     * @param prefix part file name prefix
     * @param suffix part file name suffix
     * @param path table directory under the ODS warehouse path
     * @param bucketAssignerFormat date-time pattern used to assign records to buckets
     */
    public static StreamingFileSink<String> getSink(
            String prefix,
            String suffix,
            String path,
            String bucketAssignerFormat
    ){
        //1. Output file configuration: part file name prefix and suffix
        OutputFileConfig fileConfig = OutputFileConfig.builder()
                .withPartPrefix(prefix)
                .withPartSuffix(suffix)
                .build();

        //2. Create the streaming file sink
        StreamingFileSink<String> fileSink = StreamingFileSink
                .forRowFormat(
                        new Path(parameterTool.getRequired("hdfsUri") + "/apps/hive/warehouse/ods.db/" + path),
                        new SimpleStringEncoder<String>("utf-8")
                ).withBucketAssigner(
                        //-- Assign records to time-based buckets using the given date-time pattern
                        new DateTimeBucketAssigner<>(bucketAssignerFormat)
                ).withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(10000L)       // roll the part file every 10 seconds
                                .withInactivityInterval(3000L)      // or after 3 seconds of inactivity
                                .withMaxPartSize(64 * 1024 * 1024)  // or once it reaches 64 MB
                                .build()
                ).withOutputFileConfig(fileConfig)
                .build();
        //3. Return the streaming file sink
        return fileSink;
    }
}
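
A minimal usage sketch of a concrete task built on BaseTask. The class name SampleKafkaToHdfsTask, the consumer group id, the sink path segment and the choice of Flink's SimpleStringSchema as the deserializer are illustrative assumptions, not part of the class above:

package pers.aishuang.flink.streaming.task;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class SampleKafkaToHdfsTask extends BaseTask {
    public static void main(String[] args) throws Exception {
        //1. Get the pre-configured stream execution environment
        StreamExecutionEnvironment env = getEnv(SampleKafkaToHdfsTask.class.getSimpleName());
        //2. Read raw strings from the topic configured as kafka.topic in conf.properties
        DataStreamSource<String> source = getKafkaStream(env, "sample-group", SimpleStringSchema.class);
        //3. Write the stream to HDFS under .../ods.db/sample_src, bucketed by day
        StreamingFileSink<String> sink = getSink("part", ".txt", "sample_src", "yyyyMMdd");
        source.addSink(sink);
        //4. Trigger the job
        env.execute(SampleKafkaToHdfsTask.class.getSimpleName());
    }
}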