开发者学堂课程【数据湖 JindoFS + OSS 实操干货36讲:Flink 高效 sink 写入 oss】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/833/detail/13967
Flink 高效 sink 写入 oss
内容介绍
一.背景介绍
二.功能介绍
三.如何配置
四.如何使用
一、背景介绍
Apache Flink 简介
Apache Flink 是新一代大数据计算引擎的代表,以分布式流计算为核心,同时支持批处理。
特点:
低延时: Flink 流式计算可以做到亚秒甚至毫秒级延时,相比之下Spa流计算很难达到毫秒级
高吞吐: Fink 以分布式快照算法实现容错对吞吐量的影响很小
高容错:基于分布式快照算法,Fink实现了低代价、高效的容错表现,以及 Exactly Once 语义保证。
JindoFS Flink Connector 产生背景
阿里云对象存储 Object Storage Service(oss):
海量:无限容量,弹性伸缩
安全:12个9的数据安全性,多种加密方式
低成本:远低于云磁盘,且有多种存储方式、生命周期管理等节约成本
高可靠:服务可用性99.9
已服务于海量用户
二、功能介绍
Flink 应用广泛:
流计算领域业内主要解决方案
Apache 基金会最活跃项目之一
未来:流批一体、在线分析
Flink 使用痛点:
开源 Apache Flink 尚不支持直接写入OSS
Hadoop OSS SDK 写入性能不一定满足需求
JindoFS Flink Connector 介绍
整体架构:
1.两阶段 Checkpoint (检查点)机制:
第一阶段 MPU (MultiPartUpload,分片上传)写入 OSS
第二阶段 MPU 提交
2.Recoverable Writer 可恢复性写入:
临时文件以普通文件格式上传 OSS
Sink节点状态快照
JindoFS Flink Connector 介绍
写入 OSS vS. 写入亚马逊 S3:
Native 实现:数据写入以C++代码实现,相比 java 更高效
高速读写:多线程读写临时文件,对大于1的文件优势尤其明显
数据缓存:读写 OSS 实现本地缓存,加速外部访问
OSs 访问加速, JindoFs 提供新支持
三、如何配置JindoFS Flink Connector
1.环境要求
集群上有开源版本 Flink软件,版本不低于1.10.1
2.SDK 配置:
下载所需 SDK 文件:
jindo-flink-sink-S{(version)} jar
jindofs-sdk-S{version} jar
下载链接( Github ):
@https://github.com/aliyun/alibabacloud-jin/blob/master/docs/ jindofs sdk_ download.md
将两个jar放置于集群 Flink 目录下 lib 文件夹
-Flink 根目录通常可由 SFLINKHOME 环境变量获取
-集群所有节点均需配置
Java SPI:自动加载资源,无需额外配置
文档链接(Github):
https://github.com/aliyun/alibabacloud-jing/blob/master/docs/flink/jindofs_sdk on_flink_for_oss.md
四、在程序中使用 JindoFS Flink Connector
确保集群能够访问 OSS Bucket
1. 前提:已购买 Oss 产品,Oss 网站链接 . OSS OSS :
https://www.aliyun.com/product/oss
2.确保能够访问 OSS Bucket,例如正确配置密钥或免密服务等
使用合适的路径,流式写入 OSS Bucket
写入 ss 须使用 os 前缀路径,类似于
ossuser-bucket->user--defined--sink-dir>
更多优化!用 JindoFS SDK加速OSS访问,参考:
https://github. com/aliyun/alibabacloud-jir/blob/master/docs /jindofs_sdk_vs_hadoop_sdk.md
在程序中使用 JindoFS Flink Connector:Java
在程序中开启 Flink Checkpoint
前提:使用可重发的数据源,如 Kafka
通过 StreamExecutionEnvironment 对象打开 Checkpoint (示例)
建立:
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEr
打开:
envenableCheckpointing(
示例程序
下文中, outputStream 是一个预先形成的 DataStream 对象,若需写入 ss,则可以这样添加sink:String outputPath ="oss:///":
StreamingFileSinksink= StreamingF. forRowformat
new Path(outputPath)
new SimpleStringEncoder("UTF-8)
)buildo:
outputStream.addSink(sink)
上述程序指定将 outputStream 中的 String 内容写入 ss 路径 oss //user-bucket-/user--defined--sink-dir>
最后还需用 envexecute 语句执行 Flin 作业,env 是已建立的 StreamExecutionEnviro 对象
最后,将 ava 作业打包为 jar 文件,并用 flink run在集群提交即可。
在程序中使用 JindoFS Flink Connector:更多配置
用户通过 flink run 提交 java 或 pyflink 程序时可以额外自定义一些参数,格式
flink run-m yarn-cluster-yD key1= valuel-yD key2=value2
目前支持“熵注入”及“分片上传并行度”两项配置
熵注入 (entropy injection):
1.功能:将写入路径的一段特定字符串匹配出来,用一段随机的字符串进行替换
2.效果:削弱所谓片区 (sharding) 效应,提高写入效率
3.配置参数:
oss.entropy.key=
oss.entropy length=
分片上传并行度
配置参数: oss upload.max. concurrentuploads
默认值:当前可用的处理器数量