Spark SQL是Spark中用于结构化数据处理的组件。
Spark SQL可以从Hive中读取数据。
执行结果是Dataset/DataFrame。
DataFrame是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。
DataSet是Spark 1.6之后加入的,同时提供了RDD和Spark SQL执行引擎的优点。可以从jvm对象创建,然后通过transformation算子(map
, flatMap
, filter
, etc)转换得到。
DataFrame被DataSet中的RowS替代。
Scala中用DataSet[Row],Java中用DataSet<Row>。
SparkSession
Spark中所有功能的入口点是SparkSession类(Spark 1.x叫SQLContext http://spark.apache.org/docs/2.0.0/api/java/index.html#org.apache.spark.sql.SparkSession)
Spark 2.0内置支持Hive,如使用HiveQL查询,访问Hive UDFs,从Hive获取数据。不需要安装Hive。
创建DataFrames
使用SparkSession,可以从已有的RDD,Hive表,或Spark数据源创建DataFrames。
Dataset 操作(也叫做 DataFrame 操作)
API文档: http://spark.apache.org/docs/2.0.0/api/scala/index.html#org.apache.spark.sql.Dataset
运行 SQL 查询
查询结果是DataFrame类型。
创建 Datasets
与RDD互操作
两种方式
1.反射
这种基于反射的方法可以得到更简洁的代码,并且在编写Spark应用程序时,当已经知道模式时,它可以很好地工作。
2.通过编程接口创建
数据源
load、save
1. 默认数据源
parquet
2.手动指定
3.在文件上直接运行SQL
保存模式
保存操作的时候可以指定一个SaveMode
存储到持久化表中
可以使用saveAsTable将DataFrame存储到Hive metastore中。saveAsTable会实例化在Hive metastore中的DataFrame内容,并创建一个指针指向它。持久化表会一直存在,即使重启了Spark,只要保持同一个metastore的连接。
Parquet 文件
Spark SQL支持对Parquet文件的读写。
分区发现
表分区是Hive等系统中常用的优化方法。
从spark 1.6.0开始,默认情况下,分区发现仅查找给定路径下的分区
Schema 合并
和 ProtocolBuffer, Avro, and Thrift, Parquet 也支持schema变化。可以增加列。
但是代价高。
1.5.0之后默认被关闭了。
Hive metastore 和Parquet table的转换
当从Hive metastore中读写Parquet table时,Spark SQL为了更好的性能,会尝试使用它自己的支持而不是Hive SerDe。这个行为由spark.sql.hive.convertMetastoreParquet配置,默认开启。
Metadata 刷新
Spark SQL缓存了Parquet metadata
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
JSON Datasets
请注意,作为JSON文件提供的文件不是典型的JSON文件。每一行必须包含一个独立的、自包含的有效JSON对象。因此,常规的多行JSON文件通常会失败。
Hive Tables
Spark SQL支持读写存储在Hive中的数据
注意hive-site.xml
中的hive.metastore.warehouse.dir
从Spark 2.0.0开始已经过时了,用spark.sql.warehouse.dir
.
与不同版本的Hive metastore交互
使用JDBC和其它数据库交互
最好使用JdbcRDD
性能调优
可以通过调用spark.cacheTable("tableName") 或
dataFrame.cache()
.使Spark SQL以列格式缓存表。然后spark sql将只扫描所需的列,并自动调整压缩以最小化内存使用和GC压力。
调用spark.uncacheTable("tableName")移除缓存中表。
通过SparkSession或在SQL中以SET Key = Value形式来设置。
分布式SQL引擎
Spark SQL还可以使用其JDBC/ODBC或命令行接口作为分布式查询引擎。在这种模式下,最终用户或应用程序可以直接与Spark SQL交互以运行SQL查询,而无需编写任何代码。
运行Thrift JDBC/ODBC服务器
./sbin/start-thriftserver.sh
This script accepts all bin/spark-submit
command line options, plus a --hiveconf
option to specify Hive properties. You may run ./sbin/start-thriftserver.sh --help
for a complete list of all available options. By default, the server listens on localhost:10000. You may override this behaviour via either environment variables, i.e.:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \
...
or system properties:
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
...
Now you can use beeline to test the Thrift JDBC/ODBC server:
./bin/beeline
Connect to the JDBC/ODBC server in beeline with:
beeline> !connect jdbc:hive2://localhost:10000