概述
本人主要分享日常工作过程中值得关切的 flink代码编写技巧。
注意明细项
Doris建表
以下字段必备
created_time datetime
modified_time datetime
is_deleted smallint(6) NULL COMMENT "是否删除,1是0否",doris_delete
tinyint(4) NULL DEFAULT "0" COMMENT "删除标记" 此字段是业务逻辑删除字段,当发现数据被物理删除时,使用此字段标记为1
alter table uc_student_bak enable feature "BATCH_DELETE"; 此处很关键标记表可以批量删除,这样当 merge标记为 delete时, doris_delete =1 的数据会被删除掉
自定义SINK
实现 RichSinkFunction 接口 , 各种不同的 sink 存储支持 最好都单独搞一个class 继承 RichSinkFunction 接口
附属贴上 sink doris代码
public class DorisSinkFunction extends RichSinkFunction<String> {
private static final Logger log = LoggerFactory.getLogger(DorisSinkFunction.class);
//累加器对象
private final LongCounter counter = new LongCounter();
private HttpClientBuilder builder;
private String loadUrl;
private String authorization;
private String username;
private String password;
private String profile;
private String mergeType;
private String dbName;
private String tableName;
public static DorisSinkFunction of(String profile, String mergeType, String dbName, String tableName) {
return new DorisSinkFunction(profile, mergeType, dbName, tableName);
}
private DorisSinkFunction(String profile, String mergeType, String dbName, String tableName) {
this.profile = profile;
this.mergeType = mergeType;
this.dbName = dbName;
this.tableName = tableName;
}
private String basicAuthHeader() {
final String tobeEncode = username + ":" + password;
byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encoded);
}
private HttpClientBuilder httpClientBuilder() {
return HttpClients.custom()
//添加重试策略
.setRetryHandler(new HttpRequestRetryHandler() {
@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
//异常超过3次停止重试
if (executionCount > 3) {
return false;
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
return true;
}
})
//支持重定向
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
}
@Override
public void open(Configuration parameters) throws Exception {
//获取配置
final Properties props = PropertyUtils.getDorisProps(profile);
this.username = props.getProperty("username");
this.password = props.getProperty("password");
//初始化authorization
this.authorization = basicAuthHeader();
//初始化http client builder
this.builder = httpClientBuilder();
//构建doris stream load url
this.loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
props.getProperty("feHost"),
props.getProperty("httpPort"),
dbName, tableName);
//注册累加器
getRuntimeContext().addAccumulator("counter", counter);
}
@Override
public void close() throws Exception {
log.warn(" write doris http client execute close, dt: {}", DateTimeUtils.getCurrentDt());
}
@Override
public void invoke(String value, Context context) throws Exception {
try (CloseableHttpClient client = this.builder.build()) {
//创建put对象
final HttpPut put = new HttpPut(loadUrl);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, this.authorization);
put.setHeader("strip_outer_array", "true");//批量插入传json数组必须要有此配置
put.setHeader("format", "json");
put.setHeader("label", UUID.randomUUID().toString().replace("-", ""));
//添加doris batch delete header
if ("MERGE".equalsIgnoreCase(mergeType)) {
put.setHeader("merge_type", "MERGE");
put.setHeader("delete", "doris_delete=1");
}
put.setEntity(new StringEntity(value, "UTF-8"));
//execute
try (final CloseableHttpResponse response = client.execute(put)) {
String loadResult = null;
String loadStatus = null;
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
loadStatus = JSONObject.parseObject(loadResult).getString("Status");
}
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200 || !"Success".equalsIgnoreCase(loadStatus)) {
String msg = String.format(" stream_load_failed, statusCode=%s loadResult=%s", statusCode, loadResult);
log.error(msg);
throw new RuntimeException(msg);
}
} catch (Exception e) {
log.error(" stream_load_to_doris execute error: {}", e);
throw new RuntimeException(" stream_load_to_doris execute error. ", e);
}
} catch (Exception e) {
log.error(" stream_load_to_doris invoke error: {}", e);
throw new RuntimeException(" stream_load_to_doris invoke error. ", e);
}
//累加器自加
counter.add(1);
}
}
自定义各种函数
- 简单的 map转换 实现 MapFunction 或者 FlatMapFunction 接口 , 建议只要有业务含义,那怕这是就基本的字符串转换也要搞成单独的 class
单独的class 中的构造函数 可以传入需要的参数 。 最常用场景 JSONObject 中设置必要的值
MapFunction 主要方法如下 O map(T value) throws Exception; 可以发现该方法只支持最基本的一个元素到一个元素的转换
FlatMapFunction 主要方法如下 void flatMap(T value, Collectorout) throws Exception; 可以发现该方法支持一个参数 转换为 多个参数的转换。
ProcessWindowFunction 应用于 keyed 的窗口,核心方法 process(KEY key, Context context, Iterableelements, Collector out) , 通过该方法就能知道他是基于 KEYBy之后的窗口函数。
ProcessAllWindowFunction 应用于 non-keyed 的窗口函数 process(Context context, Iterableelements, Collector out) throws Exception 通过该方法知道是基于非KEYBY 的全局窗口。
流式任务配置化
专注于配置化解决任务运行问题
可以每个实时任务 一个 properties文件记录关键元数据,每个任务基于这些元数据进行任务运行;常用的properties格式如下:
job-config.properties 一般和运行脚本在一个目录
小节: 以下从指定的CK启动,则不会管 kafka source 的 StartFrom设置的是啥,只会从CK state中存储的 offset开始消费;
要设置env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);不然cancel后cp就清除了
# 运行环境 prod 生产 test 测试
profile=prod
# 任务名称
jobName=dwd-rt-xx
# 消费组
groupId=flink_dwd_rt_xx
# 消费topic
sourceTopic=topic1
# sink topic
sinkTopic=topic2
# 待存储的表, 可能是 TIDB CK DORIS HBASE ORACLE MYSQL 等各种数据存储类型
sinkTable=table1
# 待存储的topic
sinkTopic= topic1
# sink 支持的数据库类型
sink=doris,tidb
# 以下主要定义维表 join是 比如根据 b1 从维度表中获取对应的字段值,然后执行 然后执行 map merge 就是维表join的结果,最终map中key变更为 b2
xxx_mapping=a, b1->b2
以上任务配置完之后,需要运行flink 任务在没有实时计算平台的情况下, 最简单的办法就是搞个提交堡垒机 ,在该堡垒机上需要设置如下信息:
- 设置专门提交flink 任务的 租户
- 指定目录存储各种脚本
- 编写脚本运行
脚本模板参考下面
run-job.sh
cp 代表从指定的 checkpoint开始启动
sp 代表从指定的 savepoint 开始启动
默认 start 适合第一次启动
以下脚本运行时可以把公共变量抽象到单独的脚本中去,比如 common_env.sh
然后在运行脚本中 source ../../common_env.sh 引入这个公共脚本。
#!/bin/bash
# flink 启动客户端地址
flink_bin=/xxx/flink-1.2.1-yarn/flink-1.12.1/bin/flink
# ck 地址
cp_root_path=hdfs://xxxcluster/flink_realedw/cluster/checkpoints
# 提交yarn任务实时队列
queue=xxxqueue
# 任务名称
app_name="ods-gk-xxx"
# 实时任务 main-class
class_ref=cn.xxx.xxx.OdsGroupSummary
# 当前实时任务jar包地址
jar_dir=../xxx-1.0-SNAPSHOT-shaded.jar
# 当前作业依赖配置文件相对路径,真实任务中会解析该配置文件
config_path=job-config.properties
# 实时任务默认并行度
parallelism=1
# grep JobID log.log | cut -d" " -f7 从flink启动日志中解析出 JobID
# grep 'yarn application -kill' log.log |cut -d" " -f5 从启动日志中解析出 yarn-job-ID
# --config-path $config_path 此参数对应业务配置文件的地址
case $1 in
"cancel"){
echo "================ cancel flilk job ================"
yarn_app_id=`grep 'yarn application -kill' log.log |cut -d" " -f5`
job_id=`grep JobID log.log | cut -d" " -f7`
echo "LastAppId: $yarn_app_id LastJobID: $job_id"
$flink_bin cancel -m yarn-cluster -yid $yarn_app_id $job_id
};;
"cp"){
echo "================ start from checkpoint ================"
job_id=`grep JobID log.log | cut -d" " -f7`
cp_path=`hadoop fs -ls ${cp_root_path}/${job_id}/ | grep chk- | awk -F" " '{print$8}' |sort -nr |head -1`
echo "LastJobID: $job_id CheckpointPath: $cp_path"
nohup $flink_bin run -d -t yarn-per-job \
-Dyarn.application.queue=$queue \
-Dyarn.application.name=$app_name \
-p $parallelism \
-c $class_ref \
-s $cp_path \
$jar_dir \
--config-path $config_path >./log.log 2>&1 &
};;
"sp"){
echo "================ start from savepoint ================"
echo "SavepointPath: $2"
nohup $flink_bin run -d -t yarn-per-job \
-Dyarn.application.queue=$queue \
-Dyarn.application.name=$app_name \
-p $parallelism \
-c $class_ref \
-s $2 \
$jar_dir \
--config-path $config_path >./log.log 2>&1 &
};;
*){
echo "================ start ================"
nohup $flink_bin run -d -t yarn-per-job \
-Dyarn.application.queue=$queue \
-Dyarn.application.name=$app_name \
-p $parallelism \
-c $class_ref \
$jar_dir \
--config-path $config_path >./log.log 2>&1 &
};;
esac
# 将shell脚本进程ID写入 pid.pid 文件
echo $! > ./pid.pid
hive on hbase
对于HBASE的操作,可以使用建立外表基于 HBASE创建,通过对外表 hive的操作达到操作 HBASE的目的。
## hbase DDL
create 'rtdw:dwd_rt_dim_ecproductdb_product_xxx', 'cf'
## hive on hbase DDL
CREATE EXTERNAL TABLE dwd.dwd_rt_dim_ecproductdb_product_xxx_hb (
key String,
id String,
product_id String
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,cf:id,cf:product_id")
TBLPROPERTIES ("hbase.table.name" = "rtdw:dwd_rt_dim_ecproductdb_product_xxx");
## hive sql
insert overwrite table dwd.dwd_rt_dim_ecproductdb_product_course_hb values('111','222','xxx1'),('222','333','xxx2')
## 以上通过hive插入之后,会发现最近效果显现在HBASE中,HBASE也有结果了
一些不错的工具类
TypeUtils.castToString TypeUtils 类中包含了很多强制将 Object 类型 转换为其他类型的方法。
拼装SQL中的各种类型信息可以直接参考 java.sql.Types 类下面定义了所有的类型信息。