PyFlink SQL UDAF usage workflow

When Flink's built-in functions cannot meet your business needs, you can turn to user-defined functions: scalar functions (UDF: user-defined function), aggregate functions (UDAF), and table functions (UDTF).

Categories of user-defined functions

UDF: user-defined scalar function (User Defined Scalar Function). One row of input, one row of output.

UDAF: user-defined aggregate function. Multiple rows of input, one row of output.

UDTF: user-defined table function. One row of input producing multiple rows of output (or one column producing multiple columns).
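
To make the simplest category concrete before building the Java UDAF below, here is a minimal PyFlink sketch of a scalar UDF; the name add_one is our own, not from the original article:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# A scalar UDF: one row (here, one BIGINT value) in, one value out.
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())

Once registered with the table environment, such a function can be called in SQL like any built-in. The UDAF built in this article follows the same idea, but carries state across rows.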

UDAF development example: first, look at the JSON data

{"productPrice":1}
{"productPrice":2}
{"productPrice":3}
{"productPrice":4}
{"productPrice":5}


Customer requirement: accumulate the productPrice values. Given the input data above, 1, 2, 3, 4, 5, the running sums are 1, 3, 6, 10, 15; in other words, many rows of input ultimately produce a single row of output. A plain-Python illustration follows.
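
A few lines of plain Python show exactly what is being asked for (illustrative only, not Flink code):

# Running accumulation over the input prices.
prices = [1, 2, 3, 4, 5]
total = 0
for p in prices:
    total += p
    print(total)  # prints 1, 3, 6, 10, 15; the final aggregate is 15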

(1) Write the Flink UDAF code

This article uses Flink 1.12.1 and JDK 1.8.

The pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.flink-ray</groupId>
    <artifactId>udaf</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.12.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-container_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-python_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <args>
                        <arg>-nobootcp</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <!--                                    <exclude>org.slf4j:*</exclude>-->
                                    <!--                                    <exclude>log4j:*</exclude>-->
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Writing the UDAF code:

First, a quick introduction to UDAFs.

A UDAF, a user-defined aggregate function, must extend the abstract class AggregateFunction<T, ACC> and implement a set of methods. In Flink 1.12 the class is declared as:

abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC>
T: the type of the UDAF's final result
ACC: the type holding the UDAF's intermediate results

At a minimum, a UDAF must implement the following three methods (a plain-Python sketch of this lifecycle follows the list):

First, define an Accumulator class whose fields hold the intermediate results of the aggregation.

     createAccumulator: initializes the Accumulator class you defined, setting its fields to empty or zero.

     accumulate: defines how the Accumulator is updated for each input row; this is where the aggregation logic goes.

     getValue: defines how the intermediate result stored in the Accumulator is returned as the UDAF's final result.

(Some streaming scenarios, such as session windows or retracting results, additionally require merge() or retract(); the simple case here needs only the three methods above.)
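
To make the contract concrete, here is a plain-Python sketch of how the runtime drives these three methods; it is illustrative only, not Flink API:

# Conceptual driver loop for a UDAF (plain Python, not Flink API).
def create_accumulator():
    return {"sumPrice": 0}            # createAccumulator: start from zero

def accumulate(acc, product_price):   # accumulate: called once per input row
    acc["sumPrice"] += product_price

def get_value(acc):                   # getValue: produce the final result
    return acc["sumPrice"]

acc = create_accumulator()
for price in [1, 2, 3, 4, 5]:
    accumulate(acc, price)
print(get_value(acc))  # 15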
 

The code is as follows:

When writing the UDAF, be sure to declare the input and output data types; here they are supplied through the FunctionHint annotation.

package com.flink_ray;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;

// Custom UDAF: sums all input prices.
@FunctionHint(input = @DataTypeHint("INT"), output = @DataTypeHint("INT"))
public class UDAFSum extends AggregateFunction<Integer, UDAFSum.SumAccumulator> {

    // The accumulator that holds the intermediate aggregation result.
    public static class SumAccumulator {
        public int sumPrice;
    }

    /** Initializes the accumulator. */
    @Override
    public SumAccumulator createAccumulator() {
        SumAccumulator sumAccumulator = new SumAccumulator();
        sumAccumulator.sumPrice = 0;
        return sumAccumulator;
    }

    /**
     * Updates the accumulator for one input row.
     *
     * @param accumulator  the accumulator
     * @param productPrice the input value
     */
    public void accumulate(SumAccumulator accumulator, int productPrice) {
        accumulator.sumPrice += productPrice;
    }

    /**
     * Returns the final aggregation result.
     *
     * @param accumulator the accumulator
     * @return the accumulated sum
     */
    @Override
    public Integer getValue(SumAccumulator accumulator) {
        return accumulator.sumPrice;
    }
}

Once the code is written, build it into a jar with mvn package -Dcheckstyle.skip (the same Maven package goal can also be run from your IDE).
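
Optionally, you can sanity-check that the class made it into the jar before deploying. A small Python sketch (jar files are ordinary zip archives; target/ is the default Maven output directory and may differ in your setup):

import zipfile

# List the UDAF class entries inside the freshly built jar.
names = zipfile.ZipFile("target/udaf-1.0-SNAPSHOT.jar").namelist()
print([n for n in names if "UDAFSum" in n])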

After packaging, place the jar in PyFlink's lib directory.
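
If you are unsure where PyFlink's lib directory lives, this snippet prints it; it is the lib folder inside the installed pyflink package, matching the path used in the script below:

import os
import pyflink

# PyFlink's lib directory sits inside the installed package.
print(os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "lib"))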

(2) Write the SQL and load the UDAF jar

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import call

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
bt_env = TableEnvironment.create(settings)


# create the source table
bt_env.execute_sql(f"""
    create table source(
        productPrice int
    )with(
         'connector' = 'filesystem',
         'path' = '/root/anaconda3/envs/rlink/flink_demo/data/add.json',
         'format'='json'
    )
""")

# load the UDAF jar
bt_env.get_config().get_configuration().set_string("pipeline.jars",
                        "file:///root/anaconda3/envs/rlink/lib/python3.7/site-packages/pyflink/lib/udaf-1.0-SNAPSHOT.jar")


# register the UDAF
bt_env.execute_sql(f"""
    create temporary function addProductPrice AS 'com.flink_ray.UDAFSum'
""")

# use the Java function in the SQL API and print the result
tab = bt_env.sql_query("SELECT addProductPrice(productPrice) FROM source")
tab.execute().print()
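
As an aside, the script imports call but never uses it. As a sketch, the same registered function can also be invoked through the Table API; since this is a global aggregate, no group_by is needed:

from pyflink.table.expressions import col

# Same aggregate via the Table API instead of SQL.
result = bt_env.from_path("source").select(call("addProductPrice", col("productPrice")))
result.execute().print()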

The final result: in batch mode the query returns a single row whose value is 15, the sum 1 + 2 + 3 + 4 + 5.

