今天分析一下,flink table聚合udf AggregateFunction的open函数未被调用的bug。
情景一:
当然,对于udf的聚合操作,在flink里面有两种用法,一种是不用窗口的分组聚合类似于
Table table = tEnv.sqlQuery("select DateUtil(rowtime,'yyyyMMddHH'),WeightedAvg(number,number) from source group by DateUtil(rowtime,'yyyyMMddHH')");
情景二:
一种是使用窗口的分组聚合操作,例如:
tEnv.sqlUpdate("insert into sink select fruit,WeightedAvg(number,number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from source group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");
表面上看是是同一个类型的udf,底层执行逻辑应该一样。但是flink内部coden的时候,被完全解析成了不同的聚合函数。
假设我们定义一个AggregateFunction的udf叫做WeightedAvg,主要进行求平均值,其中有一个变量 flag,初始值为1 ,我们想我在open的时候更改为100.
package org.table.agg;
import org.apache.flink.table.functions.AggregateFunction;import org.apache.flink.table.functions.FunctionContext;
import java.util.Iterator;
/** * Weighted Average user-defined aggregate function. */public class WeightedAvg extends AggregateFunction<Integer, WeightedAvgAccum> { @Override public void open( FunctionContext context) throws Exception, Exception { this.flag =100; }
private int flag =1; @Override public WeightedAvgAccum createAccumulator() { return new WeightedAvgAccum(); }
@Override public Integer getValue(WeightedAvgAccum acc) { System.out.println("value of flag is : "+flag); if (acc.count == 0) { return null; } else { int i = acc.sum / acc.count; return i; } }
public void accumulate(WeightedAvgAccum acc, int iValue, int iWeight) { acc.sum += iValue * iWeight; acc.count += iWeight; }
public void retract(WeightedAvgAccum acc, int iValue, int iWeight) { acc.sum -= iValue * iWeight; acc.count -= iWeight; }
public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) { Iterator<WeightedAvgAccum> iter = it.iterator(); while (iter.hasNext()) { WeightedAvgAccum a = iter.next(); acc.count += a.count; acc.sum += a.sum; } }
public void resetAccumulator(WeightedAvgAccum acc) { acc.count = 0; acc.sum = 0; }}
package org.table.agg;
/** * Accumulator for WeightedAvg. */public class WeightedAvgAccum { public int sum = 0; public int count = 0;}
分别执行两个sql之后,你会发现:
情景一:value of flag is : 100
情景二:value of flag is : 1
之所以会情景一没有被更改为 100 主要原因是open函数没有调用,显然这种情况下,在AggregateFunction的open函数里初始化外部客户端,比如mysql,redis等客户端初始化,或者通过open的context参数传递一些参数到AggregateFunction,比如权重阈值等,都变的行不通了。
直接给出大致结论,主要原因是:
情景一对应DataStream的GroupAggProcessFunction。
情景二对应DataStream的AggregateFunction,而该函数并没有open方法。仅仅说的是滚动窗口,还有其它窗口AggregateUtil。
解决办法是有很多,比如使用构造函数在注册的时候传参并初始化,比如使用readobject()|writeObject()方法等。
如代码,可以给WeightedAvg加入构造函数:
public WeightedAvg(int flag) { this.flag = flag; }
然后注册udf的时候直接初始化:
tEnv.registerFunction("WeightedAvg",new WeightedAvg(100));
哎,只能说flink的坑太多,有待改进。但是这个也体现出了我们码农的存在的必要性。
本文举例仅仅是一种窗口操作,更多的窗口聚合是否会调用aggregateFunction方法,可以仔细阅读AggregateUtil。
更多问题欢迎加入浪尖知识星球: