文章目录
- SparkSQL 核心编程
SparkSQL 核心编程
新的起点
Spark Core中,如果想要执行应用程序,需要首先构建上下文环境对象SparkContext,Spark SQL其实可以理解为对Spark Core的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。
在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext,所以计算实际上是由sparkContext完成的。当我们使用
spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext
DataFrame
Spark SQL的DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation操作也有action操作。
创建DataFrame
从Spark数据源进行创建
- 查看Spark支持创建文件的数据源格式
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
- 读取json文件创建DataFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
注意:如果从内存中获取数据,spark可以知道数据类型具体是什么。
如果是数字,默认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换。
- 展示结果
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
从RDD进行转换
从Hive Table进行查询返回
SQL语法
这种风格的查询必须要有临时视图或者全局视图来辅助
- 读取JSON文件创建DataFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 对DataFrame创建一个临时表
scala> df.createOrReplaceTempView("people")
- 通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("select * from people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 结果展示
scala> sqlDF.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。
使用全局临时表时需要全路径访问,如global_temp.people
- 对于DataFrame创建一个全局表
scala> df.createGlobalTempView("people")
- 通过SQL语句实现查询全表
scala> spark.sql("select * from global_temp.people").show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> spark.newSession.sql("select * from global_temp.people").show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
DSL语法
DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化数据。
可以在Scala,Java,Python和R中使用DSL,使用DSL语法风格不必去创建临时视图了。
- 创建一个DataFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 查看DataFrame的Schema信息
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
- 只查看"name"列数据
scala> df.select("name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
- 查看"username"列数据以及"age+1"数据
注意:涉及到计算的时候,每列都必须使用$,或者采用引号表达式:单引号+字段名
scala> df.select($"name", $"age" + 1).show
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.select('name, 'age + 1).show
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.select('name, 'age + 1 as "newAge").show
+-------+------+
| name|newAge|
+-------+------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+------+
- 查看"age"大于"30"的数据
scala> df.filter('age >= 30).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.filter($"age" >= 30).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
- 按照"age"分组,查看数据条数
scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
RDD转换为DataFrame
在IDEA中开发程序时,如果需要RDD与DF或者DS之间相互操作,那么需要引入import spark.implicits._
这里的spark不是Scala中的包名,而是创建的SparkSession对象的变量名称,所以必须先创建SparkSession对象再导入。
这里的spark对象不能使用var声明,因为Scala只支持val修饰的对象的引入
spark-shell中无需导入,自动完成此操作。
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[150] at textFile at <console>:24
scala> peopleRDD.toDF("line").show
+-----------+
| line|
+-----------+
|Michael, 29|
| Andy, 30|
| Justin, 19|
+-----------+
实际开发中,一般通过样例类将RDD转换为DataFrame
scala> case class User(name: String, age: Int)
defined class User
scala> sc.makeRDD(List(("dsy", 18), ("sarah", 19))).map(t => User(t._1, t._2)).toDF.show
+-----+---+
| name|age|
+-----+---+
| dsy| 18|
|sarah| 19|
+-----+---+
DataFrame转换为RDD
DataFrame其实就是对RDD的封装,所以可以直接获取内部的RDD
scala> val df = sc.makeRDD(List(("dsy", 18), ("sarah", 19))).map(t => User(t._1, t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[177] at rdd at <console>:25
scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([dsy,18], [sarah,19])
注意:此时得到的RDD存储类型为Row
scala> array(0)
res62: org.apache.spark.sql.Row = [dsy,18]
scala> array(0)(0)
res63: Any = dsy
scala> array(0).getAs[String]("name")
res64: String = dsy
DataSet
DataSet是具有强类型的数据集合,需要提供对应的类型信息
创建DataSet
- 使用样例类序列创建DataSet
scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("dsy", 18)).toDS
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> caseClassDS.show
+----+---+
|name|age|
+----+---+
| dsy| 18|
+----+---+
- 使用基本类型的序列创建DataSet
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
+-----+
注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet。
RDD转换为DataSet
SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。
case类可以包含诸如Seq或者Array等复杂的结构
scala> case class User(name: String, age: Int)
defined class User
scala> sc.makeRDD(List(("dsy", 18), ("sarah", 19))).map(t => User(t._1, t._2)).toDS
res67: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
DataSet转换为RDD
DataSet其实也是对RDD的封装,所以可以直接获取内部的RDD
scala> case class User(name: String, age: Int)
defined class User
scala> val ds = sc.makeRDD(List(("dsy", 18), ("sarah", 19))).map(t => User(t._1, t._2)).toDS
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[184] at rdd at <console>:25
scala> rdd.collect
res68: Array[User] = Array(User(dsy,18), User(sarah,19))
DataFrame和DataSet转换
DataFrame其实是DataSet的特例,所以它们之间是可以相互转换的。
- DataFrame转换为DataSet
scala> case class User(name: String, age: Int)
defined class User
scala> val df = sc.makeRDD(List(("dsy", 18), ("sarah", 19))).toDF("name", "age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
- DataSet转换为DataFrame
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
RDD,DataFrame,DataSet 三者的关系
在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。
- 区别(从版本的产生上来看)
- Spark1.0 => RDD
- Spark1.3 => DataFrame
- Spark1.6 => DataSet
如果同样的数据都给到这三个数据结构,它们分别计算之后,都会给出相同的结果。不同的是它们的执行效率和执行方式。
在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口
三者的共性
- RDD、DataFrame、DataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
- 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;
- 三者有许多共同的函数,如filter,排序等;
- 在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)
- 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
- 三者都有partition的概念
- DataFrame和DataSet均可使用模式匹配获取各个字段的值和类型
三者的区别
-
RDD
- RDD一般和spark mlib同时使用
- RDD不支持sparksql操作
-
DataFrame
- 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
- DataFrame与DataSet一般不与 spark mlib 同时使用
- DataFrame与DataSet均支持 SparkSQL 的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作
- DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然
-
DataSet
- Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。
- DataFrame其实就是DataSet的一个特例 type DataFrame = Dataset[Row]
- DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很*的获得每一行的信息
三者的互相转换
IDEA开发SparkSQL
实际开发中,都是使用IDEA进行开发的。
- 添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.5</version>
</dependency>
- 代码实现
/**
* @author Sarah
* @create 2020-06-10 21:00
*/
object SparkSQL_Test {
def main(args: Array[String]): Unit = {
// 创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark SQL Test")
// 创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
// RDD => DataFrame => DataSet 转换需要引入隐式转换规则,否则无法转换
// spark不是包名,是上下文环境对象名
import spark.implicits._
// 读取json文件 创建DataFrame {"username" : "dsy", "age" : 18}
val df: DataFrame = spark.read.json("input/test.json")
// df.show()
// SQL风格语法
df.createOrReplaceTempView("user")
// spark.sql("select avg(age) from user").show()
// DSL风格语法
// df.select("username", "age").show()
// ********** RDD => DataFrame => DataSet **********
// RDD
val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "dsy", 17), (2, "sarah", 18), (3, "dongsaiyuan", 19)))
// DataFrame
val df1: DataFrame = rdd1.toDF("id", "name", "age")
// df1.show()
// DataSet
val ds1: Dataset[User] = df1.as[User]
// ds1.show()
// ********** DataSet => DataFrame => RDD **********
// DataFrame
val df2: DataFrame = ds1.toDF()
// df2.show()
// RDD
// 返回的RDD类型为Row,里面提供的getXXX方法可以获取字段值,类似jdbc处理结果集,但是索引从0开始
val rdd2: RDD[Row] = df2.rdd
// rdd2.foreach {
// case row => {
// val id: Int = row.getAs[Int]("id")
// val name: String = row.getAs[String]("name")
// val age: Int = row.getAs[Int]("age")
// println(s"id = $id, name = $name, age = $age")
// }
// }
// ********** RDD => DataSet **********
val ds3: Dataset[User] = rdd1.map {
case (id, name, age) => User(id, name, age)
}.toDS()
ds3.show()
// ********** DataSet => RDD **********
val rdd3: RDD[User] = ds3.rdd
rdd3.collect().foreach(println)
// 释放资源
spark.stop()
}
case class User(id: Int, name: String, age: Int)
}
用户自定义函数
用户可以通过spark.udf.register功能添加自定义函数,实现自定义功能。
UDF
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDF Demo")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
// 注册 DataFrame
val df: DataFrame = spark.read.json("input/user1.json")
// 注册 UDF
spark.udf.register("addName", (name: String) => "Name: " + name)
// 创建临时表
df.createOrReplaceTempView("user")
// 应用 UDF
spark.sql("select addName(username) from user").show()
spark.stop()
}
UDAF
强类型的DataSet和弱类型的DataFrame都提供了相关的聚合函数,如count(),countDistinct(),avg(),max(),min()。
除此之外,用户可以设定自己的自定义聚合函数。
弱类型 - DataFrame
- 通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDAF WeakType Demo")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val df: DataFrame = spark.read.json("input/user1.json")
df.createOrReplaceTempView("user")
// 创建聚合函数
val myAvg = new MyAverageUDAF
// 在spark中注册聚合函数
spark.udf.register("avgAge", myAvg)
spark.sql("select avgAge(age) from user").show()
spark.stop()
}
/**
* 定义类继承UserDefinedAggregateFunction,并重写其中方法
*/
class MyAverageUDAF extends UserDefinedAggregateFunction {
// 聚合函数输入参数的数据类型
override def inputSchema: StructType = {
StructType(Array(StructField("age", IntegerType)))
}
// 聚合函数缓冲区中值的数据类型 (age,count)
override def bufferSchema: StructType = {
StructType(Array(StructField("sum", LongType), StructField("count", LongType)))
}
// 函数返回值的数据类型
override def dataType: DataType = DoubleType
// 稳定性:对于相同的输入是否一直返回相同的输出
override def deterministic: Boolean = true
// 函数缓冲区初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
// 存年龄的总和
buffer(0) = 0L
// 存年龄的个数
buffer(1) = 0L
}
// 更新缓冲区中的数据
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if(!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getInt(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// 合并缓冲区
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 计算最终结果
override def evaluate(buffer: Row): Any = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}
强类型 - DataSet
- 通过继承 Aggregator[-IN,BUF,OUT] 来实现用户自定义聚合函数
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDAF StrongType Demo")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val df: DataFrame = spark.read.json("input/user1.json")
// 封装为 DataSet
val ds: Dataset[User] = df.as[User]
// 创建聚合函数
val myAvgUDAF = new MyAverageUDAF
// 将聚合函数转换为查询的列
val col: TypedColumn[User, Double] = myAvgUDAF.toColumn
// 查询
ds.select(col).show()
spark.stop()
}
// 输入数据类型
case class User(username: String, age: Long)
// 缓存类型
case class AvgBuf(var sum: Long, var count: Long)
/**
* 定义类继承 org.apache.spark.sql.expressions.Aggregator
* 重写类中的方法
*/
class MyAverageUDAF extends Aggregator[User, AvgBuf,Double] {
// 缓冲区初始化
override def zero: AvgBuf = {
AvgBuf(0L, 0L)
}
// 更新缓冲区中的数据
override def reduce(buffer: AvgBuf, user: User): AvgBuf = {
buffer.sum = buffer.sum + user.age
buffer.count = buffer.count + 1
buffer
}
// 合并缓冲区
override def merge(buffer1: AvgBuf, buffer2: AvgBuf): AvgBuf = {
buffer1.sum += buffer2.sum
buffer1.count += buffer2.count
buffer1
}
// 计算最终结果
override def finish(reduction: AvgBuf): Double = {
reduction.sum.toDouble / reduction.count
}
// DataSet 默认的编解码器,用于序列化,固定写法
// 自定义类型就是product,自带类型根据类型选择
override def bufferEncoder: Encoder[AvgBuf] = Encoders.product
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
数据的加载和保存
通用的加载和保存方式
SparkSQL 提供了通用的保存数据和加载数据的方式。
这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据。
SparkSQL默认读取和保存的文件格式为Parquet
加载数据
spark.read.load("…") 是加载数据的通用方法
如果读取不同格式的数据,可以对不同的数据格式进行设定
scala> spark.read.format("…")[.option("…")].load("…")
- format("…"):指定加载的数据类型。包括"csv",“dbc”,“json”,“orc”,“parquet”,“textFile”
- load("…"):指定加载的数据的路径。
- option("…"):在"jdbc"格式下需要传入JDBC相应参数。url,user,password,dbtable。
可以直接在文件上进行查询:文件格式.`文件路径`
scala>spark.sql("select * from json.`/opt/module/data/user.json`").show
保存数据
df.write.save("…") 是保存数据的通用方法
如果保存不同格式的数据,可以对不同的数据格式进行设定
scala>df.write.format("…")[.option("…")].save("…")
- format("…"):指定加载的数据类型。包括"csv",“dbc”,“json”,“orc”,“parquet”,“textFile”
- load("…"):指定加载的数据的路径。
- option("…"):在"jdbc"格式下需要传入JDBC相应参数。url,user,password,dbtable。
保存操作可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置。
注意:这些SaveMode都是没有加锁的,也不是原子操作
SaveMode是一个枚举类,其中的常量包括:
Scala/Java | Any Language | Meaning |
---|---|---|
SaveMode.ErrorIfExists(default) | “error”(default) | 如果文件已经存在则抛出异常 |
SaveMode.Append | “append” | 如果文件已经存在则追加 |
SaveMode.Overwrite | “overwrite” | 如果文件已经存在则覆盖 |
SaveMode.Ignore | “ignore” | 如果文件已经存在则忽略 |
df.write.mode("append").json("/opt/module/data/output")
Parquet
Spark SQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式。
数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可修改默认数据源格式。
加载数据
scala> val df = spark.read.load("examples/src/main/resources/users.parquet")
scala> df.show
保存数据
scala> var df = spark.read.json("/opt/module/data/input/people.json")
//保存为parquet格式
scala> df.write.mode("append").save("/opt/module/data/output")
JSON
Spark SQL能够自动推测JSON数据集的结构,并将它加载为一个DataSet[Row]。可以通过spark.read.json("…")去加载JSON文件。
注意:Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。
格式如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
CSV
Spark SQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列
spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header",
"true").load("data/user.csv")
MySQL
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark的类路径下。
bin/spark-shell
--jars mysql-connector-java-5.1.27-bin.jar
- 在IDEA中通过JDBC对Mysql进行操作:
导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
读取数据
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDAF StrongType Demo")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
// 方式1:通用的load方法读取
spark.read.format("jdbc")
.option("url", "jdbc:mysql://hadoop131:3306/sparksql")
.option("user", "root")
.option("password", "saiyuan000")
.option("dbtable", "user")
.load().show()
// 方法2:通用的load方法读取 参数另一种形式
spark.read.format("jdbc")
.options(Map(
"url" -> "jdbc:mysql://hadoop131/sparksql?user=root&password=saiyuan000",
"dbtable" -> "user",
"driver" -> "com.mysql.jdbc.Driver"))
.load().show()
// 方法3:使用jdbc方法读取
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "saiyuan000")
val df: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop131:3306/sparksql", "user", props)
df.show()
// 释放资源
spark.stop()
}
写入数据
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDAF StrongType Demo")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val rdd: RDD[User] = spark.sparkContext.makeRDD(
List(
User("dsy", 18),
User("sarah", 19),
User("dongsaiyuan", 20)
)
)
val ds: Dataset[User] = rdd.toDS()
// 方式1:通用的方法 format指定写出类型
ds.write.format("jdbc")
.option("url", "jdbc:mysql://hadoop131:3306/sparksql")
.option("user", "root")
.option("password", "saiyuan000")
.option("dbtable", "user")
.mode(SaveMode.Append)
.save()
// 方法2:通过jdbc方法
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "saiyuan000")
ds.write.mode(SaveMode.Append)
.jdbc("jdbc:mysql://hadoop131:3306/sparksql", "user", props)
// 释放资源
spark.stop()
}
case class User(name: String, age: Int)
Hive
Apache Hive是Hadoop上的SQL引擎,SparkSQL编译时可以包含Hive支持,也可以不包含。
包含Hive支持的Spark SQL可以支持Hive表访问,UDF(用户自定义函数),以及Hive查询语言(HiveQL/HQL)等。
注意:如果要在Spark SQL中包含Hive的库,并不需要事先安装Hive。因为Spark有内置的Hive。
若要把Spark SQL连接到一个部署好的Hive上,必须把hive-site.xml复制到Spark的配置文件目录中($SPARK_HOME/conf)。默认的文件系统是HDFS。
如果没有部署好Hive,Spark SQL会在当前的工作目录中创建出自己的Hive元数据仓库,叫做metastore_db。本地文件系统。
如果使用HiveQL中的CREATE TABLE(并非 CRREATE EXTERNAL TABLE)语句来创建表,这些表会被放在默认的文件系统中的/user/hive/warehouse目录中。
注意:虽然Spark-shell默认是支持Hive的,但是由于当前使用Spark2.4.5版本,和Hive3.1.2版本不兼容,所以无法在对应版本的spark-shell中使用hive。未来版本会解决找个问题。
内嵌的HIVE
如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可.
Hive 的元数据存储在 derby 中, 仓库地址:$SPARK_HOME/spark-warehouse
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
scala> spark.sql("create table aa(id int)")
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| aa| false|
+--------+---------+-----------+
- 向表加载本地数据
scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")
scala> spark.sql("select * from aa").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+
在实际使用中, 几乎没有任何人会使用内置的 Hive
外部的HIVE
如果想连接外部已经部署好的Hive,需要通过以下几个步骤:
- Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下
- 把Mysql的驱动copy到jars/目录下
- 如果访问不到hdfs,则需要把core-site.xml和hdfs-site.xml拷贝到conf/目录下
scala> spark.sql("show tables").show
20/04/25 22:05:14 WARN ObjectStore: Failed to get database global_temp, returning
NoSuchObjectException
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| emp| false|
| default|hive_hbase_emp_table| false|
| default| relevance_hbase_emp| false|
| default| staff_hive| false|
| default| ttt| false|
| default| user_visit_action| false|
+--------+--------------------+-----------+
运行Spark SQL CLI
Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。
在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口
bin/spark-sql
代码操作Hive
- 导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
- 将hive-site.xml文件拷贝到项目的resources目录中,代码实现
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "dsy")
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDAF StrongType Demo")
val spark: SparkSession = SparkSession
.builder()
.enableHiveSupport()
.config(conf)
.config("spark.sql.warehouse.dir", "hdfs://hadoop131:9820/user/hive/warehouse")
.getOrCreate()
import spark.implicits._
spark.sql("use default")
spark.sql("create table user(name varchar(20), age int)")
spark.sql("show tables").show()
spark.sql("insert into table user values('dsy', 18)")
spark.sql("insert into table user values('sarah', 19)")
spark.sql("insert into table user values('dongsaiyuan', 20)")
spark.stop()
}
注意:在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址:
config("spark.sql.warehouse.dir", "hdfs://hadoop131:9820/user/hive/warehouse")
可以代码最前面增加如下代码解决:
System.setProperty("HADOOP_USER_NAME", "dsy")
注意:如果在执行时出现连接异常,检查下hive的两个服务有没有开。