DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表。通过在分布式数据集上施加结构,让Spark用户利用Spark SQL来车讯结构化的数据或使用Spark表达式方法(而不是lambda)。
1.python到RDD之间的通信
每当使用RDD执行PySpark程序时,潜在地需要巨大地开销来执行作业。如图
在PySpark驱动器中,Spark Context通过Py4j启动一个使用JavaSparkContext地JVM。所有地RDD转换最初都映射到Java中地PythonRDD对象。
一旦这些任务被推送到Spark工作节点,PythonRDD对象就使用管道(pipe)启动Python地子进程(subprocess),发送代码和数据到Python中进行处理。虽然该方法允许PySpark将数据处理分布到多个工作节点地多个Python子进程中,但是Python和JVM之间还是有很多上下文切换和通信开销。
2 Catalyst优化器刷新
Spark SQL引擎快地主要原因之一就是因为Catalyst优化器。Catalyst优化器地意义在于相对于立即处理查询来说,Spark引擎地Catalyst优化器编译并优化了逻辑计划,而且还有一个能够确保生成最有效地物理计划地成本优化器,如图:
优化器是基于功能地编程结构,并且其设计有两个目的:为了便于对Spark SQL添加新的优化技术功能,以及允许外部开发人员扩展优化器(入添加数据源特定规则、支持新的数据类型等等)。
3 利用DataFrame加速Pyspark
DataFrame和Catalyst优化器(以及Tungsten项目)地意义是在和非优化的RDD查询比较时增加PySpark查询地性能.引入DataFrame之前,Python查询速度普遍比使用RDD地Scala查询慢。通常情况下,这种查询性能地降低源于Python和JVM之间地通信开销。
Python DataFrame和SQL、Scala DataFrame以及R DataFrame都能够利用Catalyst优化器:
4 创建DataFrame
通常情况下,通过使用SparkSession导入数据(或者调用PySpark地shell脚本spark)来创建DataFrame。
4.1 生成JSON数据
stringJSONRDD = sc.parallelize(("""
{ "id": "123",
"name": "Katie",
"age": 19,
"eyeColor": "brown"
}""",
"""{
"id": "234",
"name": "Michael",
"age": 22,
"eyeColor": "green"
}""",
"""{
"id": "345",
"name": "Simone",
"age": 23,
"eyeColor": "blue"
}""")
)
stringJSONRDD.collect()
[out]:[' \n { "id": "123",\n "name": "Katie",\n "age": 19,\n "eyeColor": "brown"\n }',
'{\n "id": "234",\n "name": "Michael",\n "age": 22,\n "eyeColor": "green"\n }',
'{\n "id": "345",\n "name": "Simone",\n "age": 23,\n "eyeColor": "blue"\n }']
利用SparkSession read.json方法,RDD将会被转换成一个DataFrame(即spark.read.json())。
4.2 创建一个DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)
swimmersJSON.show()
+---+--------+---+-------+
|age|eyeColor| id| name|
+---+--------+---+-------+
| 19| brown|123| Katie|
| 22| green|234|Michael|
| 23| blue|345| Simone|
+---+--------+---+-------+
DataFrame的转换和动作与RDD的转换和动作类似,还有一套缓慢的(转换)操作。但是,对比RDD,DataFrame操作并不是缓慢的,主要是源于Catalyst优化器。