最近在对flink的运行任务进行优化,因为我们的计算方面业务上比较依赖组织架构比如总公司、子公司、子部门我们针对总公司要汇总一版全公司的数据,子公司要汇总一版子公司数据。但是实际上子公司的数据是属于总公司的,这样的话数据会出现多次重复参与计算的问题。
为了减少这种冗余计算,我们利用创建临时表的思路搭配flink的table api以及sql api进行数据复用。从底层开始汇总,上层复用底层的轻度汇总结果进行进一步的汇总。这种思路类似与于我们学习hadoop时候mapreduce中的combine操作,即先合并后计算。并把中间计算结果保存成临时表供后续重复使用。这样调整后在flink管理页面查看ExecutionGraph执行图时会发现结构也变成了树状图,并且tasks数量也直接少了一半。
但是调整后经过观察发现flink计算后的数据输出至kafka时出现数据跳动问题,导致前台展示忽大忽小,原因是使用中间表后因为中间数据也在作为sink输出,这样数据变化时会像一个传送带一样第一层数据变化引起第二层变化然后引起第三层变化。因为flink数据状态变更为先删后插这样传送带也会先删导致后续使用该中间状态的数据变小后插入时再引起数据变大从而产生数据的跳动动结果。
解决方法:开启微批将中间数据跳动过滤掉,设置如下:
table.exec.mini-batch.enabled 设置微true
table.exec.mini-batch.allow-latency 缓存数据最大的时间间隔,超过该间隔,将会强制触发已聚合数据写出给下游,默认-1毫秒即立即触发,可以根据需求自行调整。实时性要求较高可以适当调低,实时性要求不高可调高
table.exec.mini-batch.size 黄村数据量最大条数,为防止oom做的双重保障与table.exec.mini-batch.allow-latency设置的时间满足一个就会触发