1. UDF Categories
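| UDF type | Description |
| --- | --- |
| UDF (User Defined Scalar Function) | A user-defined scalar function. Input and output are one-to-one: it reads one row and returns one value. |
| UDTF (User Defined Table Valued Function) | A user-defined table-valued function, for cases where a single call must output multiple rows. UDTF is the only kind of user-defined function that can return multiple fields. A UDTF is not the same thing as a UDT (User Defined Type). |
| UDAF (User Defined Aggregation Function) | A user-defined aggregate function. Input and output are many-to-one: multiple input records are aggregated into one output value. A UDAF can be combined with the SQL GROUP BY clause. For the detailed syntax, see the MaxCompute documentation. |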
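The three input/output shapes described above can be illustrated with a small plain-Python sketch. It does not use the MaxCompute SDK at all; the function names (`scalar_upper`, `table_split`, `aggregate_sum`) are hypothetical and exist only to show the cardinality of each kind of function.

```python
# Plain-Python illustration of the three input/output shapes.
# None of these names come from the MaxCompute SDK; they are hypothetical.

def scalar_upper(value):
    """UDF shape: one input row in, exactly one value out."""
    return value.upper()


def table_split(value, sep=","):
    """UDTF shape: one input row in, zero or more output rows out."""
    for part in value.split(sep):
        yield (part,)


def aggregate_sum(values):
    """UDAF shape: many input rows in, one value out."""
    total = 0
    for v in values:
        total += v
    return total


rows = ["a,b", "c"]
print([scalar_upper(r) for r in rows])             # one value per input row
print([t for r in rows for t in table_split(r)])   # may yield more rows than inputs
print(aggregate_sum([1, 2, 3]))                    # a single value for the whole group
```

In a real project each shape corresponds to a different base class or interface in the SDK; this sketch only mirrors the row counts.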
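2. UDF Parameter Type Mapping
The mapping between MaxCompute data types and Java data types is as follows.
Notes:
- ARRAY maps to the Java type List, not to a Java array.
- Some data types, such as VARCHAR, BINARY and STRUCT, map to classes that are specific to ODPS.
- Java parameter types and return types must be object types, so each type name starts with an uppercase letter.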
| MaxCompute Type | Java Type |
| --- | --- |
| TINYINT | java.lang.Byte |
| SMALLINT | java.lang.Short |
| INT | java.lang.Integer |
| BIGINT | java.lang.Long |
| FLOAT | java.lang.Float |
| DOUBLE | java.lang.Double |
| DECIMAL | java.math.BigDecimal |
| BOOLEAN | java.lang.Boolean |
| STRING | java.lang.String |
| VARCHAR | com.aliyun.odps.data.Varchar |
| BINARY | com.aliyun.odps.data.Binary |
| DATETIME | java.util.Date |
| TIMESTAMP | java.sql.Timestamp |
| ARRAY | java.util.List |
| MAP | java.util.Map |
| STRUCT | com.aliyun.odps.data.Struct |
MaxCompute 2.0 supports Writable types as parameters and return values when you define a Java UDF. The mapping between MaxCompute data types and Java Writable types is as follows.
| MaxCompute Type | Java Writable Type |
| --- | --- |
| TINYINT | ByteWritable |
| SMALLINT | ShortWritable |
| INT | IntWritable |
| BIGINT | LongWritable |
| FLOAT | FloatWritable |
| DOUBLE | DoubleWritable |
| DECIMAL | BigDecimalWritable |
| BOOLEAN | BooleanWritable |
| STRING | Text |
| VARCHAR | VarcharWritable |
| BINARY | BytesWritable |
| DATETIME | DatetimeWritable |
| TIMESTAMP | TimestampWritable |
| INTERVAL_YEAR_MONTH | IntervalYearMonthWritable |
| INTERVAL_DAY_TIME | IntervalDayTimeWritable |
| ARRAY | N/A |
| MAP | N/A |
| STRUCT | N/A |
| MaxCompute SQL Type | Python 2 Type |
| --- | --- |
| BIGINT | int |
| STRING | str |
| DOUBLE | float |
| BOOLEAN | bool |
| DATETIME | int |
| FLOAT | float |
| CHAR | str |
| VARCHAR | str |
| BINARY | bytearray |
| DATE | int |
| DECIMAL | decimal.Decimal |
| ARRAY | list |
| MAP | dict |
| STRUCT | collections.namedtuple |
| MaxCompute SQL Type | Python 3 Type |
| --- | --- |
| BIGINT | int |
| STRING | unicode |
| DOUBLE | float |
| BOOLEAN | bool |
| DATETIME | datetime.datetime |
| FLOAT | float |
| CHAR | unicode |
| VARCHAR | unicode |
| BINARY | bytes |
| DATE | datetime.date |
| DECIMAL | decimal.Decimal |
| ARRAY | list |
| MAP | dict |
| STRUCT | collections.namedtuple |
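The mapping matters because `evaluate` receives ordinary Python objects of the listed types. The sketch below shows what a UDF body might look like under the Python 3 mapping, where DATETIME arrives as `datetime.datetime` and DATE as `datetime.date`. The class name `DaysUntil` is hypothetical, the class is called directly rather than registered in MaxCompute, and the treatment of SQL NULL as `None` is an assumption stated here rather than something taken from the tables.

```python
import datetime


class DaysUntil(object):
    """Hypothetical UDF body: (DATETIME, DATE) -> BIGINT under the Python 3 mapping."""

    def evaluate(self, start, end):
        # Assumption: a SQL NULL argument arrives as None.
        if start is None or end is None:
            return None
        # start is a datetime.datetime, end is a datetime.date;
        # the returned int corresponds to BIGINT.
        return (end - start.date()).days


udf = DaysUntil()
print(udf.evaluate(datetime.datetime(2021, 1, 1, 12, 0), datetime.date(2021, 1, 31)))  # prints 30
```

Under the Python 2 mapping the same two arguments would arrive as `int` values instead, so the body would need to convert them before doing date arithmetic.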
3. Using UDFs
For UDFs, UDTFs and UDAFs, see the reference documentation:
https://help.aliyun.com/document_detail/27867.html?spm=a2c4g.11186623.6.762.463d7468xnFPHb
Java UDF
Advanced UDF usage:
3.1 Variable-length arguments in UDFs
Java:
package com.mrtest.cn;

import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.ArrayList;
import java.util.List;

// "*" in the signature accepts any number of input arguments.
@Resolve({"*->array<string>"})
public class TestUDF extends UDF {
    public List<String> evaluate(String... s) {
        List<String> list = new ArrayList<String>();
        for (String name : s) {
            list.add(name);
        }
        return list;
    }
}
Python:
from odps.udf import annotate

# "*" in the signature accepts any number of input arguments.
@annotate("*->bigint")
class ParamFunc(object):
    def evaluate(self, *nums):
        total = 0
        for num in nums:
            total = num + total
        return total
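Because a Python UDF is an ordinary class, the variable-length `evaluate` above can be exercised locally before it is uploaded. The sketch below assumes the `odps` package may not be installed on the development machine, so it falls back to a hypothetical no-op `annotate` stand-in. Skipping `None` arguments is also an added assumption, chosen so that NULL inputs do not raise a `TypeError`.

```python
try:
    from odps.udf import annotate  # present in the MaxCompute runtime
except ImportError:
    # Hypothetical local stand-in: accepts the signature and returns the class unchanged.
    def annotate(signature):
        def wrap(cls):
            return cls
        return wrap


@annotate("*->bigint")
class ParamFunc(object):
    def evaluate(self, *nums):
        total = 0
        for num in nums:
            # Assumption: NULL arrives as None and is ignored rather than added.
            if num is not None:
                total += num
        return total


f = ParamFunc()
print(f.evaluate())            # no arguments: prints 0
print(f.evaluate(1, 2, 3))     # prints 6
print(f.evaluate(4, None, 5))  # prints 9
```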
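3.2 UDF overloading
Note: evaluate methods whose parameters differ only in generic type arguments (for example, two List parameters with different element types) cannot be resolved to distinct methods, because of Java type erasure.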
package com.aliyun.odps.examples.udf;

import com.aliyun.odps.udf.UDF;

public class UDFExample extends UDF {
    public String evaluate(String a) {
        return "s2s:" + a;
    }

    public String evaluate(String a, String b) {
        return "ss2s:" + a + "," + b;
    }

    public String evaluate(String a, String b, String c) {
        return "sss2s:" + a + "," + b + "," + c;
    }
}
3.3 Accessing file and table resources from a UDF
Java:
package com.aliyun.odps.examples.udf;

import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;

public class UDFResource extends UDF {
    ExecutionContext ctx;
    long fileResourceLineCount;
    long tableResource1RecordCount;
    long tableResource2RecordCount;

    // setup runs once before any evaluate call; resources are read here.
    public void setup(ExecutionContext ctx) throws UDFException {
        this.ctx = ctx;
        try {
            // Count the lines of the file resource.
            InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String line;
            fileResourceLineCount = 0;
            while ((line = br.readLine()) != null) {
                fileResourceLineCount++;
            }
            br.close();

            // Count the records of the first table resource.
            Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
            tableResource1RecordCount = 0;
            while (iterator.hasNext()) {
                tableResource1RecordCount++;
                iterator.next();
            }

            // Count the records of the second table resource.
            iterator = ctx.readResourceTable("table_resource2").iterator();
            tableResource2RecordCount = 0;
            while (iterator.hasNext()) {
                tableResource2RecordCount++;
                iterator.next();
            }
        } catch (IOException e) {
            throw new UDFException(e);
        }
    }

    /**
     * project: example_project
     * table: wc_in2
     * partitions: p2=1,p1=2
     * columns: colc,colb
     */
    public String evaluate(String a, String b) {
        return "ss2s:" + a + "," + b
            + "|fileResourceLineCount=" + fileResourceLineCount
            + "|tableResource1RecordCount=" + tableResource1RecordCount
            + "|tableResource2RecordCount=" + tableResource2RecordCount;
    }
}
Python:
# coding: utf-8
from odps.udf import annotate
from odps.distcache import get_cache_file, get_cache_table

@annotate('double->double')
class Compute(object):
    def __init__(self):
        self.my_dict = {}
        # Read the text file resource.
        cache_file = get_cache_file('file.txt')
        data_mat = []
        for line in cache_file:
            cur_line = line.strip().split(',')
            data_mat.append(cur_line)  # processing logic goes here
        cache_file.close()
        # Read the table resource.
        records = list(get_cache_table('table_resource1'))
        for record in records:
            self.my_dict[record[0]] = [record[1]]  # processing logic goes here

    def evaluate(self, input):
        # Processing logic goes here; returning the input is only a placeholder.
        return input
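The "processing logic" placeholders in the Python example usually amount to parsing the resource once in the constructor and looking values up in `evaluate`. The sketch below shows that pattern with a file-like object passed in, so it runs without MaxCompute. `load_pairs` and `Lookup` are hypothetical names, the `key,value` line format is an assumption, and in a real UDF the constructor takes no arguments and would obtain the file from `get_cache_file` instead.

```python
import io


def load_pairs(file_obj):
    """Parse 'key,value' lines from a file-like object into a dict."""
    table = {}
    for line in file_obj:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        key, value = line.split(",", 1)
        table[key] = value
    return table


class Lookup(object):
    def __init__(self, file_obj):
        # In a real UDF, file_obj would come from get_cache_file('file.txt').
        try:
            self.table = load_pairs(file_obj)
        finally:
            file_obj.close()

    def evaluate(self, key):
        # Returns None for unknown keys.
        return self.table.get(key)


udf = Lookup(io.StringIO(u"a,1\nb,2\n\n"))
print(udf.evaluate("a"))    # prints 1
print(udf.evaluate("zzz"))  # prints None
```

Doing the parsing once in the constructor keeps `evaluate` cheap, since `evaluate` runs for every input row.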
3.4 Accessing external networks from a UDF (VPC, external networks, virtual private cloud)
https://help.aliyun.com/document_detail/187866.html
3.5 Using third-party packages in a UDF
https://help.aliyun.com/document_detail/189752.html
# coding: utf-8
# explode.py
from odps.udf import annotate
from odps.distcache import get_cache_archive
import datetime


def include_package_path(res_name):
    # Add the directory that contains the unpacked package to sys.path.
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    sys.path.append(os.path.dirname(dir_names[0]))


@annotate("string->boolean")
class is_workday_udf(object):
    def __init__(self):
        include_package_path('chinese-calendar-master.zip')

    def evaluate(self, date_str):
        # try:
        import chinese_calendar
        # Parse a "YYYY-MM-DD" string into a date.
        date_strs = date_str.split("-")
        year_num = int(date_strs[0])
        month_num = int(date_strs[1])
        day_num = int(date_strs[2])
        date_num = datetime.date(year=year_num, month=month_num, day=day_num)
        result = chinese_calendar.is_workday(date_num)
        return result
        # except:
        #     return True
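The path computation inside `include_package_path` is easier to follow when it is run on plain file names. The sketch below isolates that logic in a hypothetical helper, `package_root`, and feeds it made-up paths. The layout of the unpacked archive is an assumption for illustration, not the actual layout produced by MaxCompute.

```python
import os


def package_root(file_names):
    """Return the parent of the shallowest directory among the archive's files."""
    dir_names = sorted(
        (os.path.dirname(os.path.normpath(name))
         for name in file_names if ".dist_info" not in name),
        key=len,
    )
    # The shortest directory is taken to be the package directory itself;
    # its parent is what has to be on sys.path for the import to succeed.
    return os.path.dirname(dir_names[0])


# Hypothetical file names, as if read from the unpacked archive resource.
names = [
    "work/pkg.zip/pkg-master/chinese_calendar/data/holidays.py",
    "work/pkg.zip/pkg-master/chinese_calendar/__init__.py",
    "work/pkg.zip/pkg-master/chinese_calendar/utils.py",
]
print(package_root(names))
```

The heuristic relies on the shortest directory being the package directory. If the archive also holds files one level higher, the computed path moves up a level and the import would fail, so it is worth checking the archive layout before relying on it.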
Registering the function
Running the SELECT statement:
set odps.pypy.enabled=false;
set odps.isolation.session.enable=true;
select my_json('{"info":"11","desc":"a|b","filename":"4b-2a-3c-4d-5b"}') as a;
3.6 Developing UDFs with embedded code
Java:
CREATE TEMPORARY FUNCTION foo AS 'com.mypackage.Reverse' USING
#CODE ('lang'='JAVA')
package com.mypackage;

import com.aliyun.odps.udf.UDF;

public class Reverse extends UDF {
    public String evaluate(String input) {
        if (input == null) return null;
        StringBuilder ret = new StringBuilder();
        for (int i = input.toCharArray().length - 1; i >= 0; i--) {
            ret.append(input.toCharArray()[i]);
        }
        return ret.toString();
    }
}
#END CODE;
SELECT foo('abdc');
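- An embedded code block can be placed after USING or at the end of the script. A block placed after USING is scoped to that CREATE TEMPORARY FUNCTION statement only.
- A function created with CREATE TEMPORARY FUNCTION is temporary: it takes effect only for the current execution and is not stored in the MaxCompute Meta system.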
Python:
CREATE TEMPORARY FUNCTION foo AS 'embedded.UDFTest' USING
#CODE ('lang'='PYTHON', 'filename'='embedded')
from odps.udf import annotate

@annotate("bigint->bigint")
class UDFTest(object):
    def evaluate(self, a):
        return a * a
#END CODE;
SELECT foo(4);
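- The indentation of the Python code must follow the Python language rules.
- When a Python UDF is registered, the class name after AS must include the file name of the Python source, so you can specify a virtual file name with 'filename'='embedded'.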
3.7 Defining functions in SQL
create sql function my_sum(@a BIGINT, @b BIGINT, @c BIGINT) returns @my_sum BIGINT
as begin
    @temp := @a + @b;
    @my_sum := @temp + @c;
end;
create sql function my_func(@s STRING) AS if(@s rlike '"git_(m|a)"', 1, 0);
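To make clear what the two SQL-defined functions compute, here is a rough Python equivalent. It is only an illustration of the logic, not code that MaxCompute runs. It assumes that `rlike` succeeds when the pattern matches anywhere in the string (modeled here with `re.search`), and it does not model NULL handling.

```python
import re


def my_sum(a, b, c):
    """Mirror of the SQL body: temp := a + b; my_sum := temp + c."""
    temp = a + b
    return temp + c


def my_func(s):
    """Mirror of if(@s rlike '"git_(m|a)"', 1, 0).

    Assumption: rlike is a partial match, so re.search is used.
    The double quotes are part of the pattern and must appear in the input.
    """
    return 1 if re.search(r'"git_(m|a)"', s) else 0


print(my_sum(1, 2, 3))           # prints 6
print(my_func('{"k":"git_m"}'))  # prints 1
print(my_func('git_m'))          # prints 0, because the quotes are missing
```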