spark2 sql原理分析--逻辑计划转换成物理计划的实现分析(SparkPlanner)

概述

本文介绍介绍SparkPlanner的实现原理。
SparkPlanner将优化后的逻辑执行计划转换为物理执行计划的计划器(Planner)。

SparkPlanner是一个具体的Catalyst Query Planner,它使用执行计划策略( execution planning strategies)将逻辑计划转换为一个或多个物理计划,并支持额外的策略(ExperimentalMethods)和extraPlanningStrategies。

SparkPlanner类的实现

SparkPlanner继承了几个抽象类,它和几个抽象类的关系如下:

SparkPlanner -> SparkStrategies(抽象类) -> QueryPlanner(抽象类) ->

QueryExecution的实现过程都是在构造其对象时完成的。创建QueryExecution对象时首先要获取一个SparkPlanner对象。代码如下:

  protected def planner = sparkSession.sessionState.planner

SparkPlanner对象对象用来把逻辑执行计划转换成物理执行计划,并对不好的物理计划进行修正。下面分析一下该对象的实现原理:

SparkPlanner的原理

从以上的分析可以知:SparkPlanner会把一个逻辑执行计划转换成一个或多个物理执行计划。

那么,它是如何把一个逻辑执行计划转换成多个物理执行计划的呢?其实SparkPlanner是根据一个或多个SparkStrategy来进行转换的。

SparkPlanner根据逻辑执行计划来判断具体使用哪种SparkStrategy,比如:FileSourceStrategy或DataSourceStrategy(conf)等等。

SparkStrategy可以分为两类:一类是规则的SparkStrategy,另一类是额外的SparkStrategy。

SparkPlanner的定义

  • 类的继承关系
SparkPlanner -> SparkStrategies(抽象类) -> QueryPlanner(抽象类)
  • SparkPlanner类声明
class SparkPlanner(
    val sparkContext: SparkContext,
    val conf: SQLConf,
    val experimentalMethods: ExperimentalMethods)
  extends SparkStrategies {
  • SparkStrategies类声明
abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
  • QueryPlanner类声明
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {

在QueryPlanner类中定义了一个plan函数,通过该plan函数,可以将逻辑执行计划装换成物理执行计划。

SparkPlanner类的实现概要

  1. 首先通过配置得到shuffle的分区数,配置项是:spark.sql.shuffle.partitions,默认值是200。

  2. 设定从逻辑执行计划到物理执行计划的转换策略(SparkStrategy),这些策略形成一个策略的列表,保存到变量:strategies中。这个策略的列表,其实包括三个类别:

    • experimentalMethods的extraStrategies,在2中为空
    • extraPlanningStrategies策略,该策略列表会在构建sessionstate时创建。
    • 一个是:常规(GenericStrategy)策略,主要包括以下策略:
    PythonEvals ::
      DataSourceV2Strategy ::
      FileSourceStrategy ::
      DataSourceStrategy(conf) ::
      SpecialLimits ::
      Aggregation ::
      Window ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)
    

    以上这些策略会继承抽象类SparkStrategy,分别对应不同的操作,比如:join,limit等。

  3. 定义了一个prunePlans函数,通过该函数来防止计划组合的爆炸,需要修剪坏的计划。

  4. 提供了一个pruneFilterProject函数。该函数用来构建表扫描运算符,其中使用单独的物理运算符完成复杂映射和过滤。它会仅在需要时添加Project和Filter节点。

SparkPlanner的父类:SparkStrategies

SparkStrategies是一个抽象类,在该抽象类中,定义了各种把某个逻辑执行计划转换成物理执行计划(SparkPlans)的策略。这些策略都在类SparkStrategies中进行定义。

该类主要包括以下几种类型策略到物理执行计划的转换:

  • limit操作:在SpecialLimits类中实现。
  • join操作:在类JoinSelection中实现。在进行join策略选择时,对于不同的join类型有一定的限制,具体的可以查看JoinSelection类的实现。

根据联接键和逻辑计划的大小选择合适的物理计划进行联接。
首先,使用[[ExtractEquiJoinKeys]]模式查找可以通过匹配联接键来评估至少某些谓词的联接。 如果找到,则按以下优先级选择连接实现:

  • 流的聚合操作:StatefulAggregationStrategy
  • 流的去重操作:StreamingDeduplicationStrategy
  • 在流的append模式下的limit操作:StreamingGlobalLimitStrategy
  • 基于AggregateFunction2接口的聚合操作:Aggregation
  • EvalPython操作:PythonEvals
  • 基本的操作(比如:sample, count, union,reparttion等):BasicOperators

抽象类:QueryPlanner

该类是策略的顶层父类,该类的声明如下:

abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] 

该类提供了一些操作接口,用来把逻辑执行计划转换成物理执行计划。该类的核心接口是plan,我们一起来看一下该函数的实现逻辑:

  1. 搜集参数:在plan(逻辑执行计划)中收集物理执行计划的候选者。
  2. 需要转换的候选者可能包含占位符,需要使用他们的儿子节点替换该占位符。
  3. 最后,通过函数prunePlans来修正不好的物理执行计划,从而避免执行计划爆炸。该函数在prunePlans类中实现,但在spark 2中该函数实际上什么也没做。

具体的实现代码如下:


  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...

    // Collect physical plan candidates.
    // 收集物理执行计划候选者
    val candidates = strategies.iterator.flatMap(_(plan))

    // The candidates may contain placeholders marked as [[planLater]],
    // so try to replace them by their child plans.
    // 若候选者中含有占位符,则使用儿子计划来替换它
    val plans = candidates.flatMap { candidate =>
      val placeholders = collectPlaceholders(candidate)

      if (placeholders.isEmpty) { // 不包括占位符,直接返回
        // Take the candidate as is because it does not contain placeholders.
        Iterator(candidate)
      } else {  // 在逻辑执行计划中标记占位符,并替换该占位符
        // Plan the logical plan marked as [[planLater]] and replace the placeholders.
        placeholders.iterator.foldLeft(Iterator(candidate)) {
          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
            // Plan the logical plan for the placeholder.
            val childPlans = this.plan(logicalPlan)

            candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
              childPlans.map { childPlan =>
                // Replace the placeholder by the child plan // 使用儿子plan替换占位符
                candidateWithPlaceholders.transformUp {
                  case p if p.eq(placeholder) => childPlan
                }
              }
            }
        }
      }
    }
    
    // 修正不好的执行计划
    val pruned = prunePlans(plans)
    assert(pruned.hasNext, s"No plan for $plan")
    pruned
  }

由于SparkStrategies类中的物理执行计划转换策略非常多,每个策略都可以进一步分析。就不在这篇文章中描述了。

到此,物理执行计划就已经初步生成了,但这些物理执行计划并非都是最优的,所以,还需要在后面进行优化,也就是在prepareForExecution中进行优化,才能进行执行代码的生成。

总结

本文讲述了spark sql中是如何把逻辑执行计划转换成物理执行计划的。通过SparkStrategies类中提供的策略,可以把逻辑执行计划转换成物理执行计划。

spark2 sql原理分析--逻辑计划转换成物理计划的实现分析(SparkPlanner)spark2 sql原理分析--逻辑计划转换成物理计划的实现分析(SparkPlanner) HoverZheng 发布了218 篇原创文章 · 获赞 38 · 访问量 58万+ 他的留言板 关注
上一篇:《PG源码学习--4.查询规划》


下一篇:腾讯云自媒体分享计划