flink在努力地将Python 生态和大数据生态融合,但目前的版本还不够成熟,尤其是在官方对python现有资料有限的情况下,用户想要使用python完成一个flink job并提交到flink平台上,还是有很多雷需要踩的。
以下对pyflink环节问题,python job编写到提交做了总结,可减少不必要的弯路。
一、部署环境
JDK 1.8+ & Python 3.5+ (3.7.6) & apache-flink 1.12 & pyflink 1.0
二、官方API
flink为用户提供了多层API,对于python用户,主要使用Table API 和 SQL API,个人认为Table API有一点点类似python的Dataframe,故主要使用Table API完成作业开发。用户可以参考对应版本的官方文档和示例代码学习使用。
注:这里建议一定要看官方文档,因为目前pyflink版本之间差别较大,随便搜的资料由于版本差异会造成很多不必要的麻烦。
三、环境理解
在Table API层,flink提供了3类环境和两类 planner,用户需要理解环境之间的区别和属性,以便使用正确的环境和刚好地理解一些代码参数。
简单说:TableEnviroment实现了流批一体,但不支持UDF;StreamTableEnviroment、BatchTableEnviroment分别对应流式和批处理;不过当StreamTableEnviroment设定时间窗口时,其聚合操作可看作一种特殊的批处理;
另外:仅Blink Planner支持Pandas UDAF。
四、用户自定义函数:集成 Python 生态( Python 类库)到 Flink 中的手段
UDF: 自定义标量函数。一行输入一行输出。
UDAF: 自定义聚合函数。多行输入一行输出。
UDTF: 自定义表函数。一行输入多行输出或一列输入多列输出。
五、向量化的UDF
Python 在写 Python API 的时候本质是在调用 Java API, 这个是通过Py4J作为 Java VM 和 Python VM 之间通讯的桥梁解决了两者的通讯问题,在 PythonVM 启动一个 Gateway,并且 Java VM 启动一个 Gateway Server 用于接受 Python 的请求,同时在 Python API 里面提供和 Java API 一样的对象,比如 TableENV, Table,等等。因此在两者做通讯时就会有序列化/反序列化的开销问题。
向量化Python用户自定义函数,是在执行时,通过在JVM和Python VM之间以Arrow列存格式批量传输数据,来执行的函数。 向量化Python用户自定义函数的性能通常比非向量化Python用户自定义函数要高得多,因为向量化Python用户自定义函数可以大大减少序列化/反序列化的开销和调用开销。 此外,用户可以利用流行的Python库(例如Pandas,Numpy等)来实现向量化Python用户自定义函数的逻辑。这些Python库通常经过高度优化,并提供了高性能的数据结构和功能。
【注】向量化UDF是在flink层级的,不要跟UDF里面写的方法混淆,UDF本身则是python层的,故UDF内部自然是可以完成所以python自有的功能。
六、pyflink作业模板
【实例化flink环境】->【建表source、sink】->【简单功能通过Table API对数据源表做处理】->【复杂功能通过注册、调用UDF实现】->【写出】
- 建表的所有字段必须指定数据类型、字段排序必须一致
- UDF选型,主要考虑输入输出情况
七、job 提交
对 UDF ( User-defined-Funciton)的支持上,需要添加 UDF 注册的 API , register_function,但仅仅是注册还不够,用户在自定义 Python UDF 的时候往往会依赖一些三方库,所以我们还需要增加添加依赖的方法,那就是一系列的 add 方法,比如 add_Python_file()。在编写 Python 作业的同时, Java API 也会同时被调用在提交作业之前,Java 端会构建JobGraph。然后通过 CLI 等多种方式将作业提交到集群进行运行。
udaf (矢量化标量函数)(同样使用所有标量场景)
通过以Arrow列格式在JVM和Python VM之间传输一批元素来执行的函数。
性能好、 可以利用主要的Python库( Pandas,Numpy)
Pandas UDAF不支持部分聚合。
仅Blink Planner支持Pandas UDAF。
Pandas UDAF 组或窗口的所有数据将在执行过程中同时加载到内存中,因此您必须确保组或窗口的数据适合内存。
watermark
-
是一个时间戳,标识小于这个时间戳的时间已经都到达
-
watermark水印在源位置发射,并通过拓扑中方的运算符传播
-
是StreamElement,和普通数据一起在算子之间传递
-
触发窗口的计算,那么longmax_value值会公诉算子后续没有任何数据了
生成watermark策略
-
Periodic
-
punctuated
#DDL连接属性
(1) format:可以有多种选择,如:JSON,CSV,AVRO,Canal CDC,Debezium CDC等,详细见:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/
(2) can.startup.mode: 也可以有多种选择,如:earliest-offset, latest-offset, group-offsets, timestamp and specific-offsets详见:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position
TIMESTAMP(3),它表示自*以来的时间戳。