sparkSQL 的由来
我们知道最初的计算框架叫 mapreduce,他的缺点是计算速度慢,还有一个就是代码比较麻烦,所以有了 hive;
hive 是把类 sql 的语句转换成 mapreduce,解决了开发难的问题,但是 hive 的底层还是 mapreduce,仍然是慢;
spark 也看到了 hive 的优势,以 hive 为中心的一套框架 shark 营运而生,它是 spark 的前身,h 就是 hive 的意思;
但是 为了 提高 shark 的效率,spark 自己开发了一套算法,替代了之前 hive 的思路,这套算法就是 sparkSQL
sparkSQL 简介
sparkSQL 是 spark 专门处理结构化数据的一个模块,也就是像数据表一样的数据,处理方式就是像 sql 一样;
换句话说,sparkSQL 使用 sql 的方式代替了之前数据处理的方式。
sparkSQL 提供了两个编程抽象:DataFrame 和 DataSet,起到了分布式 SQL 查询引擎的作用;
sparkSQL 把 sql 语句 和 dataFrame、dataSet 转换成了 RDD,执行效率非常快;
也就是说 dataFrame、dataSet 的底层仍然 是 RDD,并且可以互相转换
sparkSQL 的特点
官方解释
易整合:
兼容 hive:
统一的数据访问方式:用同样的方式读取各类文件
标准的数据库连接:可以通过 JDBC 或者 ODBC 连接标准数据库
后面会详细解释。
DataFrame (df)
与 RDD 类似,df 也是分布式的数据容器,不同的是,df 更像一个 二维数据表,除了数据本身外,还包含了数据的结构信息,即 schema;
df 的 API 提供了更高层的关系操作,比函数式的 RDD API 更加友好;
df 的底层仍是 RDD,所以 df 也是惰性执行的,但值得注意的是,它比 RDD 性能更高;
问题来了:为什么底层实现是 RDD,却比 RDD 更快,不合常理啊
其实是这样的,因为 df 是由 spark 自己转换成 RDD 的,那么 spark 自然会用最合适的、最优化的方式转换成 RDD,因为它比任何人都清楚怎么才能更高效,
对比我们自己操作 RDD 去实现各种功能,大部分情况下我们的作法可能不是最优,自己玩不如作者玩,所以说 df 性能高于 RDD
举个简单例子:
data1 = sc.parallelize([('1','a'), ('2', 'b'), ('3', 'c')]) data2 = sc.parallelize([('1','1'), ('2', '2'), ('3', '3')]) ### 找到两个list中 key 为 1 的对应值的集合 ## 自己写可能这么写 data1.join(data2).collect() # [('1', ('a', '1')), ('3', ('c', '3')), ('2', ('b', '2'))] data1.join(data2).filter(lambda x: x[0] == '1').collect() # [('1', ('a', '1'))] ## spark 可能这么写 data1.filter(lambda x: x[0] == '1').join(data2.filter(lambda x: x[0] == '1')).collect() # [('1', ('a', '1'))]
为什么 spark 这么写快呢?这里简单解释下
join 是把 两个元素做 笛卡尔內积,生成了 3x3=9 个元素,然后 shuffle,每个分区分别比较 key 是否相同,如果相同,合并,然后合并分区结果;
我们自己写的就是这样,shuffle 了 9 个元素;
而 spark 是先 filter,每个 list 变成了 一个元素,然后 join,join 的结果直接就是所需,不用 shuffle;
shuffle 本身是耗时的,而 filter 无需 shuffle,所以效率高 【join 是个 低效方法的原因】
小结
1. df 也是一个查询优化的手段
2. df 允许我们像操作数据库一样操作它
DataSet
DataSet 是 DataFrame 的扩展,是 spark 最新的数据抽象;
dataSet 像个对象,允许我们像操作类一样操作它,通过属性查看数据;
实际上 DataSet 是在 df 的基础上增加了数据类型(这样表述或许不太准确,可以理解为 类或者对象)的概念
python 目前不支持 dataSet,所以后续支持了再说
rdd-df-dataset
发展历程
RDD(spark1.0) ===> DataFrame(spark1.3) ===> DataSet(spark1.6)
转换逻辑
rdd + 表结构 = df
rdd + 表结构 + 数据类型 = ds
df + 数据类型 = ds
ds - 数据类型 - 表结构 = rdd
ds - 数据类型 = df
df - 表结构 = rdd
转换方法
具体如何转换,我后续会专门写一篇博客
SparkSession
在老版本中,sparkSQL 提供了两种 SQL 查询的起始点:
SQLContext,用于 spark 自己提供的 SQL 查询;
HiveContext,用于连接 hive 的查询
sparkSession 是新版的 SQL 查询起始点,实质上是组合了 SQLContext 和 HiveContext;
sparkSession 只是封装了 sparkContext,sparkContext 包含 SQLContext 和 HiveContext;
所以 sparkSession 实际上还是 依靠 sparkContext 实现了 SQLContext 和 HiveContext,故老版本用法也适用新版本。
sparkSession 直接生成 df
DataFrame 的创建
sparkSession 自动生成 df
df 的创建有 3 种方式
从 spark 的数据源创建:读取 spark 支持的文件
从内部 RDD 创建:RDD 转换成 df
从 hive 创建:hive 查询
spark 数据源创建df
spark 支持的文件格式都有统一的入口
>>> dir(spark.read) [ 'csv', 'format', 'jdbc', 'json', 'load', 'option', 'options', 'orc', 'parquet', 'schema', 'table', 'text']
json
文件读取方式都一样,所以仅以 json 为例
## json 文件如下 # {'age': '10','name': 'zhangsan'} # {'age': '20','name': 'lisi'} df1 = spark.read.json('data.json') # 相对路径 # >>> df1 # DataFrame[age: string, name: string] 可以看到 df 具备了字段名和字段属性 df1.show() # +---+--------+ # |age| name| # +---+--------+ # | 10|zhangsan| # | 20| lisi| # +---+--------+ df2 = spark.read.json('file:///usr/lib/spark/data.json') # 绝对路径
RDD 创建 df
待续...
hive 创建 df
待续...
sparkSQL 操作 df
sparkSQL 有两种风格,这里只介绍简单用法,详细用法后续会一一介绍。
SQL 风格操作 df
df 生成后,怎么写 sql 呢?没表啊
所以还需创建表,方法可能不止一种,比如
### 创建临时视图 df1.createTempView('student') # df1.createOrReplaceTempView('student') # ok spark.sql('select * from student').show() # +---+--------+ # |age| name| # +---+--------+ # | 10|zhangsan| # | 20| lisi| # +---+--------+ spark.sql('select age from student').show() spark.sql('select avg(age) from student').show() # +------------------------+ # |avg(CAST(age AS DOUBLE))| # +------------------------+ # | 15.0| # +------------------------+
到这里有个问题,不知道大家想到没?
这张表在哪呢?能不能重复操作它呢?退出 session 还能操作吗?或者换个 session 还能操作吗?
session
这里穿插讲下 session 的概念;
session 的本意是会话,我们在多个场合都见过 session,如 web,如 tensorflow,但是在 web 中貌似不是 会话啊;
其实是这样的,session 有广义和狭义之分
广义 session:就是我们说的会话
狭义 session:它是一个存储位置,和 cookie 相对,cookie 是把某个信息存在客户度,session 是把 某个信息存在服务器上
全局表
临时表是在 session 范围内的,session 关闭后,临时表失效,如果想应用范围内有效,可以使用全局表,
全局表需要全路径访问
### 为了在应用范围内使用数据表,创建全局表 df1.createGlobalTempView('people') ## 查询 spark.sql('select * from global_temp.people').show() # global_temp.people 全路径访问表 ## 在另一个 session 中查询该表 spark.newSession().sql('select * from global_temp.people').show()
DSL 风格操作 df
不常用
df1.printSchema() # 打印表结构 # root # |-- age: string (nullable = true) # |-- name: string (nullable = true) df1.select('name').show() # 查询name字段 df1.select("name", df1.age + 1).show() # age 字段的值都 加1,scala 中是用 $'age' 代替 df.age # +--------+---------+ # | name|(age + 1)| # +--------+---------+ # |zhangsan| 11.0| # | lisi| 21.0| # +--------+---------+ df1.filter(df1.age > 15).show() # 查看 age 大于 15 # +---+----+ # |age|name| # +---+----+ # | 20|lisi| # +---+----+
总结
sparkSQL 已逐渐成为主流,代替了 RDD 操作,所以必须掌握哦