Flink批处理优化器之成本估算

成本估算

在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成。在Flink中成本估算依赖于每个不同的运算符所提供的自己的“预算”,本篇我们将分析什么是成本、运算符如何提供自己的预算以及如何基于预算估算成本。

什么是成本

Flink以类Costs来定义成本,它封装了一些成本估算的因素同时提供了一些针对成本对象的计算方法(加、减、乘、除)以及对这些因素未知值的认定与校验。

“cost”一词也有译作:开销、代价,将其视为同义即可。

Flink当前将成本估算的因素划分为两大类:

  • 可量化的成本估算因素:指代通过跟踪一个可量化的测量指标可以计算出的成本估算因素(比如网络或I/O的字节数);
  • 启发式的成本估算因素:指代那些不可定量计算的成本估算因素,因此只能给出一些定性的经验值;

当前被纳入成本估算的因素如下:

  • 网络成本;
  • 磁盘I/O成本;
  • CPU成本;
  • 启发式网络成本;
  • 启发式磁盘成本;
  • 启发式CPU成本;

可量化的成本估算因素可能经常会被设置为未知的(UNKNOWN,在Costs中以字面常量值-1表示)。当可量化的成本估算因素被置为未知时,所有操作的成本都将变成未知的,因此这将导致在进行优化裁剪期间,无法决策出哪个偏向的操作。在这种情况下,启发式的成本估算因素必须能发挥作用,它应该包含一个值来确保以不同策略执行的运算符是可比较的(甚至在无法估算的情况下)。

如何估算成本

成本的估算借助于成本估算器(CostEstimator),CostEstimator定义了一系列增加成本的方法,这些方法有待具体的估算器实现,它们大致分为三大类:

  • 增加传输策略的成本;
  • 增加本地策略的成本;
  • 增加屏障的成本;

CostEstimator借助于以上这几类方法,可完成对一个运算符总成本的计算,具体的计算逻辑封装在方法costOperator中,该方法接收一个计划节点(PlanNode)参数,然后按照传输策略和本地策略分别进行枚举与计算。完整的方法如下:

DefaultCostEstimator继承自CostEstimator,作为默认的(也是唯一的)成本估算器。它实现了上面计算成本逻辑中调用的一系列增加成本的addXXX方法。这些方法中的绝大部分,又依赖于预算提供器(EstimateProvider)所提供的预算数据,然后根据不同的增加成本的算法逻辑,利用这些预算数据做计算。比如我们以新增广播成本的addBroadcastCost方法为示例,其实广播传输方式说白了就是将数据复制到当前运算符的所有输出通道中,因此这里对成本的计算取决于复制因子,代码如下:

预算提供者

前面我们谈论了如何通过CostEstimator来估算成本,但其实CostEstimator是在已获得预算数据的基础上应用相关的算法来算出成本的,而用来估算成本的预算数据其实是来自预算提供者(EstimateProvider)。Flink批处理中所有的运算符都有一个基于优化器的内部表示,我们可以称它们为优化器运算符,这些运算符创建于优化操作之前,且它们都必须实现EstimateProvider接口。各个优化器运算符根据自己的实现以及语义将成本估算相关的信息暴露给外部查询。目前被纳入预算的信息有:

  • 输出的数据流大小:由接口方法getEstimatedOutputSize提供;
  • 输出的记录数:由接口方法getEstimatedNumRecords提供;
  • 单个输出记录的平均字节数:由接口方法getEstimatedAvgWidthPerOutputRecord提供;

在dag包下,EstimateProvider接口的继承关系图如下:

Flink批处理优化器之成本估算

其中,OptimizerNode是所有被优化的运算符继承的基类,因此所有优化器运算符都是预算提供者。OptimizerNode为绝大部分的优化器运算符提供了统一的预算计算方法computeOutputEstimates。

为什么说是绝大部分运算符呢?因为有些运算符是特殊的,比如双输入端union运算符BinaryUnionNode以及迭代相关的运算符。

所有的运算符都会在优化时被遍历,Flink提供了一个编号及预算遍历器(IdAndEstimatesVisitor)来对所有运算符进行逐个遍历并计算预算,这一点体现在Optimizer的compile方法的下面这行代码中:

而在IdAndEstimatesVisitor的postVisit方法中即调用computeOutputEstimates方法来计算预算。下面,我们来分析一下预算是如何计算得出的,总得来说computeOutputEstimates的逻辑被分为两部分:

  • 各个具体的运算符计算它们特定的预算;
  • 根据编译提示(CompilerHints)覆盖原有的预算计算;

OptimizerNode将特定运算符的预算计算定义成名为computeOperatorSpecificDefaultEstimates的抽象方法开放给派生类根据自身的特定逻辑实现。然后,如果该运算符如果设置有CompilerHints的话,将会根据CompilerHints覆盖原有的预算结果。

所谓CompilerHints,它是封装了描述用户函数行为的编译提示,它可用于改进优化器对计划的选择。如果给某个运算符设置编译提示的话,那么在计算预算时,将会用它来覆盖运算符自身给出的中间结果的预算。当前,CompilerHints在优化器中没有得到太大的机会发挥。

因为CompilerHints没有被广泛应用,所以预算的计算还是依赖各个运算符具体提供,所以我们关注一下computeOperatorSpecificDefaultEstimates方法。该方法完全是按照具体运算符的语义特征来实现的,我们选择看其中的几个实现:

二元union运算符的预算就是累加其两个输入端:

Cross运算符的处理方式是:

从上面的两个运算符对预算的计算可见,它们大都依赖上游运算符的输出预算。而最初的预算肯定由source运算符决定,因为只有source才能知道数据的具体规模。

所以,我们来看一下DataSourceNode,很明显它作为数据的输入源,是最有可能了解初始数据集大小的运算符,为此Flink定义了一个专门用于统计的对象BaseStatistics,它用于统计对接外部的数据源的预算信息。但并非每个数据源的信息都能被统计到,而Flink当前也只实现了以文件为输入的FileInputFormat的预算统计FileBaseStatistics。


原文发布时间为:2017-03-28

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

上一篇:打印出txt中出现频率最高的十个词——软件工程个人项目C语言


下一篇:PyYAML中文文档