When the built-in functions that Flink provides cannot meet your business needs, you can turn to user-defined functions: UDF (user-defined scalar function), UDAF (user-defined aggregate function), and UDTF (user-defined table function).
Categories of user-defined functions
UDF: user-defined scalar function (User Defined Scalar Function). One row in, one row out (a minimal sketch follows this list).
UDAF: user-defined aggregate function. Multiple rows in, one row out.
UDTF: user-defined table function. One row in, multiple rows out, or one column in, multiple columns out.
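For contrast with the UDAF built below, here is a minimal scalar UDF sketch in PyFlink (illustrative only; double_price is a made-up example, not part of this article's pipeline):

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# A scalar UDF maps exactly one input row to one output value.
@udf(result_type=DataTypes.INT())
def double_price(price):
    return price * 2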
UDAF development example: first, take a look at the JSON data.
{"productPrice":1}
{"productPrice":2}
{"productPrice":3}
{"productPrice":4}
{"productPrice":5}
Requirement: accumulate the productPrice values. For the input above (1, 2, 3, 4, 5), the running sums are 1, 3, 6, 10, 15. This is a many-rows-in, one-row-out aggregation: only a single row is ultimately emitted.
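To make the arithmetic concrete, the desired accumulation can be sketched in plain Python:

# Running sum over the five input prices.
prices = [1, 2, 3, 4, 5]
total = 0
for p in prices:
    total += p
    print(total)  # 1, 3, 6, 10, 15 -- the aggregate's final value is 15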
(1) Write the Flink UDAF code
This article uses Flink 1.12.1 and JDK 1.8.
The pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.flink-ray</groupId>
<artifactId>udaf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.12.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-container_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<args>
<arg>-nobootcp</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<!-- <exclude>org.slf4j:*</exclude>-->
<!-- <exclude>log4j:*</exclude>-->
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Writing the UDAF code:
First, a brief introduction to UDAFs. A user-defined aggregate function extends the abstract class AggregateFunction<T, ACC> and implements a set of methods. In Flink 1.12 it is declared as:
abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC>
T: the type of the UDAF's final result
ACC: the type of the accumulator that holds the intermediate result
A minimal UDAF must implement at least the following three methods. Before that, define an accumulator class whose fields hold the intermediate aggregation state:
createAccumulator: initializes the accumulator class you defined, setting its fields to zero or empty values.
accumulate: defines how the accumulator is updated for each input row; this is where the aggregation logic lives.
getValue: returns the intermediate result stored in the accumulator as the UDAF's final result.
The code is as follows. When writing a UDAF, be sure to declare the input and output data types; this is done with the FunctionHint annotation:
package com.flink_ray;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;

// Custom UDAF: sums the input prices.
@FunctionHint(input = @DataTypeHint("INT"), output = @DataTypeHint("INT"))
public class UDAFSum extends AggregateFunction<Integer, UDAFSum.SumAccumulator> {

    // Accumulator that holds the intermediate aggregation result.
    public static class SumAccumulator {
        public int sumPrice;
    }

    /**
     * Initializes the accumulator.
     * @return a fresh accumulator with the sum set to 0
     */
    @Override
    public SumAccumulator createAccumulator() {
        SumAccumulator sumAccumulator = new SumAccumulator();
        sumAccumulator.sumPrice = 0;
        return sumAccumulator;
    }

    /**
     * Defines how the accumulator is updated for each input row.
     * @param accumulator the accumulator
     * @param productPrice the input value
     */
    public void accumulate(SumAccumulator accumulator, int productPrice) {
        accumulator.sumPrice += productPrice;
    }

    /**
     * Returns the final aggregation result.
     * @param accumulator the accumulator
     * @return the accumulated sum
     */
    @Override
    public Integer getValue(SumAccumulator accumulator) {
        return accumulator.sumPrice;
    }
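    /**
     * Merges a group of accumulators into one. This method is not in the
     * original article; the Flink documentation notes that merge() is
     * required for bounded (batch) aggregations and for session/hop window
     * aggregations, so it is sketched here for completeness.
     * @param accumulator the accumulator that keeps the merged result
     * @param others the accumulators to merge in
     */
    public void merge(SumAccumulator accumulator, Iterable<SumAccumulator> others) {
        for (SumAccumulator other : others) {
            accumulator.sumPrice += other.sumPrice;
        }
    }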
}
Once the code is written, package it into a jar: run mvn package -Dcheckstyle.skip from the command line, or package it through your IDE's Maven tooling.
After packaging, copy the jar into PyFlink's lib directory.
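If you are unsure where that lib directory is, a small helper sketch (assuming pyflink is installed in the active Python environment) prints its location:

import os
import pyflink

# Print the directory holding PyFlink's bundled jars.
print(os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "lib"))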
(2) Write the SQL and load the UDAF jar
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import call
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
bt_env = TableEnvironment.create(settings)
# Create the source table
bt_env.execute_sql(f"""
create table source(
productPrice int
)with(
'connector' = 'filesystem',
'path' = '/root/anaconda3/envs/rlink/flink_demo/data/add.json',
'format'='json'
)
""")
# Load the UDAF jar
bt_env.get_config().get_configuration().set_string("pipeline.jars",
"file:///root/anaconda3/envs/rlink/lib/python3.7/site-packages/pyflink/lib/udaf-1.0-SNAPSHOT.jar")
# Register the UDAF
bt_env.execute_sql(f"""
create temporary function addProductPrice AS 'com.flink_ray.UDAFSum'
""")
# Use the Java function via the SQL API
tab = bt_env.sql_query("SELECT addProductPrice(productPrice) FROM source")
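The query alone does not trigger execution; to run the job and inspect the output, execute the table and print it (this final line is not in the original post and is added as a sketch):

# Trigger execution and print the result to stdout.
tab.execute().print()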
Running it produces a single row containing 15, the sum 1 + 2 + 3 + 4 + 5.