BaseTask
package pers.aishuang.flink.streaming.task;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.io.IOException;
import java.util.Properties;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
/**
 * Flink task that reads data from Kafka and writes it to HDFS.
 * Development steps:
 * 1. Read the local configuration file (key-value pairs).
 * 2. Provide a shared method that builds the stream execution environment.
 * 3. Provide a shared method that reads Kafka data into a stream.
 * 4. Provide a shared method that builds the HDFS file sink.
 */
public abstract class BaseTask {
//1. Read the local configuration file (key-value pairs)
//-- Load parameters from the conf.properties file on the classpath
static ParameterTool parameterTool = null;
static {
try {
parameterTool = ParameterTool.fromPropertiesFile(
BaseTask.class.getClassLoader().getResourceAsStream("conf.properties")
);
} catch (IOException e) {
e.printStackTrace();
}
}
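// Example conf.properties consumed by this class. The keys below match the lookups used in
// getEnv/getKafkaStream/getSink; the values are illustrative placeholders only, not from the original source:
//   bootstrap.servers=node1:9092,node2:9092,node3:9092
//   key.partition.discovery.interval.millis=30000
//   kafka.topic=vehicle_data
//   hdfsUri=hdfs://node1:8020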
/**
 * Builds and configures the stream execution environment shared by all tasks.
 * @param taskName name of the current task, used as part of the checkpoint path
 * @return the configured StreamExecutionEnvironment
 */
protected static StreamExecutionEnvironment getEnv(String taskName) {
//1. Access the HDFS cluster as the root (or hdfs) user
System.setProperty("HADOOP_USER_NAME", "root");
//2. Create the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//-- Make the global job parameters visible throughout the job
env.getConfig().setGlobalJobParameters(parameterTool);
//-- Set the parallelism
env.setParallelism(1);
//-- Use event time as the time characteristic of the stream
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//3. Enable checkpointing: one checkpoint every 30 seconds
env.enableCheckpointing(30*1000L);
//-- Checkpointing mode: exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//-- Number of checkpoint failures to tolerate
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
//-- Allow at most one checkpoint in flight at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//-- Minimum pause between checkpoints: at least 5 seconds between the end of one checkpoint and the start of the next
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
//-- Checkpoint timeout: 60 seconds
env.getCheckpointConfig().setCheckpointTimeout(60*1000L);
//-- Retain externalized checkpoints when the job is cancelled (deleted by default)
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
//-- Optionally keep the job running when a checkpoint fails (superseded by setTolerableCheckpointFailureNumber above)
//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
//-- State backend: RocksDBStateBackend, an embedded store that keeps state locally and asynchronously snapshots it to HDFS
try {
env.setStateBackend(new RocksDBStateBackend(
parameterTool.get("hdfsUri") + "/flink-checkpoints" + taskName,
true
));
} catch (IOException e) {
e.printStackTrace();
}
//4. Restart strategy: fixed-delay restart, at most 3 attempts, 10 seconds apart
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
Time.seconds(10)
));
//5. Return the configured execution environment
return env;
}
/**
 * Reads data from Kafka as a data stream.
 * Builds a DataStreamSource from the stream environment, the consumer group id,
 * and the deserialization schema class.
 */
protected static <T> DataStreamSource<T> getKafkaStream(StreamExecutionEnvironment env,
String groupid,
Class<? extends DeserializationSchema> clazz){
//1. Build the consumer configuration
Properties props = new Properties();
//-- Kafka brokers to connect to
props.setProperty(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
parameterTool.getRequired("bootstrap.servers")
);
//-- Consumer group id
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupid);
//-- Enable automatic partition discovery
props.setProperty(
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
parameterTool.getRequired("key.partition.discovery.interval.millis")
);
//2. Create a FlinkKafkaConsumer that reads from Kafka
FlinkKafkaConsumer<T> consumer = null;
try {
consumer = new FlinkKafkaConsumer<T>(
parameterTool.getRequired("kafka.topic"),
clazz.newInstance(),
props
);
} catch (InstantiationException | IllegalAccessException e) {
// Fail fast: a null consumer would otherwise cause a NullPointerException below
throw new RuntimeException("Failed to instantiate deserialization schema: " + clazz.getName(), e);
}
//-- Let Flink manage Kafka offsets: commit them on completed checkpoints
consumer.setCommitOffsetsOnCheckpoints(true);
//-- Start consuming from the earliest offset in each partition
consumer.setStartFromEarliest();
//3. Add the consumer as a data source
DataStreamSource<T> source = env.addSource(consumer);
//4. Return the data stream
return source;
}
/**
 * Builds a StreamingFileSink that writes the data stream to HDFS.
 */
public static StreamingFileSink<String> getSink(
String prefix,
String suffix,
String path,
String bucketAssignerFormat
){
//1. Output file configuration: part file prefix and suffix
OutputFileConfig fileConfig = OutputFileConfig.builder()
.withPartPrefix(prefix)
.withPartSuffix(suffix)
.build();
//2. Create the streaming file sink
StreamingFileSink<String> fileSink = StreamingFileSink
.forRowFormat(
new Path(parameterTool.getRequired("hdfsUri") + "/apps/hive/warehouse/ods.db/" + path),
new SimpleStringEncoder<String>("utf-8")
).withBucketAssigner(
new DateTimeBucketAssigner<>(bucketAssignerFormat)
).withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(10000L)
.withInactivityInterval(3000L)
.withMaxPartSize(64 * 1024 * 1024)
.build()
).withOutputFileConfig(fileConfig)
.build();
//3. Return the file sink
return fileSink;
}
}
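A minimal usage sketch (not part of the original source): a hypothetical concrete task that wires getEnv, getKafkaStream, and getSink together. The class name, consumer group id, sink prefix/suffix, table directory, and bucket format below are illustrative assumptions; the topic itself still comes from conf.properties.

package pers.aishuang.flink.streaming.task;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
// Hypothetical concrete task; all names and literal arguments are illustrative.
public class SampleKafkaToHdfsTask extends BaseTask {
public static void main(String[] args) throws Exception {
//1. Configured environment: checkpointing, RocksDB state backend, restart strategy
StreamExecutionEnvironment env = getEnv(SampleKafkaToHdfsTask.class.getSimpleName());
//2. Kafka source: the topic is read from conf.properties ("kafka.topic"); the group id here is an assumption
DataStreamSource<String> source = getKafkaStream(env, "sample_group", SimpleStringSchema.class);
//3. HDFS sink: part files "sample-*.txt" bucketed by day under .../ods.db/sample_table
StreamingFileSink<String> sink = getSink("sample", ".txt", "sample_table", "yyyyMMdd");
source.addSink(sink);
env.execute(SampleKafkaToHdfsTask.class.getSimpleName());
}
}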