Flink Table API & SQL
关系型数据库中:database.schema.table
其他
分布式数据库中:catalog.database.table
这里的自定义函数,主要指在 Flink Table API & SQL 这个层级的自定义函数,注意和Datastream有所区别
1.函数区分
1.从两个角度来区分函数
从函数的拥有来讲: 系统内建函数 和自定义函数 system (or built-in) functions v.s. catalog functions
从生命周期来讲: 分为临时函数和永久函数 temporary functions v.s. persistent functions
2.因此组合的方式:
Temporary system functions
System functions
Temporary catalog functions
Catalog functions
2.如何使用函数:
精确引用 1.10版本及以上使用
模糊引用: 在SQL语句中使用,针对同名的情况,引用的顺序是:
Temporary system function
System function
Temporary catalog function, in the current catalog and current database of the session
Catalog function, in the current catalog and current database of the session
3.系统内建函数
Scalar Functions 标量型函数
算数函数 代数函数(= != > isnull in exists between) 逻辑函数(and or )
字符函数 时间函数
条件函数 case when COALESCE IS_DIGIT
函数类型转换 CAST
Collection函数: ARRAY Map
分组函数 Hash值函数
Aggregate Functions 聚合型函数
max min sum count
COLLECT
ROW_NUMBER DENSE_RANK RANK()
ROW_NUMBER 1 2 3 4
dense_rank 函数在生成序号时是连续的,1 2 2 3 dense稠密
rank 函数生成的序号有可能不连续。 1 2 2 4
Column Functions 列函数 -Column functions are only used in Table API.
withColumns
withoutColumns
Flink的UDF
扩展了查询的表达能力,同时可以把这种表达能力开放出去
基于JVM语言的UDF: Java Scala
自定义函数类型
Scalar functions map scalar values to a new scalar value.
Table functions map scalar values to new rows.
Aggregate functions map scalar values of multiple rows to a new scalar value.
Table aggregate functions map scalar values of multiple rows to new rows.
Async table functions are special functions for table sources that perform a lookup.
从与Hive比较角度: UDF UDTF UDAF
Flink自身又有说细分和增加
版本的不同:
UDF UDTF: org.apache.flink.table.types.DataType
UDAF : org.apache.flink.api.common.typeinfo.TypeInformation
aggregate 这部分正在重构,目前是使用TypeInformation,重构后使用DataType
(注意: Flink设计类型信息的有
TypeInformation org.apache.flink.api.common.typeinfo.Types
org.apache.flink.api.common.typeinfo.TypeInformation
Type org.apache.flink.table.api.Types
DataType org.apache.flink.table.types.DataType 1.9版本以后移除了对 TypeInformation 的依赖
)
* @see ScalarFunction org.apache.flink.table.functions.
* @see TableFunction org.apache.flink.table.functions.
* @see AggregateFunction
* @see TableAggregateFunction
* @see AsyncTableFunction
如何编写
如何调用: both Table API and SQL.
For SQL queries , a function must always be registered under a name.
For Table API , a function can be registered or directly used inline
示例:
0.编写UDF
// define function logic
public static class SubstringFunction extends ScalarFunction {
public String eval(String s, Integer begin, Integer end) {
return s.substring(begin, end);
}
}
###使用UDF
1.对于SQL来讲,需要注册,然后在SQL中使用
// register function
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
// call registered function in SQL
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
2.对于TableAPI来说,可以直接用,或者注册后再在Table API中使用
// call function "inline" without registration in Table API
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
// register function
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
// call registered function in Table API
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
具体说明
Udf提供了open()和close()方法,可以被复写,功能类似Dataset和DataStream API的RichFunction方法
1.UDF继承 ScalarFunction 抽象类,主要实现 eval 方法。
输出一行
org.apache.flink.table.functions
public abstract class ScalarFunction extends UserDefinedFunction {}
注意:返回值类型:
基本的返回值类型 和自定义复杂的返回值类型
复杂的可能要实现方法: getResultType()
2.UDF继承 TableFunction 抽象类,主要实现 eval 方法。
输出任意数目的行数。返回的行也可以包含一个或者多个列,通过提供 provide a collect(T) method
org.apache.flink.table.functions
public abstract class TableFunction<T> extends UserDefinedFunction {}
3.Aggregation Functions
The following methods are mandatory for each AggregateFunction:
createAccumulator()
accumulate()
getValue()
Spark SQL的UDAF UserDefinedAggregateFunction
Flink: org.apache.flink.table.functions.AggregateFunction
public abstract class AggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {
<IN, ACC, OUT>
必不可少的: createAccumulator() accumulate() getValue()
The following methods of AggregateFunction are required depending on the use case
merge()方法在会话组窗口(session group window)上下文中是必须的
retract()
resetAccumulator()
Spark中 org.apache.spark.sql.expressions
public abstract class UserDefinedAggregateFunction extends Serializable{}
inputSchema bufferSchema dataType
initialize update merge evaluate
Hive: org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
1)需继承AbstractGenericUDAFResolver抽象类,重写方法getEvaluator(TypeInfo[] parameters);
2)内部静态类需继承GenericUDAFEvaluator抽象类,重写方法 init()
实现方法 getNewAggregationBuffer() reset() iterate() terminatePartial() merge() terminate()
4.Table Aggregation Functions
TableAggregateFunction
createAccumulator()
accumulate()
The following methods of TableAggregateFunction are required depending on the use case:
retract() is required for aggregations on bounded OVER windows.
merge() is required for many batch aggregations and session window aggregations.
resetAccumulator() is required for many batch aggregations.
emitValue() is required for batch and window aggregations.
emitUpdateWithRetract
参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html