SparkSQL Catalyst解析

Catalyst Optimizer是SparkSQL的核心组件(查询优化器),它负责将SQL语句转换成物理执行计划,Catalyst的优劣决定了SQL执行的性能。

查询优化器是一个SQL引擎的核心,开源常用的有Apache Calcite(很多开源组件都通过引入Calcite来实现查询优化,如Hive/Phoenix/Drill等),另外一个是orca(HAWQ/GreenPlum中使用)。

关系代数是查询优化器的理论基础。常见的查询优化技术:查询重用(ReuseSubquery/ReuseExchange等)/RBO/CBO等。

SparkSQL执行流程

SparkSQL Catalyst解析

SparkSQL中对一条SQL语句的处理过程如上图所示:
1) SqlParser将SQL语句解析成一个逻辑执行计划(未解析)
2) Analyzer利用HiveMeta中表/列等信息,对逻辑执行计划进行解析(如表/列是否存在等)
3) SparkOptimizer利用Rule Based(基于经验规则RBO)/Cost Based(基于代价CBO)的优化方法,对逻辑执行计划进行优化(如谓词下推/JoinReorder)
4) SparkPlanner将逻辑执行计划转换成物理执行计划(如Filter -> FilterExec),
同时从某些逻辑算子的多种物理算子实现中根据RBO/CBO选择其中一个合适的物理算子(如Join的多个实现BroadcastJoin/SortMergeJoin/HashJoin中选择一个实现)
5) PrepareForExecution是执行物理执行计划之前做的一些事情,比如ReuseExchange/WholeStageCodegen的处理等等
6) 最终在SparkCore中执行该物理执行计划。

接下来介绍Catalyst中的核心模块SparkOptimizer/SparkPlanner.

SparkOptimizer

使用已有的规则对逻辑执行计划进行优化,该过程是基于经验/启发式的优化方法,得到优化过的逻辑执行计划。

SparkSQL Catalyst解析

如上图所示,Optimizer中有很多Batch,每个Batch中包含1个或多个Rule,Batch的另外一个属性是迭代次数(Once/FixPoint默认100次),每个Batch内部Rule有前后执行顺序,Batch之间也是按照顺序来执行的。目前Optimizer中有60多个Rule。
备注: 从Rule看JoinReorder在这个过程就已经处理了。

SparkPlanner

参考: https://issues.apache.org/jira/browse/SPARK-1251
SparkPlanner将逻辑执行计划转换成物理执行计划,即将逻辑执行计划树中的逻辑节点转换成物理节点,如Join转换成HashJoinExec/SortMergeJoinExec...,Filter转成FilterExec等

SparkSQL Catalyst解析

Spark的Stragety有8个:

  • DataSourceV2Strategy
  • FileSourceStrategy
  • DataSourceStrategy
  • SpecialLimits
  • Aggregation
  • JoinSelection
  • InMemoryScans
  • BasicOperators

上述很多Stragety都是基于规则的策略。
JoinSelection用到了相关的统计信息来选择将Join转换为BroadcastHashJoinExec还是ShuffledHashJoinExec还是SortMergeJoinExec,属于CBO基于代价的策略。

PrepareForExecution

在执行之前,对物理执行计划做一些处理,这些处理都是基于规则的,包括

  • PlanSubqueries
  • EnsureRequirements
  • CollapseCodegenStages
  • ReuseExchange
  • ReuseSubquery

经过上述步骤之后生成的最终物理执行计划提交到Spark执行。

CBO(基于代价)实现

CBO的实现有三个步骤如下,可以大致了解一下:

1. 统计信息采集

Optimizer/Planner中CBO(基于代价)的优化需要采集统计信息,包括表维度和列维度。

//包含表/列
case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
    hints: HintInfo = HintInfo())

//列
case class ColumnStat(
    distinctCount: BigInt,
    min: Option[Any],
    max: Option[Any],
    nullCount: BigInt,
    avgLen: Long,
    maxLen: Long,
    histogram: Option[Histogram] = None)

上面结构体用来存储统计信息,可以看出:
表维度: 大小/条数
列维度: NDV/min/max/Null/平均长度/最大长度/直方图

上述信息需要提前使用Analyze命令进行采集

// 采集表维度的统计信息,NOSCAN表示不扫描表(即只有表大小信息,不采集表条数信息)
ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS [NOSCAN];

// 采集列信息
// 若spark.sql.statistics.histogram.enabled设置为true,则会采集直方图信息
// 采集直方图信息需要额外一次的表扫描
// 使用的是等高直方图
// 只支持IntegralType/DoubleType/DecimalType/FloatType/DateType/TimestampType的列采集直方图
ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;

2.估算算子统计信息

逻辑执行计划树中只有叶子节点(表)有实际的统计信息(通过Analyze获取), 逻辑执行计划树中非叶子节点会根据子节点信息以及估算方法获取本节点的统计信息。

/**
 * Returns the estimated statistics for the current logical plan node. Under the hood, this
 * method caches the return value, which is computed based on the configuration passed in the
 * first time. If the configuration changes, the cache can be invalidated by calling
 * [[invalidateStatsCache()]].
 */
def stats: Statistics = statsCache.getOrElse {
  if (conf.cboEnabled) {
    statsCache = Option(BasicStatsPlanVisitor.visit(self))
  } else {
    statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
  }
  statsCache.get
}

def visit(p: LogicalPlan): T = p match {
    case p: Aggregate => visitAggregate(p)
    case p: Distinct => visitDistinct(p)
    case p: Except => visitExcept(p)
    case p: Expand => visitExpand(p)
    case p: Filter => visitFilter(p)
    case p: Generate => visitGenerate(p)
    case p: GlobalLimit => visitGlobalLimit(p)
    case p: Intersect => visitIntersect(p)
    case p: Join => visitJoin(p)
    case p: LocalLimit => visitLocalLimit(p)
    case p: Pivot => visitPivot(p)
    case p: Project => visitProject(p)
    case p: Repartition => visitRepartition(p)
    case p: RepartitionByExpression => visitRepartitionByExpr(p)
    case p: ResolvedHint => visitHint(p)
    case p: Sample => visitSample(p)
    case p: ScriptTransformation => visitScriptTransform(p)
    case p: Union => visitUnion(p)
    case p: Window => visitWindow(p)
    case p: LogicalPlan => default(p)
  }

每个算子都有自己的预估方法
CBO打开/关闭,有些算子的预估方法不一样,如AggregateEstimation/FilterEstimation/JoinEstimation/ProjectEstimation,其它算子CBO打开/关闭使用一套预估方法。

3.基于统计信息的优化

统计信息越准确,基于统计信息的优化更准确,从目前代码看只有下面三种场景使用到了统计信息。

JoinReorder

动态规划

//代价函数
//weight可以通过参数控制spark.sql.cbo.joinReorder.card.weight,默认0.7
//根据行数/大小来计算代价
cost = rows * weight + size * (1 - weight)

// 比较两种Join的代价大小
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
      if (other.planCost.card == 0 || other.planCost.size == 0) {
        false
      } else {
        val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
        val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
        relativeRows * conf.joinReorderCardWeight +
          relativeSize * (1 - conf.joinReorderCardWeight) < 1
      }
}
JoinSelection

根据Join两个子节点的统计信息,判断使用BroadcastHashJoinExec还是ShuffledHashJoinExec还是SortMergeJoinExec,比如其中一个表(size)很小则可以使用BroadcastHashJoinExec。

StarSchemaDetection

探测星型模型,判断一个列是否是表的主键(因为SparkSQL不支持主键设置)

/**
 * Determines if a column referenced by a base table access is a primary key.
 * A column is a PK if it is not nullable and has unique values.
 * To determine if a column has unique values in the absence of informational
 * RI constraints, the number of distinct values is compared to the total
 * number of rows in the table. If their relative difference
 * is within the expected limits (i.e. 2 * spark.sql.statistics.ndv.maxError based
 * on TPC-DS data results), the column is assumed to have unique values.
 */
  private def isUnique(
      column: Attribute,
      plan: LogicalPlan): Boolean = plan match {
    case PhysicalOperation(_, _, t: LeafNode) =>
      val leafCol = findLeafNodeCol(column, plan)
      leafCol match {
        case Some(col) if t.outputSet.contains(col) =>
          val stats = t.stats
          stats.rowCount match {
            case Some(rowCount) if rowCount >= 0 =>
              if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
                val colStats = stats.attributeStats.get(col)
                if (colStats.get.nullCount > 0) {
                  false
                } else {
                  val distinctCount = colStats.get.distinctCount
                  val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
                  // ndvMaxErr adjusted based on TPCDS 1TB data results
                  relDiff <= conf.ndvMaxError * 2
                }
              } else {
                false
              }
            case None => false
          }
        case None => false
      }
    case _ => false
  }

SparkSQL Catalyst解析

上一篇:深入分析Win32k系统调用过滤机制


下一篇:初学Spark