Spark SQL Component Source Code Analysis

Features

The newly released Spark SQL component gives Spark its own SQL support, different from Shark's Hive-based approach. According to the official guide, it consists of three parts:

First, SQL can be written directly in Scala code, with simple SQL syntax checking, and an RDD can be registered and stored as a table. A DSL covering part of the SQL syntax is also supported.

Second, Parquet files can be read and written, with the schema preserved.

Third, Hive metadata can be accessed from Scala code, Hive statements can be executed, and the results can be pulled back and used as RDDs.


The first point, SQL support, relies mainly on Catalyst, the new query optimization framework (a brief introduction to Catalyst is given below). After the SQL is parsed into a logical plan, classes and interfaces from the Catalyst package are used to perform some simple plan optimizations, and the result finally becomes RDD computation. Although the current SQL parser is fairly simple and the plan optimizations are fairly generic, the code is still worth studying as a reference, so I read through it. The PR was merged into the main branch yesterday; this part of the implementation can be found in the sql module, and the Catalyst code in the catalyst module. The rest of this article walks through the implementation of the Spark SQL module.

I am not going to look at the second point, Parquet support, because our use cases do not involve columnar storage such as Parquet; the applicable scenarios are different.

As for the third point, this way of integrating Hive brings no fundamental progress. Shark relies on Hive's Metastore, parser, and so on to turn HQL execution into computation on Spark, whereas the current integration is not essentially different from importing the Hive packages into your code and executing HQL; it just makes the interaction between Hive HQL data and RDDs friendlier.


Introduction to Catalyst

See the Spark Summit material on Catalyst, Catalyst: A Query Optimization Framework for Spark and Shark:

Query optimization can greatly improve both the productivity of developers and the performance of the queries that they write. A good query optimizer is capable of automatically rewriting relational queries to execute more efficiently, using techniques such as filtering data early, utilizing available indexes, and even ensuring different data sources are joined in the most efficient order. By performing these transformations, the optimizer not only improves the execution times of relational queries, but also frees the developer to focus on the semantics of their application instead of its performance. Unfortunately, building an optimizer is an incredibly complex engineering task and thus many open source systems perform only very simple optimizations. Past research[1][2] has attempted to combat this complexity by providing frameworks that allow the creators of optimizers to write possible optimizations as a set of declarative rules. However, the use of such frameworks has required the creation and maintenance of special “optimizer compilers” and forced the burden of learning a complex domain specific language upon those wishing to add features to the optimizer. Instead, we propose Catalyst, a query optimization framework embedded in Scala. Catalyst takes advantage of Scala’s powerful language features such as pattern matching and runtime metaprogramming to allow developers to concisely specify complex relational optimizations. In this talk I will describe the framework and how it allows developers to express complex query transformations in very few lines of code. I will also describe our initial efforts at improving the execution time of Shark queries by greatly improving its query optimization capabilities.

Overall, Catalyst is an implementation-agnostic framework for manipulating trees of relational operators and expressions. It consists of three main parts:

  1. a TreeNode library for transforming trees that are expressed as Scala case classes, 
  2. a logical plan representation for relational operators, 
  3. an expression library.


Analysis

Let's walk through the internals based on the official Spark SQL example.

val sc: SparkContext // An existing SparkContext.
val sqlContext = new SQLContext(sc)

// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._

// Define the schema using a case class.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people: RDD[Person] = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).toInt))
people.registerAsTable("people")

val teenagers = sql("SELECT name FROM people WHERE age >= 10 && age <= 19")

// The results of SQL queries are themselves RDDs and support all the normal operations
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
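
For comparison, the DSL mentioned in the Features section can express roughly the same query without writing a SQL string. The following is only a sketch: the symbol-based column syntax and the where/select method names are assumed from the early SchemaRDD DSL, not taken from the example above.

// Sketch of the SQL DSL on the people SchemaRDD (symbol-based columns assumed).
val teenagersDsl = people.where('age >= 10).where('age <= 19).select('name)
teenagersDsl.map(t => "Name: " + t(0)).collect().foreach(println)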

SQLContext is the overall execution environment of the SQL module.

The line val people: RDD[Person] = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).toInt)) would normally just build a people RDD through a file read on sc and two MappedRDD transformations. But because import sqlContext._ brings the implicit conversion createSchemaRDD into scope,

implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
people, which is declared as RDD[Person], is an RDD of case class instances, so it gets converted into a SchemaRDD.
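
To make the implicit conversion explicit: the same SchemaRDD could be obtained by calling createSchemaRDD by hand, as in the sketch below (rawPeople is just an illustrative name):

// Sketch: invoking the conversion explicitly instead of relying on the implicit.
val rawPeople: RDD[Person] = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).toInt))
val peopleAsSchemaRDD: SchemaRDD = sqlContext.createSchemaRDD(rawPeople)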


SchemaRDD is a new RDD implementation added by the SQL module. Constructing a SchemaRDD requires two things:

class SchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient val logicalPlan: LogicalPlan)
    extends RDD[Row](sqlContext.sparkContext, Nil) {
an SQLContext and a logical plan. Inside the createSchemaRDD method, ExistingRdd.fromProductRdd(rdd) does the following to the people RDD:

def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }

首先根据A,即Person这个case class,通过Scala反射出了类的属性,对于table来说就是取到了各个column。其次,productToRowRdd把rdd转化成了一个RDD[Row],

def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    // TODO: Reuse the row, don't use map on the product iterator.  Maybe code gen?
    data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)
  }
After these two steps, what we end up with, based on the people RDD, is an instance of this case class:

case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row])
output corresponds to the sequence of columns.
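
To make this concrete, here is a tiny sketch of what productToRowRdd does per element: a case class instance is a Product, so its fields can be walked with productIterator (the convertToCatalyst call and the GenericRow wrapping are left out):

// Illustrative only: how a single Person becomes the cells of a Row.
val alice = Person("Alice", 15)
val cells: Array[Any] = alice.productIterator.toArray   // Array("Alice", 15)
// In the real code each value also goes through convertToCatalyst and the array is
// wrapped in a GenericRow; attributesFor[Person] supplies the matching column attributes.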


SparkLogicalPlan is a simple class, as its implementation shows:

/**
 * Allows already planned SparkQueries to be linked into logical query plans.
 *
 * Note that in general it is not valid to use this class to link multiple copies of the same
 * physical operator into the same query plan as this violates the uniqueness of expression ids.
 * Special handling exists for ExistingRdd as these are already leaf operators and thus we can just
 * replace the output attributes with new copies of themselves without breaking any attribute
 * linking.
 */
case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
  extends logical.LogicalPlan with MultiInstanceRelation {

  def output = alreadyPlanned.output
  def references = Set.empty
  def children = Nil

  override final def newInstance: this.type = {
    SparkLogicalPlan(
      alreadyPlanned match {
        case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
        case _ => sys.error("Multiple instance of the same relation detected.")
      }).asInstanceOf[this.type]
  }
}

Everything up to this point is what happens in the single step val people: RDD[Person] = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).toInt)); the result is a SchemaRDD.

Next, people.registerAsTable("people") is a method on SchemaRDD; as you can see, the actual table registration is still delegated to SQLContext:

def registerAsTable(tableName: String): Unit = {
    sqlContext.registerRDDAsTable(this, tableName)
  }


SQLContext's registerRDDAsTable method is shown below. Because the table information is kept in memory (as you can see below, currently in a HashMap), a registered table only lives for the lifetime of the SQLContext.

/**
   * Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only
   * during the lifetime of this instance of SQLContext.
   *
   * @group userf
   */
  def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
    catalog.registerTable(None, tableName, rdd.logicalPlan)
  }
When registering a table, SQLContext relies on SimpleCatalog, an implementation of the Catalog trait:

/**
 * An interface for looking up relations by name.  Used by an [[Analyzer]].
 */
trait Catalog {
  def lookupRelation(
    databaseName: Option[String],
    tableName: String,
    alias: Option[String] = None): LogicalPlan

  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
}

class SimpleCatalog extends Catalog {
  val tables = new mutable.HashMap[String, LogicalPlan]()

  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
    tables += ((tableName, plan))
  }

  def dropTable(tableName: String) = tables -= tableName

  def lookupRelation(
      databaseName: Option[String],
      tableName: String,
      alias: Option[String] = None): LogicalPlan = {
    val table = tables.get(tableName).getOrElse(sys.error(s"Table Not Found: $tableName"))

    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
    // properly qualified with this alias.
    alias.map(a => Subquery(a.toLowerCase, table)).getOrElse(table)
  }
}
The code above is part of the Catalyst package. Catalog is a class that maintains table information: it stores newly registered tables and supports looking up and dropping existing ones. The SimpleCatalog implementation simply keeps the tableName-to-logicalPlan mapping in a HashMap.
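
As a small usage sketch of the catalog (somePeopleLogicalPlan is a hypothetical value; in the real flow registerRDDAsTable passes in rdd.logicalPlan):

// Sketch: registering and resolving a table with SimpleCatalog.
val catalog = new SimpleCatalog
catalog.registerTable(None, "people", somePeopleLogicalPlan)       // what registerRDDAsTable does
val relation: LogicalPlan = catalog.lookupRelation(None, "people")  // what the Analyzer does later
// catalog.dropTable("people") would remove the entry from the HashMap again.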


Good. Now that the RDD has been converted and registered as a table, it is time to execute the SQL: val teenagers = sql("SELECT name FROM people WHERE age >= 10 && age <= 19")

The sql method is, again, a method on SQLContext:

 /**
   * Executes a SQL query using Spark, returning the result as a SchemaRDD.
   *
   * @group userf
   */
  def sql(sqlText: String): SchemaRDD = {
    val result = new SchemaRDD(this, parseSql(sqlText))
    // We force query optimization to happen right away instead of letting it happen lazily like
    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
    // generates the RDD lineage for DML queries, but do not perform any execution.
    result.queryExecution.toRdd
    result
  }
Setting aside how result is produced for a moment, notice that right before result is returned, result.queryExecution.toRdd is invoked once. Before toRdd runs, none of the earlier work (the RDD conversion and the construction, analysis, and optimization of the logical plan) actually computes any data; only after toRdd is this series of plans really carried out. In the current sql() implementation, that pipeline is forced eagerly right here, although, as the comment notes, for DML queries this only builds the RDD lineage rather than running any job.

The parseSql(sqlText) step parses the SQL with a simple SQL parser. This Scala parser is the SqlParser provided by the Catalyst package, whose source comment says:

/**
 * A very simple SQL parser.  Based loosly on:
 * https://github.com/stephentu/scala-sql-parser/blob/master/src/main/scala/parser.scala
 *
 * Limitations:
 *  - Only supports a very limited subset of SQL.
 *  - Keywords must be capital.
 *
 * This is currently included mostly for illustrative purposes.  Users wanting more complete support
 * for a SQL like language should checkout the HiveQL support in the sql/hive subproject.
 */
class SqlParser extends StandardTokenParsers
It appears to be loosely based on someone else's Scala SQL parser, supports only a very small amount of SQL syntax, and requires keywords to be uppercase. Judging from the implementation, the following keywords are currently supported:

  protected val ALL = Keyword("ALL")
  protected val AND = Keyword("AND")
  protected val AS = Keyword("AS")
  protected val ASC = Keyword("ASC")
  protected val AVG = Keyword("AVG")
  protected val BY = Keyword("BY")
  protected val CAST = Keyword("CAST")
  protected val COUNT = Keyword("COUNT")
  protected val DESC = Keyword("DESC")
  protected val DISTINCT = Keyword("DISTINCT")
  protected val FALSE = Keyword("FALSE")
  protected val FIRST = Keyword("FIRST")
  protected val FROM = Keyword("FROM")
  protected val FULL = Keyword("FULL")
  protected val GROUP = Keyword("GROUP")
  protected val HAVING = Keyword("HAVING")
  protected val IF = Keyword("IF")
  protected val IN = Keyword("IN")
  protected val INNER = Keyword("INNER")
  protected val IS = Keyword("IS")
  protected val JOIN = Keyword("JOIN")
  protected val LEFT = Keyword("LEFT")
  protected val LIMIT = Keyword("LIMIT")
  protected val NOT = Keyword("NOT")
  protected val NULL = Keyword("NULL")
  protected val ON = Keyword("ON")
  protected val OR = Keyword("OR")
  protected val ORDER = Keyword("ORDER")
  protected val OUTER = Keyword("OUTER")
  protected val RIGHT = Keyword("RIGHT")
  protected val SELECT = Keyword("SELECT")
  protected val STRING = Keyword("STRING")
  protected val SUM = Keyword("SUM")
  protected val TRUE = Keyword("TRUE")
  protected val UNION = Keyword("UNION")
  protected val WHERE = Keyword("WHERE")
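
The parser can also be driven directly to see what an unresolved logical plan looks like. A sketch, assuming SqlParser exposes the apply(String): LogicalPlan entry point that parseSql delegates to:

// Sketch: parsing a statement straight into an (unresolved) logical plan.
val parser = new SqlParser
val unresolvedPlan: LogicalPlan = parser("SELECT name FROM people WHERE age >= 10 AND age <= 19")
// Table and attribute references are still unresolved at this point; resolving them is the Analyzer's job.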


Now we get to the climax. In the sql() method, the line result.queryExecution is the most important one: queryExecution is what actually invokes SQLContext's processing of the logical plan,

/**
   * A lazily computed query execution workflow.  All other RDD operations are passed
   * through to the RDD that is produced by this workflow.
   *
   * We want this to be lazy because invoking the whole query optimization pipeline can be
   * expensive.
   */
  @transient
  protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
and that processing consists of creating a QueryExecution:

protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
    new this.QueryExecution { val logical = plan }
QueryExecution declares several lazy vals; only when toRdd is called do all the analysis, optimization, and other processing steps actually take effect. Let's look at the QueryExecution code:

/**
   * The primary workflow for executing relational queries using Spark.  Designed to allow easy
   * access to the intermediate phases of query execution for developers.
   */
  protected abstract class QueryExecution {
    def logical: LogicalPlan

    lazy val analyzed = analyzer(logical)
    lazy val optimizedPlan = optimizer(analyzed)
    // TODO: Don't just pick the first one...
    lazy val sparkPlan = planner(optimizedPlan).next()
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    /** Internal version of the RDD. Avoids copies and has no schema */
    lazy val toRdd: RDD[Row] = executedPlan.execute()

There are four main phases: analyzer -> optimizer -> planner -> prepareForExecution.
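
Because these are lazy vals, each intermediate plan can be inspected on a SchemaRDD before anything runs. A sketch (note that queryExecution is declared protected[spark], so this only compiles from code inside the Spark packages):

// Sketch: peeking at the intermediate plans of the teenagers query.
val qe = teenagers.queryExecution
println(qe.analyzed)      // logical plan after the analyzer
println(qe.optimizedPlan) // after the optimizer rules
println(qe.executedPlan)  // physical SparkPlan after planner + prepareForExecution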


Before going through these four phases, a quick word on LogicalPlan, because until the plan is turned into a physical one, most of the classes and interfaces discussed here operate on logical plans, i.e. the LogicalPlan class. LogicalPlan itself is an abstract class and an important class in the Catalyst code; it is essentially a syntax tree structure, and its implementations are roughly the following:

/**
 * A logical plan node with no children.
 */
abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  self: Product =>

  // Leaf nodes by definition cannot reference any input attributes.
  def references = Set.empty
}

/**
 * A logical node that represents a non-query command to be executed by the system.  For example,
 * commands can be used by parsers to represent DDL operations.
 */
abstract class Command extends LeafNode {
  self: Product =>
  def output = Seq.empty
}

/**
 * Returned for commands supported by a given parser, but not catalyst.  In general these are DDL
 * commands that are passed directly to another system.
 */
case class NativeCommand(cmd: String) extends Command

/**
 * Returned by a parser when the users only wants to see what query plan would be executed, without
 * actually performing the execution.
 */
case class ExplainCommand(plan: LogicalPlan) extends Command

/**
 * A logical plan node with single child.
 */
abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
  self: Product =>
}

/**
 * A logical plan node with a left and right child.
 */
abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
  self: Product =>
}

The inheritance hierarchy of Catalyst's QueryPlan is shown in the figure below. SparkPlan is an implementation in the SQL package; what was passed in earlier when constructing SparkLogicalPlan is a SparkPlan.

[Figure: Catalyst QueryPlan inheritance hierarchy, covering LogicalPlan and SparkPlan]


Step 1: analyzer

SQLContext holds an analyzer from the Catalyst package, and the catalog inside it is the very object that stores the table information:

@transient
  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true)
Looking at its implementation, the analyzer's job is to resolve all the unresolved attributes and relations, pinning them down with the help of the catalog and a set of adapter-like rules:

/**
 * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
 * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
 * a [[FunctionRegistry]].
 */
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*)
  )
The analyzer's batches register a number of strategies, or processing adapters, each of which is implemented internally. For example:

/**
   * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
   */
  object ResolveRelations extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case UnresolvedRelation(databaseName, name, alias) =>
        catalog.lookupRelation(databaseName, name, alias)
    }
  }
Since Analyzer extends RuleExecutor, each of its processing steps extends Rule. In RuleExecutor, the apply() method walks the logical plan recursively and applies the adapters registered in batches:

/**
   * Executes the batches of rules defined by the subclass. The batches are executed serially
   * using the defined execution strategy. Within each batch, rules are also executed serially.
   */
  def apply(plan: TreeType): TreeType = {
    var curPlan = plan

    batches.foreach { batch =>
      var iteration = 1
      var lastPlan = curPlan
      curPlan = batch.rules.foldLeft(curPlan) { case (curPlan, rule) => rule(curPlan) }

      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) {
        lastPlan = curPlan
        curPlan = batch.rules.foldLeft(curPlan) {
          case (curPlan, rule) =>
            val result = rule(curPlan)
            if (!result.fastEquals(curPlan)) {
              logger.debug(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(curPlan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }

            result
        }
        iteration += 1
      }
    }

    curPlan
  }
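
To isolate the shape of a Rule: it is just a plan-to-plan function built from transform plus pattern matching, in the same mold as ResolveRelations above. The rule below is purely illustrative and not part of the analyzer (it assumes Rule, Filter, Literal, and BooleanType from the catalyst packages are in scope):

// Hypothetical rule: drop Filter nodes whose condition is the literal true.
object RemoveTrivialFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, BooleanType), child) => child
  }
}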


Step 2: optimizer

The optimizer's implementation and processing style are very similar to the analyzer's; they simply sit at different stages and therefore have different responsibilities. The Optimizer also extends RuleExecutor and implements a batch of rules which, just like the analyzer's, are applied recursively to the input plan:

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Subqueries", Once,
      EliminateSubqueries) ::
    Batch("ConstantFolding", Once,
      ConstantFolding,
      BooleanSimplification,
      SimplifyCasts) ::
    Batch("Filter Pushdown", Once,
      EliminateSubqueries,
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughInnerJoin) :: Nil
}
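
To give a flavor of what a rule like ConstantFolding does, here is a sketch at the level of a single expression tree: a subtree made only of literals can be collapsed at planning time. The real rule walks every expression in the plan and handles many more cases; this assumes Add, Literal, and IntegerType from the catalyst expression library are in scope:

// Illustrative only: folding Add(1, 2) into Literal(3) with a tree transform.
val before: Expression = Add(Literal(1, IntegerType), Literal(2, IntegerType))
val after = before transform {
  case Add(Literal(a: Int, IntegerType), Literal(b: Int, IntegerType)) => Literal(a + b, IntegerType)
}
// after is now Literal(3, IntegerType)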


Step 3: planner

The planner inside SQLContext is its own SparkPlanner implementation:

protected[sql] class SparkPlanner extends SparkStrategies {
    val sparkContext = self.sparkContext

    val strategies: Seq[Strategy] =
      TopK ::
      PartialAggregation ::
      SparkEquiInnerJoin ::
      BasicOperators ::
      CartesianProduct ::
      BroadcastNestedLoopJoin :: Nil
  }

It contains a sequence of strategies. SparkStrategies is a subclass of QueryPlanner from the Catalyst package, and QueryPlanner's apply() method is implemented as follows:

def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...
    val iter = strategies.view.flatMap(_(plan)).toIterator
    assert(iter.hasNext, s"No plan for $plan")
    iter
  }
For SparkPlanner, this means running strategies such as TopK and BasicOperators over the optimized logical plan in a single pass to produce candidate physical plans.


Step 4: prepareForExecution

prepareForExecution is, once again, an implementation of Catalyst's RuleExecutor:

/**
   * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
   * inserting shuffle operations as needed.
   */
  @transient
  protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
    val batches =
      Batch("Add exchange", Once, AddExchange) ::
      Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
  
Two batches of processing are registered here.


All of the steps above are really implementations built on Catalyst's RuleExecutor; they simply divide the work differently and do different things. Only when the final call,

/** Internal version of the RDD. Avoids copies and has no schema */
    lazy val toRdd: RDD[Row] = executedPlan.execute()
is made do all of the earlier lazy plans actually get triggered and executed. Let's take one more look at the phases in the QueryExecution code:

/**
   * The primary workflow for executing relational queries using Spark.  Designed to allow easy
   * access to the intermediate phases of query execution for developers.
   */
  protected abstract class QueryExecution {
    def logical: LogicalPlan

    lazy val analyzed = analyzer(logical)
    lazy val optimizedPlan = optimizer(analyzed)
    // TODO: Don't just pick the first one...
    lazy val sparkPlan = planner(optimizedPlan).next()
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    /** Internal version of the RDD. Avoids copies and has no schema */
    lazy val toRdd: RDD[Row] = executedPlan.execute()


Finally, what sql() returns is an RDD, so afterwards the usual RDD transformations and actions can be applied for further processing.

My treatment of the analyzer and optimizer here is admittedly a bit rough; they mostly consist of customized rules, and I have not yet gone through them carefully. I just hope the overall flow comes across clearly.


Summary

To summarize the Spark SQL workflow:

Through the new SQLContext context class, we can register an RDD as a table. This RDD is a SchemaRDD whose elements are instances of a case class; the case class effectively carries the SQL table's schema, and the column information is obtained from it by reflection. Once the RDD is registered as a table, its information is held in the Catalog, with a lifetime limited to this context's in-memory session. Later, when writing sql(), that SchemaRDD can be used. Before execution, the query goes through several phases: the simple SQL parser turns the SQL into a logical plan, and between the logical plan and the physical plan the analyzer, optimizer, and planner do further processing. All of this processing is essentially built on Catalyst's RuleExecutor: each phase defines and registers its own sequence of rules, which are applied recursively to the plan. Finally, all of the above is lazy; only when toRdd is triggered does it actually execute and return an RDD. That RDD is an ordinary Spark RDD and can be processed further, completing the round trip from RDD to table, through SQL processing, and back to an RDD. The execution and optimization of the whole pipeline rely entirely on Catalyst, the new query optimization framework.


This article has focused on the SQL part of the Spark SQL component; Hive and Parquet support are not analyzed. I plan to read the Catalyst package's code next and see whether there is anything there worth borrowing. These new utility components around Spark core keep appearing one after another, and when using them we still have to be selective; it is not clear which direction Databricks will push hardest, and they may well be probing in several directions at once. Catalyst, for instance, seems like it will also make its way into Shark to improve it, and Spark SQL should be one of the highlights of Spark 1.0, although the APIs will probably still change quite a bit. In any case, I will keep reading and learning, and borrow whatever is worth borrowing.


The end :)
