Spark SQL介绍
Spark SQL 是Spark中技术最复杂的的组件之一,它提供了在Spark程序中对结构化数据进行操作的功能,即SQL查询。具体来说,Spark SQL 有如下3个重要特点:
1.Spark SQL 支持多种结构化数据格式的读取,比如JSON,Parquet或者Hive表。
2.Spark SQL 支持从多种外部数据源读取数据,除了本地数据,HDFS以及S3之外,还可以通过JDBC等标准数据库连接器连接外部的关系型数据库系统。
3.最后一点就是能够在Spark程序中*的进行SQL操作,并与各种编程语言Python/Java/Scala实现高度融合。
为了实现这些重要功能,Spark SQL中引入了一种特殊的RDD,叫做DataFrame,一开始其也被称为SchemaRDD(Spark 1.3.0之前)。
下面我们就先来重点看一看DataFrame。
DataFrame理论
DataFrame 是一种以RDD为基础的分布式数据集,理论上非常接近于关系型数据库中的一张数据表(table),或者是Python pandas中的数据抽象Data Frame。
但是相比于Python或者R中的DataFrame, Spark SQL中的DataFrame,在执行时内部做了更多优化。首先,和普通RDD一样,DataFrame同样也是遵守惰性机制,即真正的计算只有当action(比如 展示结果或者保存输出)被触发时才会进行,也正是因为这种机制,才让Spark SQL基于DataFrame的操作可以被自动化地进行优化。
Spark SQL背后的这种核心优化器被称为Catalyst Optimizer。我以下面这张图为基础来具体介绍一下Catalyst的工作流。
首先,被选中的DataFrame/Dataset以及对应的SQL语句会作为输入被Catalyst接收并生成未处理的Query Plan。每个Plan都用一棵树作为对用户程序的一个抽象集合,其节点用于描述从输入数据集到输出查询结果的一系列操作过程,每个节点对应其中的某一步。下面是一个简单的例子。
然后树与树之间再通过一系列的转换操作(Transformations),从开始的Query Plan得到优化后的Optimized Query Plan,用于生成RDD DAG来进行后续的执行,转换操作的内部结构如下。
整个优化过程就汇聚在Transformations部分,那么具体是如何进行转换的呢?在总结完整的转换过程之前,首先我们先对Transformation过程中涉及到的2个概念做个简单介绍。
1.Logical Plan:其定义了用户的查询语句涉及的相关计算操作,但是并不包括具体的计算流程。
2.Physical Plan:其具体描述了对于输入数据集的运算流程,因此它是可执行的。
接下来,我们来看一看Transformation具体是如何转换的。在整个Transformation的过程中,共包含两种不同类型的转换,同类别树之间的相互转换和不同类别树之间的相互转换。
我们首先来看一下第一种转换,即同类别树之间的转换,比如从Logical Plan到Optimized Logical Plan的转换。在Spark中,每一个单一转换都由某个rule定义,而每个rule又对应某个函数,这种函数被称为transform。一般情况下,我们习惯于将多个single rule 组合使用形成一个完整的转换过程。下面是转换过程的某个部分的例子。
上述转换过程涉及一个非常典型的优化步骤,被称为Predicate Pushdown。即将filter predicate 下压至指定的data source,上图中的t2表。先对t2表进行筛选,然后将筛选之后的结果进行join操作,这样避免了不必要的数据的join,有效增加了join的效率。
而这种将多个rule组合使用又被称为Rule Executor.即一个Rule Executor通过使用多个rule将一棵树转换为另一棵同类别的树。
接下来,我们来看一下第二种转换–不同类别树之间的转换,从Logical Plan 到 Physical Plan的转换。其中利用的就是一系列的策略,每一种策略都对应Logical Plan的某个节点到Physical Plan对应节点的转换过程。每个策略的运行都会触发后续策略的启动。
OK,最后我们来完整地梳理一下整个Catalyst的优化过程。
1.Analysis:使用Rule Executor将Unresolved Logical Plan 转换成 Resolved Logical Plan。
2.Logical Optimization:使用另一个Rule Executor将Resolved Logical Plan转换成Optimized Logical Plan。
3.Physical Planning:这个部分分为两个阶段,阶段1:通过Strategies将Optimized Logical Plan转换成Physical Plan。阶段2:通过Rule Executor来调整它,用于最终的执行。
以上就是对Catalyst Optimizer的一个简单介绍,另外一个值得一提的点就是,对于Python使用者,即PySpark来说,Catalyst Optimizer的另一个好处就是可以显著提升PySpark的使用效率。换句话说,相比于在PySpark中使用RDD,使用DataFrame的查询效率会有极大的提升。原因很简单,因为对于Catalyst Optimizer而言,无论开始的代码使用的是何种语言编写的,最终optimizer都会统一生成JVM 二进制编码用于执行,因此使用DataFrame时,执行速度不受编程语言的影响。但是如果是RDD,Python的执行速度相比于同样进行RDD操作的Java或Scala,会慢很多。主要原因就是Python与JVM在沟通过程中的开销。我们可以来看一下下面这张图来直观地感受一下。
DataFrame 实践
说完了Spark SQL背后重要的优化器,现在我们正式进入DataFrame的使用介绍。这里以PySpark作为例子。
首先我们需要先创建一个SparkSession
,在spark2,0之前,我们会使用SQLContext。同时Spark还有其他多种contexts,比如HiveContext, StreamingContext, and SparkContext等等,现在都统一合并成SparkSession
。
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
conf=SparkConf().setMaster('local').setAppName('sparksql')
sc=SparkContext(conf=conf)
spark=SparkSession.builder.appName('sparksql').master('local').getOrCreate()
我们有一个简单描述学生信息的JSON文件std_info.json
,我们使用SparkSession将它读取进来。
[{"id": "123", "name": "Katie", "age": 19, "eyeColor": "brown"},
{"id": "234", "name": "Michael", "age": 22, "eyeColor": "green"},
{"id": "345", "name": "Simone", "age": 23, "eyeColor": "blue"}]
df=spark.read.json('std_info.json') # 生成DataFrame
df.show()
# 返回结果
+---+--------+---+-------+
|age|eyeColor| id| name|
+---+--------+---+-------+
| 19| brown|123| Katie|
| 22| green|234|Michael|
| 23| blue|345| Simone|
+---+--------+---+-------+
除了使用.show()
方法来查看DataFrame中包含的数据,我们还可以使用.collect()
以及.take(n)
,n代表我们想要取的row数,返回的Row对象会被存储在列表中返回。
# 我们还可以查看列信息
df.printSchema()
# 返回结果
root
|-- age: long (nullable = true)
|-- eyeColor: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
DataFrame API 进行数据查询
接下来我们先使用DataFrame的API来进行数据的query。
1.我们可以先使用
.count()
方法来查看DataFrame的Row个数。
df.count()
# 返回结果
3
2.我们可以组合使用
.select()
和.filter()
来进行条件筛选。
# 查询年龄为22的学生
df.select('id','age').filter('age=22').show()
# or
df.select(df.id,df.age).filter(df.age==22).show()
# 返回结果
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
# 查询眼睛颜色以字母b开头的学生
df.select('name','eyeColor').filter('eyeColor like "b%"').show()
# 返回结果
+------+--------+
| name|eyeColor|
+------+--------+
| Katie| brown|
|Simone| blue|
+------+--------+
使用SQL进行数据查询
我们使用SQL进行和上面相同的查询任务,使用spark.sql()
。需要注意的一点是当我们需要对DataFrame进行SQL查询的时候,我们需要先把DataFrame注册成一个Table或者View。
df.createOrReplaceTempView("df")
1.查询Row个数。
spark.sql(" select count(1) from df" ).show()
# 返回结果
+--------+
|count(1)|
+--------+
| 3|
+--------+
2.查询年龄为22的学生
spark.sql('select id,age from df where age=22').show()
# 返回结果
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
3.查询眼睛颜色以字母b开头的学生
spark.sql('select name,eyeColor from df where eyeColor like "b%"').show()
# 返回结果
+------+--------+
| name|eyeColor|
+------+--------+
| Katie| brown|
|Simone| blue|
+------+--------+
如何将RDD转换为DataFrame
从RDD->DataFrame是一个数据结构化的过程,我们需要自定义schema。来看下面这个例子。
# import types 用于schema的定义
from pyspark.sql.types import *
# 创建一个RDD 和上述例子中包含的信息一致
rdd=sc.parallelize([
(123,'Katie',19,'brown'),
(234,'Michael',22,'green'),
(345,'Simone',23,'blue')
])
# 定义schema
schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
StructField("age", LongType(), True),
StructField("eyeColor", StringType(), True)
])
#创建DataFrame
rdd2df = spark.createDataFrame(rdd, schema)
rdd2df.show()
# 返回结果
+---+-------+---+--------+
| id| name|age|eyeColor|
+---+-------+---+--------+
|123| Katie| 19| brown|
|234|Michael| 22| green|
|345| Simone| 23| blue|
+---+-------+---+--------+
总结
OK,到这里为止,关于Spark SQL的基本理论背景以及基本的DataFrame操作已经介绍完毕了,希望对大家有帮助吧!
参考
1.Learning PySpark. Tomasz Drabas, Denny Lee
2.Introducing DataFrames in Apache Spark for Large Scale Data Science