MaxCompute - ODPS重装上阵 第八弹 - 动态类型函数

MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台, 尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。 MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

MaxCompute基于ODPS2.0新一代的SQL引擎,显著提升了SQL语言编译过程的易用性与语言的表达能力。我们在此推出MaxCompute(ODPS2.0)重装上阵系列文章

MaxCompute自定义函数的参数和返回值不够灵活,是数据开发过程中时常被提及的问题。Hive 提供给了 GenericUDF 的方式,通过调用一段用户代码,让用户来根据参数类型决定返回值类型。MaxCompute 出于性能、安全性等考虑,没有支持这种方式。但是MaxCompute也提供了许多方式,让您能够灵活地自定义函数。

  • 场景1
    需要实现一个UDF,可以接受任意类型的输入,但是MaxCompute的UDF不支持泛型,要做一个接受任何类型的函数,就必须为每种类型都写一个evaluate函数。
  • 场景2
    MaxCompute的UDAF和UDTF使用@Resolve的注解来指定输入输出类型,无法重载。要做一个接受多种类型的自定义功能,就需要定义多个不同的函数。
  • 场景3
    MaxCompute支持了参数化视图,能够把一些公共的SQL提出来。参数化视图的表值参数要求输入表的列数和类型与视图定义时完全一致,如果想要写一个能够接受具有相似特征的不同的表的视图,还无法定义出来。

本文带大家一起看看MaxCompute对这些大家关心的问题都做了哪些改进。

参数化视图

问题

参数化视图是MaxCompute自己设计的一种视图。允许用户定义参数,从而能够大大视图代码的复用率。很多用户都利用这一功能,将一些公共SQL提取到视图中,形成公共SQL代码池。

参数化视图在声明过程中具有局限性:参数类型,长度都是固定的。尤其是参数化视图允许传入表值参数,表值参数要求形参与实参在列的个数和类型上都一致。这一点限制了许多使用场景,如下面的例子:

CREATE VIEW paramed_view (@a TABLE(key bigint)) AS SELECT @a.* FROM @a JOIN dim on a.key = dim.key;

这个例子封装了一段使用dim表来过滤输入表的逻辑,本来这个是个通用的逻辑,任何包含key这一列的表,都可以用来做输入表。但是由于定义视图时只能确定输入中包含key列,因此声明的参数类型只包含这一列。导致了视图的调用者传递的表参数必须只能有一列,而返回的数据集也只包含一列,这显然与这个视图的设计初衷不合。

改进

最新的MaxCompute版本对参数化视图做了一些改进,可以大大提升参数化视图定义的灵活性。

首先,参数化视图的参数可以使用ANY关键字,表示任意类型。如

CREATE VIEW paramed_view (@a ANY) AS SELECT * FROM src WHERE case when @a is null then key1 else key2 end = key3;

这里定义的视图,第一个参数可以接受任意类型。注意ANY类型不能参与如 '+', 'AND' 之类的需要明确类型才能做的运算。ANY类型更多是在TABLE参数中做passthrough列,如

CREATE VIEW paramed_view (@a TABLE(name STRING, id ANY, age BIGINT)) AS SELECT * FROM @a WHRER name = 'foo' and age < 25;

-- 调用示例
SELECT * FROM param_view((SELECT name, id, age from students));

上面的视图接受一个表值参数,但是并不关心这个表的第二列,那么这个列可以直接定义为ANY类型。参数化视图在调用时,每次都会根据输入参数的实际类型重新推算返回值类型。比如上面的视图,当输入的表是 TABLE(c1 STRING, c2 DOUBLE, c3 BIGINT),那么输出的数据集的第二列也会自动变成DOUBLE类型,让视图的调用者可以使用任何可用于DOUBLE类型的操作来操作这一列。

需要注意的一点是,我们用CREATE VIEW创建了视图后,可以用DESC来获取视图的描述,这个描述中会包含视图的返回类型信息。但是由于视图的返回类型是在调用的时候重新推算的,重新推算出来的类型可能与创建视图时推导出来的不一致。一个例子就是上面的ANY类型。

在ANY之外,参数化视图中的表值参数还支持了*,表示任意多个列。这个 * 可以带类型,也可以使用ANY类型。如

CREATE VIEW paramed_view (@a TABLE(key STRING, * ANY), @b TABLE(key STRING, * STRING)) AS SELECT a.* FROM @a JOIN @b ON a.key = b.key; 

-- 调用示例
SELECT name, address FROM param_view((SELECT school, name, age, address FROM student), school) WHERE age < 20;

上面这个视图接受两个表值参数,第一个表值参数第一列是string类型,后面可以是任意多个任意类型的列,而第二个表值参数的第一列是string,后面可以是任意多个STRING类型的列。这其中有几点需要注意:

  • 变长部分必须要写在表值参数定义的最后面,即在 * 的后面不允许再有其他列。这也间接导致了一个表值参数中最多只有一个变长列列表。
  • 由于变长部分必须在最后,有的时候输入表的列不一定是按照这种顺序排列的,这时候需要对输入表的列做一定重排,可以以subquery作为参数(参考上面的例子),注意subquery外面要加一层括号。
  • 由于表值参数中变长部分没有名字,因此在视图定义过程中没办法获得对这部分数据的引用,也就没有办法对这些数据做运算。这个限制是特意设置的,如果需要对变长部分的数据做运算,需要把要运算的列声明在定长部分,而编译器会对调用时传入的参数进行检查。
  • 虽然不能对变长部分做运算,但是 SELECT * 这种通配符的使用依旧可以将变长部分的列传递出去,如上面的例子在paramed_view中将 @a 的所有列返回,虽然创建视图的时候,a中只有key这一列,但是调用视图的时候,编译器推算出@a中还包含了name, age, address,因此视图返回的数据集中也包含这三列,而视图的调用者也可以对着三列进行操作(如 WHERE age < 20)。
  • 表值参数的列与视图声明时指定的定长列部分不一定完全一致。如果名字不一样,编译器会自动做重命名,如果类型不一样,编译器会做隐式转换(不能隐式转换则会报错)。

上面提到的第4点非常有用,一方面保证了调用视图是输入参数的灵活性,另一方面又不降低数据的信息量。好好利用能够很大程度上增加公共代码的复用率。

下面是一个调用示例。该例子使用的视图是:

CREATE VIEW paramed_view (@a TABLE(key STRING, * ANY), @b TABLE(key STRING, * STRING)) AS SELECT a.* FROM @a JOIN @b ON a.key = b.key; 

在MaxCompute Studio中调用,可以享受语法高亮和错误提示等功能。执行的调用代码如下:

MaxCompute - ODPS重装上阵 第八弹 - 动态类型函数

执行的状态图如下:

MaxCompute - ODPS重装上阵 第八弹 - 动态类型函数

放大执行过程仔细观察,图中可以发现几点有意思的地方:

MaxCompute - ODPS重装上阵 第八弹 - 动态类型函数

上述执行输出的结果如下:

+------+---------+
| name | address |
+------+---------+
| 小明 | 杭州 |
+------+---------+

其他用法

经常有用户误用参数化视图,将参数化视图的参数当做是宏替换参数来使用。这里说明一下。参数化视图实际上是函数调用,而不是宏替换。如下面的例子:

CREATE VIEW paramed_view(@a TABLE(key STRING, value STRING), @b STRING) 
AS SELECT * FROM @a ORDER BY @b;

-- 调用示例
select * from paramed_view(src, 'key');

上面的例子中,用户的期望是 ORDER BY @b 被宏替换为 ORDER BY key,即根据输入参数,决定了按照key列做排序。然而,实际上参数@b是作为一个值来传递的,ORDER BY @b 相当于 ORDER BY 'key',即 ORDER BY一个字符串常量('key')而不是一列。要想实现"让调用者决定排序列"这一功能,可以考虑下述做法。

CREATE VIEW orderByFirstCol(@a TABLE(columnForOrder ANY, * ANY)) AS SELECT `(columnForOrder)?+.+` FROM (SELECT * FROM @a ORDER BY columnForOrder) t;

-- 调用示例
select * from orderByFirstCol((select key, * from src));

上面的例子,要求调用者将要排序的列放在第一列,于是在调用的时候使用子查询将src的需要排序的列抽取到最前面。视图返回的 (columnForOrder)?+.+ 是一个正则通配符,匹配columnForOrder之外的所有列,列表达式使用正则表达式可参考SELECT语法介绍>列表达式关于正则表达式的说明。

UDF:函数重载方式

问题

MaxCompute 的 UDF 使用重载 evalaute 方法的方式来重载函数,如下面的UDF定义了两个重载,当输入是 String 类型时,输出String类型,输入是BIGINT类型时,输出DOUBLE类型。

public UDFClass extends UDF {

    public String evaluate(String input) { return input + "123";  }

    public Double evaluate(Long input) { return input + 123.0; }
}

这种方式固然能解决一些问题,但有一定的局限性。比如不支持泛型,要做一个接受任何类型的函数,就必须为每种类型都写一个evaluate函数。有的时候重载甚至是不能实现的,比如ARRAY 和 ARRAY 的重载是做不到的。

public UDFClass extends UDF {

    public String evaluate(List<Long> input) { return input.size(); }

    // 这里会报错,因为在java类型擦除后,这个函数和 String evaluate(List<Long> input) 的参数是一样的
    public Double evaluate(List<Double> input) { input.size(); } 

    // UDF 不支持下面这种定义方式
    public String evaluate(List<Object> input) { return input.size(); }
}

PYTHON UDF 或 UDTF 在不提供 Resolve 注解(annotation)的时候,会根据参数个数决定输入参数,也支持变长,因此非常灵活。但也因为过于灵活,编译器无法静态找到某些错误。比如

class Substr(object):
    def evaluate(self, a, b):
      return a[b:];

上面的函数接受两个参数,从实现上看,第一个参数需要是STRING类型,第二个参数应该是整形。而这个限制需要用户在调用时自己去把握。即使用户传错了参数,编译器也没有办法报错。同时,这种方式定义的UDF返回值类型只能是STRING,不够灵活。

改进

要解决上面的问题。可以考虑使用UDT。 UDT经常被简单在调用JDK中的方法的时候使用,比如 java.util.Objects.toString(x) 将任何对象 x 转成STRING类型。但是在自定义函数方面同样也有很好的用途。 UDT支持泛型,支持类继承,支持变长等功能,让定义函数更方便。如下面的例子:

public class UDTClass {

    // 这个函数接受一个数值类型(可以是 TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE 以及任何以Number为基类的UDT),返回DOUBLE
    public static Double doubleValue(Number input) {
        return input.doubleValue();
    }

    // 这个方法,接受一个数值类型参数和一个任意类型的参数,返回值类型与第二个参数的类型相同
    public static <T extends Number, R> R nullOrValue(T a, R b) {
        return a.doubleValue() > 0 ? b : null;
    }

    // 这个方法接受一个任意元素类型的array或List,返回BIGINT
    public static Long length(java.util.List<? extends Object> input) {
        return input.size();
    }

    // 注意这个在不做强制转换的情况下参数只能接受 UDT 的 java.util.Map<Object, Object> 对象。如果需要传入任何map对象,比如 map<bigint,bigint> 可以考虑:
    // 1. 定义函数时使用java.util.Map<? extends Object, ? extends Object>
    // 2. 调用时强转,比如 UDTClass.mapSize(cast(mapObj as java.util.Map<Object, Object>))
    public static Long mapSize(java.util.Map<Object, Object> input) {
        return input.size();
    }
}

UDT 能够提供灵活的函数定义方式。但是有的时候UDF 需要通过 com.aliyun.odps.udf.ExecutionContext(在setup方法中传入)来获取一些上下文。现在UDT也可以通过 com.aliyun.odps.udt.UDTExecutionContext.get() 方法来或者这样的一个 ExecutionContext 对象。

Aggregator 与 UDTF:Annotation方式

问题

MaxCompute 的 UDAF 和 UDTF 使用Resolve注解来决定函数Signature。比如下面的方式定义了一个UDTF,该UDTF接受一个BIGINT参数,返回DOUBLE类型。

@com.aliyun.odps.udf.annotation.Resolve("BIGINT->DOUBLE")
public class UDTFClass extends UDTF {
    ...
}

这种方式的局限性很明显,输入参数和输出参数都是固定的,没办法重载。

改进

MaxCompute对Resolve注解的语法做了许多扩展,现在能够支持一定的灵活性。

  • 参数列表中可以使用星号('*'),表示接受任意长度的,任意类型的输入参数。比如 @Resolve('double,*->String'),接受第一个是double,后接任意类型,任意个数的参数列表。这里需要UDF的作者在代码里面自己去判断输入的个数和类型,然后做出相应的动作(可以对比 C 语言里面的 printf 函数来理解)。注意星号用在返回值列表中时,表示的是不同的含义,在后续第三点中说明。
  • 参数列表中可以使用 ANY 关键字,表示任意类型的参数。比如 @Resolve('double,any->string'),接受第一个是double,第二个任意类型的参数列表。注意,ANY在返回值列表中不能使用,也不能在复杂类型的子类型中使用(如不能写ARRAY)。
  • UDTF的返回值可以使用星号,表示返回任意多个string类型。这里需要注意,返回值的个数并非真的是任意多个,而是与调用函数时给出的alias个数有关。比如@Resolve("ANY,ANY->DOUBLE,*"),调用方式是 UDTF(x, y) as (a, b, c),这里as后面给出了三个alias (a, b, c),编译器会认定a为double类型(annotation中返回值第一列的类型是给定的),b,c为string类型,而因为这里给出了三个返回值,所以UDTF在forward的时候,也一定要forward长度为3的数组,否则会出现运行时错误。注意这个错误是无法在编译时给出的,因此通常需要UDTF的作者与调用者互相沟通好,调用者在SQL中给出alias个数的时候,一定要按照UDTF的需要来写。由于Aggregator返回值个数固定是1,所以这个功能对UDAF无意义。

用一个例子来说明。如下UDTF:

import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import org.json.JSONException;
import org.json.JSONObject;

@Resolve("STRING,*->STRING,*")
public class JsonTuple extends UDTF {

  private Object[] result = null;

  @Override
  public void process(Object[] input) throws UDFException {
    if (result == null) {
      result = new Object[input.length];
    }

    try {
      JSONObject obj = new JSONObject((String)input[0]);
      for (int i = 1; i < input.length; i++) {
        // 返回值要求变长部分都是STRING
        result[i] = String.valueOf(obj.get((String)(input[i])));
      }
      result[0] = null;
    } catch (JSONException ex) {
      for (int i = 1; i < result.length; i++) {
        result[i] = null;
      }
      result[0] = ex.getMessage();
    }
    forward(result);
  }
}

这个UDTF的返回值个数会根据输入参数的个数来决定。输出参数的第一个是一个JSON文本,后面是需要从JSON中解析的key。返回值第一个是解析JSON过程中的出错信息,如果没有出错,则后续根据输入的key依次输出从json中解析出来的内容。使用示例如下。

-- 根据输入参数的个数定制输出alias个数
SELECT my_json_tuple(json, ’a‘, 'b') as exceptions, a, b FROM jsons;

-- 变长部分可以一列都没有
SELECT my_json_tuple(json) as exceptions, a, b FROM jsons;

-- 下面这个SQL会出现运行时错误,因为alias个数与实际输出个数不符
-- 注意编译时无法发现这个错误
SELECT my_json_tuple(json, 'a', 'b') as exceptions, a, b, c FROM jsons;

上面虽然做出了许多扩展,但是这些扩展并不一定能满足所有的需求。这时候依然可以考虑使用UDT。UDT也是可以用来实现Aggregator和UDTF的功能的。详细可以参考UDT示例文档,“聚合操作的实现示例” 及 “表值函数的实现示例” 的内容。

总结

MaxCompute自定义函数的函数原型不够灵活,在数据开发过程中带来诸多不便利,本文列举了各种函数定义方式存在的问题与解决方案,希望对大家有帮助,同时也告诉大家MaxCompute一直在努力为大家提供更好的服务。

上一篇:《Oracle数据库性能优化方法论和最佳实践》——2.6 流程、资源和组件优化方法论


下一篇:SQL Server数据库同步问题分享[未完,待续](一)