UDAF
class odps.udf.BaseUDAF
继承此类实现Python UDAF。
BaseUDAF.new_buffer()
实现此方法返回聚合函数的中间值的buffer。buffer必须是mutable object(比如list, dict),并且buffer的大小不应该随数据量递增,在极限情况下,buffer marshal过后的大小不应该超过2Mb。
BaseUDAF.iterate(buffer[, args, ...])
实现此方法将args聚合到中间值buffer中。
BaseUDAF.merge(buffer, pbuffer)
实现此方法将两个中间值buffer聚合到一起,即将pbuffer merge到buffer中。
BaseUDAF.terminate(buffer)
实现此方法将中间值buffer转换为ODPS SQL基本类型。下面是一个UDAF求平均值的例子。
from odps.udf import annotate
from odps.udf import BaseUDAF
@annotate('bigint->bigint')
class Average(BaseUDAF):
def new_buffer(self):
return [0, 0]
def iterate(self, buffer, number):
if number is not None:
buffer[0] += number
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def terminate(self, buffer):
if buffer[1] == 0:
return 0
return buffer[0] / buffer[1]
比如计算1,2,3,4的平均值的执行过程如下图所示: