SparkSQL是Spark的一个子模块,主要用于操作结构化数据,借鉴了Hive。
此前使用的是SparkCore模块的RDD结构进行数据处理,SparkSQL提供了结构化的数据结构DataFrame、DataSet。
SparkSQL支持SQL、DSL(domain-specific language)两种方式、多种语言(Scala、Java、Python、R)进行开发,最后底层都转换为RDD.
SparkSQL支持多种数据源(Hive,Avro,Parquet,ORC,JSON 和 JDBC 等)、支持 HiveQL 语法以及 Hive SerDes 和 UDF,允许你访问现有的 Hive 仓库.
零、SparkSession、IDEA整合Spark
SparkSession:这是一个新入口,取代了原本的SQLContext与HiveContext。
SparkContext也可以通过SparkSession获得。
1、SparkSession
交互式环境下启动spark
后,自带一个变量spark
Spark context available as ‘sc‘ (master = local[*], app id = local-1608185209816).
Spark session available as ‘spark‘.
#读取文件
scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]
scala> text.show()
+--------------------+
| value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
| scala spark hive|
+--------------------+
在文件程序中,使用builder
方法获取,本身没有公开的相关构造器可以被使用。
val spark: SparkSession = SparkSession.builder().master("local[3]").getOrCreate()
通过SparkSession入口可以进行数据加载、保存、执行SQL、以及获得其他入口(sqlContext、SparkContext)
2、使用IDEA开发Spark
Spark使用本地开发,将其集成到IDEA中,首先需要有scala
的sdk
,版本要和操作的spark
适配。
其次IDEA需要已经安装过scala
开发插件。
接下来在maven
项目配置文件中导入以下依赖地址即可。
<!-- SparkCore,基础模块 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0-preview2</version>
</dependency>
<!-- SparkSQL,SQL模块 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0-preview2</version>
</dependency>
<!-- Spark和hive集成的模块,可用于处理hive中的数据 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0-preview2</version>
<scope>provided</scope>
</dependency>
<!--mysql驱动,与Mysql交互需要-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
如果日志太多碍眼,可以调高日志等级,第一行修改为ERROR,在resources
目录下使用log4j.properties
文件覆盖默认日志配置文件。
#hadoop.root.logger=warn,console
# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger‘s log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
# Parquet related logging
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR
一、RDD、DataFrame、DataSet
RDD作为SparkCore的数据结构,可以处理结构化非结构化的数据,但效率较低。
SparkSQL中封装了对结构化数据处理效果更好的DataFrame、DateSet.
RDD以行为单位,对行有数据类型。
DataFrame来自pandas库的数据结构,相比于RDD提供了字段类型,DataFrame 内部的有明确 Scheme 结构,即列名、列字段类型都是已知的,这带来的好处是可以减少数据读取以及更好地优化执行计划,从而保证查询效率。
Dataset 也是分布式的数据集合,在 Spark 1.6 版本被引入,它集成了 RDD 和 DataFrame
的优点,具备强类型的特点,同时支持 Lambda 函数,但只能在 Scala 和 Java 语言中使用。三者可以互相转换。
对文本文件返回了 Dataset[String]
scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]
scala> text.show()
+--------------------+
| value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
| scala spark hive|
+--------------------+
#结构
scala> text.printSchema
root
|-- value: string (nullable = true)
#返回一个new session
scala> val nsp = spark.newSession
nsp: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@c01b
#读取json默认返回DF
scala> val user = nsp.read.json("/usr/local/bigdata/file/user.json")
user: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]
#定义一个样例类,作为数据类型
scala> case class User(id :Long,username:String,email:String){}
defined class User
#加入数据类型,返回ds
scala> val userds = nsp.read.json("/usr/local/bigdata/file/user.json").as[User]
userds: org.apache.spark.sql.Dataset[User] = [email: string, id: bigint ... 1 more field]
内部结构
scala> userds.printSchema
root
|-- email: string (nullable = true)
|-- id: long (nullable = true)
|-- username: string (nullable = true)
互相转换:
scala> val udf = user.rdd
udf: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at rdd at <console>:25
scala> userds.toDF
res3: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]
二、SQL、DSL
SparkSQL可以使用SQL编程,也可以基于特定语言,调用api进行编程。
1、SQL
使用SQL编程有两步:
? 1、创建临时视图。(有表或视图才能执行sql,spark可以创建临时视图)
? 2、编写sql语句。
#创建视图users
scala> userds.createOrReplaceTempView("users")
#编写语句
scala> nsp.sql("select id,username from users order by id desc").show
+---+--------+
| id|username|
+---+--------+
| 3| Jim|
| 2| jack|
| 1| Bret|
+---+--------+
/*
createGlobalTempView createOrReplaceTempView crossJoin
createOrReplaceGlobalTempView createTempView
*/
如果在文件程序中编写,需要导入以下隐式转换。
import spark.implicits._ #spark是变量名
import org.apache.spark.sql.functions._
2、DSL
DSL是特定语言,如scala、java等。
调用api进行处理。
#scala DSL编程
scala> userds.select(‘id,‘username).orderBy(desc("id")).show()
+---+--------+
| id|username|
+---+--------+
| 3| Jim|
| 2| jack|
| 1| Bret|
+---+--------+
使用Java进行编程。
final SparkSession spark = SparkSession.builder().master("local[3]").getOrCreate();
Dataset<Row> dataset = spark.read().json("spark/data/user3.json");
try{
dataset.createTempView("users");
//SQL
spark.sql("select id,username from users order by id desc").show();
}catch (Exception e){
e.printStackTrace();
}
//DSL
dataset.filter(dataset.col("id").geq(2)).orderBy(dataset.col("id").desc()).show();
spark.close();
三、UDF
UDF:Spark和Hive一样,允许用户自定义函数来补充功能。
SQL方式和DSL方式都支持。
1、SQL方式
#注册函数
scala> nsp.udf.register("plus0",(id:String)=>0+id)
res11: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3815/2132917061@2c634771,StringType,List(Some(Schema(StringType,true))),Some(plus0),true,true)
scala> nsp.sql("select plus0(id),username from users order by id desc").show
+---------+--------+
|plus0(id)|username|
+---------+--------+
| 03| Jim|
| 02| jack|
| 01| Bret|
+---------+--------+
#注册函数
scala> nsp.udf.register("plus1",(id:Int)=>1+id)
res13: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3857/1612958077@7799dc9a,IntegerType,List(Some(Schema(IntegerType,false))),Some(plus1),false,true)
scala> nsp.sql("select plus1(id),username from users order by id desc").show
+---------+--------+
|plus1(id)|username|
+---------+--------+
| 4| Jim|
| 3| jack|
| 2| Bret|
+---------+--------+
2、DSL方式
#定义函数
scala> val prefix_name =udf(
| (name:String)=>{ "user: "+name})
prefix_name: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3567/441398584@40f06edc,StringType,List(Some(Schema(StringType,true))),None,true,true)
#套入字段名
scala> userds.select($"id",prefix_name(‘username).as("newname")).orderBy(desc("id")).show()
+---+----------+
| id| newname|
+---+----------+
| 3| user: Jim|
| 2|user: jack|
| 1|user: Bret|
+---+----------+
Java编写
#接收参数名和一个函数接口
spark.udf().register("prefix_name", new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
return "user: "+s;
}
}, DataTypes.StringType);
try{
dataset.createTempView("users");
#和SQL一样使用
spark.sql("select id,prefix_name(username) as new_name from users order by id desc").show();
}catch (Exception e){
e.printStackTrace();
}
+---+----------+
| id| new_name|
+---+----------+
| 3| user: Jim|
| 2|user: jack|
| 1|user: Bret|
+---+----------+
四、Spark on Hive
SparkSQL集成Hive,加载读取Hive表数据进行分析,称之为Spark on Hive;
Hive 框架底层分区引擎,可以将MapReduce改为Spark,称之为Hive on Spark;
Spark on Hive 相当于Spark使用Hive的数据。
Hive on Spark相当于Hive把自身的计算从MR换成SparkRDD.
可以通过以下方式进行Spark on Hive.
1、Sparksql
spark-sql是spark提供的一种交互模式,使用sql语句进行处理,默认就是使用hive中的数据。
需要将mysql驱动放到jar目录下,或者指定驱动。
spark-sql --master local[*]
#指定驱动
spark-sql --master local[*] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
2、在IDEA中使用spark on hive
添加spark操作hive的依赖。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0-preview2</version>
</dependency>
core-site.xml,hdfs-site.xml,hive-site.xml
都拷贝到Rsource
目录下.
可以使用Spark 操作Hive数据了。
val conf = new SparkConf().setAppName("JDBCDemo$").setMaster("local[*]")
val spark = SparkSession.builder().config(conf)
.enableHiveSupport()//!!!!默认不支持外部hive,这里需调用方法支持外部hive
.getOrCreate()
import spark.implicits._
spark.sql("use spark_sql_hive")
spark.sql(
"""
|select t.id,t.name,t.email from
|(select cast(‘id‘ as INT) ,id ,name,email from user where id >1001) t
""".stripMargin).show()