概述
本文介绍介绍SparkPlanner的实现原理。
SparkPlanner将优化后的逻辑执行计划转换为物理执行计划的计划器(Planner)。
SparkPlanner是一个具体的Catalyst Query Planner,它使用执行计划策略( execution planning strategies)将逻辑计划转换为一个或多个物理计划,并支持额外的策略(ExperimentalMethods)和extraPlanningStrategies。
SparkPlanner类的实现
SparkPlanner继承了几个抽象类,它和几个抽象类的关系如下:
SparkPlanner -> SparkStrategies(抽象类) -> QueryPlanner(抽象类) ->
QueryExecution的实现过程都是在构造其对象时完成的。创建QueryExecution对象时首先要获取一个SparkPlanner对象。代码如下:
protected def planner = sparkSession.sessionState.planner
SparkPlanner对象对象用来把逻辑执行计划转换成物理执行计划,并对不好的物理计划进行修正。下面分析一下该对象的实现原理:
SparkPlanner的原理
从以上的分析可以知:SparkPlanner会把一个逻辑执行计划转换成一个或多个物理执行计划。
那么,它是如何把一个逻辑执行计划转换成多个物理执行计划的呢?其实SparkPlanner是根据一个或多个SparkStrategy来进行转换的。
SparkPlanner根据逻辑执行计划来判断具体使用哪种SparkStrategy,比如:FileSourceStrategy或DataSourceStrategy(conf)等等。
SparkStrategy可以分为两类:一类是规则的SparkStrategy,另一类是额外的SparkStrategy。
SparkPlanner的定义
- 类的继承关系
SparkPlanner -> SparkStrategies(抽象类) -> QueryPlanner(抽象类)
- SparkPlanner类声明
class SparkPlanner(
val sparkContext: SparkContext,
val conf: SQLConf,
val experimentalMethods: ExperimentalMethods)
extends SparkStrategies {
- SparkStrategies类声明
abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
- QueryPlanner类声明
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
在QueryPlanner类中定义了一个plan函数,通过该plan函数,可以将逻辑执行计划装换成物理执行计划。
SparkPlanner类的实现概要
-
首先通过配置得到shuffle的分区数,配置项是:spark.sql.shuffle.partitions,默认值是200。
-
设定从逻辑执行计划到物理执行计划的转换策略(SparkStrategy),这些策略形成一个策略的列表,保存到变量:strategies中。这个策略的列表,其实包括三个类别:
- experimentalMethods的extraStrategies,在2中为空
- extraPlanningStrategies策略,该策略列表会在构建sessionstate时创建。
- 一个是:常规(GenericStrategy)策略,主要包括以下策略:
PythonEvals :: DataSourceV2Strategy :: FileSourceStrategy :: DataSourceStrategy(conf) :: SpecialLimits :: Aggregation :: Window :: JoinSelection :: InMemoryScans :: BasicOperators :: Nil)
以上这些策略会继承抽象类SparkStrategy,分别对应不同的操作,比如:join,limit等。
-
定义了一个prunePlans函数,通过该函数来防止计划组合的爆炸,需要修剪坏的计划。
-
提供了一个pruneFilterProject函数。该函数用来构建表扫描运算符,其中使用单独的物理运算符完成复杂映射和过滤。它会仅在需要时添加Project和Filter节点。
SparkPlanner的父类:SparkStrategies
SparkStrategies是一个抽象类,在该抽象类中,定义了各种把某个逻辑执行计划转换成物理执行计划(SparkPlans)的策略。这些策略都在类SparkStrategies中进行定义。
该类主要包括以下几种类型策略到物理执行计划的转换:
- limit操作:在SpecialLimits类中实现。
- join操作:在类JoinSelection中实现。在进行join策略选择时,对于不同的join类型有一定的限制,具体的可以查看JoinSelection类的实现。
根据联接键和逻辑计划的大小选择合适的物理计划进行联接。
首先,使用[[ExtractEquiJoinKeys]]模式查找可以通过匹配联接键来评估至少某些谓词的联接。 如果找到,则按以下优先级选择连接实现:
- 流的聚合操作:StatefulAggregationStrategy
- 流的去重操作:StreamingDeduplicationStrategy
- 在流的append模式下的limit操作:StreamingGlobalLimitStrategy
- 基于AggregateFunction2接口的聚合操作:Aggregation
- EvalPython操作:PythonEvals
- 基本的操作(比如:sample, count, union,reparttion等):BasicOperators
抽象类:QueryPlanner
该类是策略的顶层父类,该类的声明如下:
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]]
该类提供了一些操作接口,用来把逻辑执行计划转换成物理执行计划。该类的核心接口是plan,我们一起来看一下该函数的实现逻辑:
- 搜集参数:在plan(逻辑执行计划)中收集物理执行计划的候选者。
- 需要转换的候选者可能包含占位符,需要使用他们的儿子节点替换该占位符。
- 最后,通过函数prunePlans来修正不好的物理执行计划,从而避免执行计划爆炸。该函数在prunePlans类中实现,但在spark 2中该函数实际上什么也没做。
具体的实现代码如下:
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
// Obviously a lot to do here still...
// Collect physical plan candidates.
// 收集物理执行计划候选者
val candidates = strategies.iterator.flatMap(_(plan))
// The candidates may contain placeholders marked as [[planLater]],
// so try to replace them by their child plans.
// 若候选者中含有占位符,则使用儿子计划来替换它
val plans = candidates.flatMap { candidate =>
val placeholders = collectPlaceholders(candidate)
if (placeholders.isEmpty) { // 不包括占位符,直接返回
// Take the candidate as is because it does not contain placeholders.
Iterator(candidate)
} else { // 在逻辑执行计划中标记占位符,并替换该占位符
// Plan the logical plan marked as [[planLater]] and replace the placeholders.
placeholders.iterator.foldLeft(Iterator(candidate)) {
case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
// Plan the logical plan for the placeholder.
val childPlans = this.plan(logicalPlan)
candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
childPlans.map { childPlan =>
// Replace the placeholder by the child plan // 使用儿子plan替换占位符
candidateWithPlaceholders.transformUp {
case p if p.eq(placeholder) => childPlan
}
}
}
}
}
}
// 修正不好的执行计划
val pruned = prunePlans(plans)
assert(pruned.hasNext, s"No plan for $plan")
pruned
}
由于SparkStrategies类中的物理执行计划转换策略非常多,每个策略都可以进一步分析。就不在这篇文章中描述了。
到此,物理执行计划就已经初步生成了,但这些物理执行计划并非都是最优的,所以,还需要在后面进行优化,也就是在prepareForExecution中进行优化,才能进行执行代码的生成。
总结
本文讲述了spark sql中是如何把逻辑执行计划转换成物理执行计划的。通过SparkStrategies类中提供的策略,可以把逻辑执行计划转换成物理执行计划。
HoverZheng 发布了218 篇原创文章 · 获赞 38 · 访问量 58万+ 关注