SparkSQL 核心编程

文章目录

SparkSQL 核心编程

新的起点

Spark Core中,如果想要执行应用程序,需要首先构建上下文环境对象SparkContext,Spark SQL其实可以理解为对Spark Core的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。
在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext,所以计算实际上是由sparkContext完成的。当我们使用
spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext

DataFrame

Spark SQL的DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation操作也有action操作。

创建DataFrame

从Spark数据源进行创建

  • 查看Spark支持创建文件的数据源格式
scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile
  • 读取json文件创建DataFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

注意:如果从内存中获取数据,spark可以知道数据类型具体是什么。
如果是数字,默认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换。

  • 展示结果
scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

从RDD进行转换

从Hive Table进行查询返回

SQL语法

这种风格的查询必须要有临时视图或者全局视图来辅助

  • 读取JSON文件创建DataFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  • 对DataFrame创建一个临时表
scala> df.createOrReplaceTempView("people")
  • 通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("select * from people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  • 结果展示
scala> sqlDF.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。
使用全局临时表时需要全路径访问,如global_temp.people

  • 对于DataFrame创建一个全局表
scala> df.createGlobalTempView("people")
  • 通过SQL语句实现查询全表
scala> spark.sql("select * from global_temp.people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> spark.newSession.sql("select * from global_temp.people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

DSL语法

DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化数据。
可以在Scala,Java,Python和R中使用DSL,使用DSL语法风格不必去创建临时视图了。

  • 创建一个DataFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  • 查看DataFrame的Schema信息
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
  • 只查看"name"列数据
scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
  • 查看"username"列数据以及"age+1"数据

注意:涉及到计算的时候,每列都必须使用$,或者采用引号表达式:单引号+字段名

scala> df.select($"name", $"age" + 1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


scala> df.select('name, 'age + 1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


scala> df.select('name, 'age + 1 as "newAge").show
+-------+------+
|   name|newAge|
+-------+------+
|Michael|  null|
|   Andy|    31|
| Justin|    20|
+-------+------+
  • 查看"age"大于"30"的数据
scala> df.filter('age >= 30).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+


scala> df.filter($"age" >= 30).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
  • 按照"age"分组,查看数据条数
scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

RDD转换为DataFrame

在IDEA中开发程序时,如果需要RDD与DF或者DS之间相互操作,那么需要引入import spark.implicits._
这里的spark不是Scala中的包名,而是创建的SparkSession对象的变量名称,所以必须先创建SparkSession对象再导入。
这里的spark对象不能使用var声明,因为Scala只支持val修饰的对象的引入
spark-shell中无需导入,自动完成此操作。

scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[150] at textFile at <console>:24

scala> peopleRDD.toDF("line").show
+-----------+
|       line|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+

实际开发中,一般通过样例类将RDD转换为DataFrame

scala> case class User(name: String, age: Int)
defined class User

scala> sc.makeRDD(List(("dsy", 18), ("sarah", 19))).map(t => User(t._1, t._2)).toDF.show
+-----+---+
| name|age|
+-----+---+
|  dsy| 18|
|sarah| 19|
+-----+---+

DataFrame转换为RDD

DataFrame其实就是对RDD的封装,所以可以直接获取内部的RDD

scala> val df = sc.makeRDD(List(("dsy", 18), ("sarah", 19))).map(t => User(t._1, t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[177] at rdd at <console>:25

scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([dsy,18], [sarah,19])

注意:此时得到的RDD存储类型为Row

scala> array(0)
res62: org.apache.spark.sql.Row = [dsy,18]

scala> array(0)(0)
res63: Any = dsy

scala> array(0).getAs[String]("name")
res64: String = dsy

DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息

创建DataSet

  • 使用样例类序列创建DataSet
scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS = Seq(Person("dsy", 18)).toDS
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show
+----+---+
|name|age|
+----+---+
| dsy| 18|
+----+---+
  • 使用基本类型的序列创建DataSet
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet。

RDD转换为DataSet

SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。
case类可以包含诸如Seq或者Array等复杂的结构

scala> case class User(name: String, age: Int)
defined class User

scala> sc.makeRDD(List(("dsy", 18), ("sarah", 19))).map(t => User(t._1, t._2)).toDS
res67: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

DataSet转换为RDD

DataSet其实也是对RDD的封装,所以可以直接获取内部的RDD

scala> case class User(name: String, age: Int)
defined class User

scala> val ds = sc.makeRDD(List(("dsy", 18), ("sarah", 19))).map(t => User(t._1, t._2)).toDS
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[184] at rdd at <console>:25

scala> rdd.collect
res68: Array[User] = Array(User(dsy,18), User(sarah,19))

DataFrame和DataSet转换

DataFrame其实是DataSet的特例,所以它们之间是可以相互转换的。

  • DataFrame转换为DataSet
scala> case class User(name: String, age: Int)
defined class User

scala> val df = sc.makeRDD(List(("dsy", 18), ("sarah", 19))).toDF("name", "age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
  • DataSet转换为DataFrame
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

RDD,DataFrame,DataSet 三者的关系

在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。

  • 区别(从版本的产生上来看)
    • Spark1.0 => RDD
    • Spark1.3 => DataFrame
    • Spark1.6 => DataSet

如果同样的数据都给到这三个数据结构,它们分别计算之后,都会给出相同的结果。不同的是它们的执行效率和执行方式。
在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口

三者的共性

  • RDD、DataFrame、DataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
  • 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;
  • 三者有许多共同的函数,如filter,排序等;
  • 在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)
  • 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
  • 三者都有partition的概念
  • DataFrame和DataSet均可使用模式匹配获取各个字段的值和类型

三者的区别

  • RDD

    • RDD一般和spark mlib同时使用
    • RDD不支持sparksql操作
  • DataFrame

    • 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
    • DataFrame与DataSet一般不与 spark mlib 同时使用
    • DataFrame与DataSet均支持 SparkSQL 的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作
    • DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然
  • DataSet

    • Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。
    • DataFrame其实就是DataSet的一个特例 type DataFrame = Dataset[Row]
    • DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很*的获得每一行的信息

三者的互相转换

IDEA开发SparkSQL

实际开发中,都是使用IDEA进行开发的。

  • 添加依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>2.4.5</version>
</dependency>
  • 代码实现
/**
 * @author Sarah
 * @create 2020-06-10 21:00
 */
object SparkSQL_Test {

  def main(args: Array[String]): Unit = {
    // 创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark SQL Test")

    // 创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // RDD => DataFrame => DataSet 转换需要引入隐式转换规则,否则无法转换
    // spark不是包名,是上下文环境对象名
    import spark.implicits._

    // 读取json文件 创建DataFrame {"username" : "dsy", "age" : 18}
    val df: DataFrame = spark.read.json("input/test.json")
//    df.show()

    // SQL风格语法
    df.createOrReplaceTempView("user")
//    spark.sql("select avg(age) from user").show()

    // DSL风格语法
//    df.select("username", "age").show()


    // ********** RDD => DataFrame => DataSet **********

    // RDD
    val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "dsy", 17), (2, "sarah", 18), (3, "dongsaiyuan", 19)))

    // DataFrame
    val df1: DataFrame = rdd1.toDF("id", "name", "age")
//    df1.show()

    // DataSet
    val ds1: Dataset[User] = df1.as[User]
//    ds1.show()


    // ********** DataSet => DataFrame => RDD **********

    // DataFrame
    val df2: DataFrame = ds1.toDF()
//    df2.show()

    // RDD
    // 返回的RDD类型为Row,里面提供的getXXX方法可以获取字段值,类似jdbc处理结果集,但是索引从0开始
    val rdd2: RDD[Row] = df2.rdd
//    rdd2.foreach {
//      case row => {
//        val id: Int = row.getAs[Int]("id")
//        val name: String = row.getAs[String]("name")
//        val age: Int = row.getAs[Int]("age")
//        println(s"id = $id, name = $name, age = $age")
//      }
//    }


    // ********** RDD => DataSet **********

    val ds3: Dataset[User] = rdd1.map {
      case (id, name, age) => User(id, name, age)
    }.toDS()
    ds3.show()


    // ********** DataSet => RDD **********

    val rdd3: RDD[User] = ds3.rdd
    rdd3.collect().foreach(println)


    // 释放资源
    spark.stop()
  }

  case class User(id: Int, name: String, age: Int)

}

用户自定义函数

用户可以通过spark.udf.register功能添加自定义函数,实现自定义功能。

UDF

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDF Demo")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // 注册 DataFrame
    val df: DataFrame = spark.read.json("input/user1.json")

    // 注册 UDF
    spark.udf.register("addName", (name: String) => "Name: " + name)

    // 创建临时表
    df.createOrReplaceTempView("user")

    // 应用 UDF
    spark.sql("select addName(username) from user").show()
    
    spark.stop()
  }

UDAF

强类型的DataSet和弱类型的DataFrame都提供了相关的聚合函数,如count(),countDistinct(),avg(),max(),min()。
除此之外,用户可以设定自己的自定义聚合函数。

弱类型 - DataFrame

  • 通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDAF WeakType Demo")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read.json("input/user1.json")
    df.createOrReplaceTempView("user")

    // 创建聚合函数
    val myAvg = new MyAverageUDAF

    // 在spark中注册聚合函数
    spark.udf.register("avgAge", myAvg)

    spark.sql("select avgAge(age) from user").show()

    spark.stop()
  }

  /**
   * 定义类继承UserDefinedAggregateFunction,并重写其中方法
   */
  class MyAverageUDAF extends UserDefinedAggregateFunction {

    // 聚合函数输入参数的数据类型
    override def inputSchema: StructType = {
      StructType(Array(StructField("age", IntegerType)))
    }

    // 聚合函数缓冲区中值的数据类型 (age,count)
    override def bufferSchema: StructType = {
      StructType(Array(StructField("sum", LongType), StructField("count", LongType)))
    }

    // 函数返回值的数据类型
    override def dataType: DataType = DoubleType

    // 稳定性:对于相同的输入是否一直返回相同的输出
    override def deterministic: Boolean = true

    // 函数缓冲区初始化
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      // 存年龄的总和
      buffer(0) = 0L

      // 存年龄的个数
      buffer(1) = 0L
    }

    // 更新缓冲区中的数据
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if(!input.isNullAt(0)) {
        buffer(0) = buffer.getLong(0) + input.getInt(0)
        buffer(1) = buffer.getLong(1) + 1
      }
    }

    // 合并缓冲区
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
      buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }

    // 计算最终结果
    override def evaluate(buffer: Row): Any = {
      buffer.getLong(0).toDouble / buffer.getLong(1)
    }
  }

强类型 - DataSet

  • 通过继承 Aggregator[-IN,BUF,OUT] 来实现用户自定义聚合函数
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDAF StrongType Demo")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read.json("input/user1.json")
    // 封装为 DataSet
    val ds: Dataset[User] = df.as[User]

    // 创建聚合函数
    val myAvgUDAF = new MyAverageUDAF

    // 将聚合函数转换为查询的列
    val col: TypedColumn[User, Double] = myAvgUDAF.toColumn

    // 查询
    ds.select(col).show()

    spark.stop()
  }

  // 输入数据类型
  case class User(username: String, age: Long)

  // 缓存类型
  case class AvgBuf(var sum: Long, var count: Long)

  /**
   * 定义类继承 org.apache.spark.sql.expressions.Aggregator
   * 重写类中的方法
   */
  class MyAverageUDAF extends Aggregator[User, AvgBuf,Double] {

    // 缓冲区初始化
    override def zero: AvgBuf = {
      AvgBuf(0L, 0L)
    }

    // 更新缓冲区中的数据
    override def reduce(buffer: AvgBuf, user: User): AvgBuf = {
      buffer.sum = buffer.sum + user.age
      buffer.count = buffer.count + 1
      buffer
    }

    // 合并缓冲区
    override def merge(buffer1: AvgBuf, buffer2: AvgBuf): AvgBuf = {
      buffer1.sum += buffer2.sum
      buffer1.count += buffer2.count
      buffer1
    }

    // 计算最终结果
    override def finish(reduction: AvgBuf): Double = {
      reduction.sum.toDouble / reduction.count
    }

    // DataSet 默认的编解码器,用于序列化,固定写法
    // 自定义类型就是product,自带类型根据类型选择
    override def bufferEncoder: Encoder[AvgBuf] = Encoders.product

    override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  }

数据的加载和保存

通用的加载和保存方式

SparkSQL 提供了通用的保存数据和加载数据的方式。
这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据。
SparkSQL默认读取和保存的文件格式为Parquet

加载数据

spark.read.load("…") 是加载数据的通用方法
如果读取不同格式的数据,可以对不同的数据格式进行设定

scala> spark.read.format("…")[.option("…")].load("…")
  • format("…"):指定加载的数据类型。包括"csv",“dbc”,“json”,“orc”,“parquet”,“textFile”
  • load("…"):指定加载的数据的路径。
  • option("…"):在"jdbc"格式下需要传入JDBC相应参数。url,user,password,dbtable。

可以直接在文件上进行查询:文件格式.`文件路径`

scala>spark.sql("select * from json.`/opt/module/data/user.json`").show

保存数据

df.write.save("…") 是保存数据的通用方法
如果保存不同格式的数据,可以对不同的数据格式进行设定

scala>df.write.format("…")[.option("…")].save("…")
  • format("…"):指定加载的数据类型。包括"csv",“dbc”,“json”,“orc”,“parquet”,“textFile”
  • load("…"):指定加载的数据的路径。
  • option("…"):在"jdbc"格式下需要传入JDBC相应参数。url,user,password,dbtable。

保存操作可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置。
注意:这些SaveMode都是没有加锁的,也不是原子操作

SaveMode是一个枚举类,其中的常量包括:

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) “error”(default) 如果文件已经存在则抛出异常
SaveMode.Append “append” 如果文件已经存在则追加
SaveMode.Overwrite “overwrite” 如果文件已经存在则覆盖
SaveMode.Ignore “ignore” 如果文件已经存在则忽略
df.write.mode("append").json("/opt/module/data/output")

Parquet

Spark SQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式
数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可修改默认数据源格式。

加载数据

scala> val df = spark.read.load("examples/src/main/resources/users.parquet")
scala> df.show

保存数据

scala> var df = spark.read.json("/opt/module/data/input/people.json")
//保存为parquet格式
scala> df.write.mode("append").save("/opt/module/data/output")

JSON

Spark SQL能够自动推测JSON数据集的结构,并将它加载为一个DataSet[Row]。可以通过spark.read.json("…")去加载JSON文件。

注意:Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。
格式如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

CSV

Spark SQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列

spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", 
"true").load("data/user.csv")

MySQL

Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark的类路径下。

bin/spark-shell 
--jars mysql-connector-java-5.1.27-bin.jar
  • 在IDEA中通过JDBC对Mysql进行操作:

导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

读取数据

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDAF StrongType Demo")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // 方式1:通用的load方法读取
    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://hadoop131:3306/sparksql")
      .option("user", "root")
      .option("password", "saiyuan000")
      .option("dbtable", "user")
      .load().show()

    // 方法2:通用的load方法读取 参数另一种形式
    spark.read.format("jdbc")
      .options(Map(
        "url" -> "jdbc:mysql://hadoop131/sparksql?user=root&password=saiyuan000",
        "dbtable" -> "user",
        "driver" -> "com.mysql.jdbc.Driver"))
      .load().show()

    // 方法3:使用jdbc方法读取
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "saiyuan000")
    val df: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop131:3306/sparksql", "user", props)
    df.show()

    // 释放资源
    spark.stop()
  }

写入数据

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDAF StrongType Demo")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val rdd: RDD[User] = spark.sparkContext.makeRDD(
      List(
        User("dsy", 18),
        User("sarah", 19),
        User("dongsaiyuan", 20)
      )
    )
    val ds: Dataset[User] = rdd.toDS()

    // 方式1:通用的方法 format指定写出类型
    ds.write.format("jdbc")
        .option("url", "jdbc:mysql://hadoop131:3306/sparksql")
        .option("user", "root")
        .option("password", "saiyuan000")
        .option("dbtable", "user")
        .mode(SaveMode.Append)
        .save()

    // 方法2:通过jdbc方法
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "saiyuan000")
    ds.write.mode(SaveMode.Append)
        .jdbc("jdbc:mysql://hadoop131:3306/sparksql", "user", props)

    // 释放资源
    spark.stop()
  }

  case class User(name: String, age: Int)

Hive

Apache Hive是Hadoop上的SQL引擎,SparkSQL编译时可以包含Hive支持,也可以不包含。
包含Hive支持的Spark SQL可以支持Hive表访问,UDF(用户自定义函数),以及Hive查询语言(HiveQL/HQL)等。
注意:如果要在Spark SQL中包含Hive的库,并不需要事先安装Hive。因为Spark有内置的Hive。
若要把Spark SQL连接到一个部署好的Hive上,必须把hive-site.xml复制到Spark的配置文件目录中($SPARK_HOME/conf)。默认的文件系统是HDFS。
如果没有部署好Hive,Spark SQL会在当前的工作目录中创建出自己的Hive元数据仓库,叫做metastore_db。本地文件系统。
如果使用HiveQL中的CREATE TABLE(并非 CRREATE EXTERNAL TABLE)语句来创建表,这些表会被放在默认的文件系统中的/user/hive/warehouse目录中。

注意:虽然Spark-shell默认是支持Hive的,但是由于当前使用Spark2.4.5版本,和Hive3.1.2版本不兼容,所以无法在对应版本的spark-shell中使用hive。未来版本会解决找个问题。

内嵌的HIVE

如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可.
Hive 的元数据存储在 derby 中, 仓库地址:$SPARK_HOME/spark-warehouse

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

scala> spark.sql("create table aa(id int)")

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|       aa|      false|
+--------+---------+-----------+
  • 向表加载本地数据
scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")

scala> spark.sql("select * from aa").show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+

在实际使用中, 几乎没有任何人会使用内置的 Hive

外部的HIVE

如果想连接外部已经部署好的Hive,需要通过以下几个步骤:

  • Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下
  • 把Mysql的驱动copy到jars/目录下
  • 如果访问不到hdfs,则需要把core-site.xml和hdfs-site.xml拷贝到conf/目录下
scala> spark.sql("show tables").show
20/04/25 22:05:14 WARN ObjectStore: Failed to get database global_temp, returning 
NoSuchObjectException
+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|                 emp|      false|
| default|hive_hbase_emp_table|      false|
| default| relevance_hbase_emp|      false|
| default|          staff_hive|      false|
| default|                 ttt|      false|
| default|   user_visit_action|      false|
+--------+--------------------+-----------+

运行Spark SQL CLI

Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。
在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口

bin/spark-sql

代码操作Hive

  • 导入依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>2.4.5</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>
  • 将hive-site.xml文件拷贝到项目的resources目录中,代码实现
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "dsy")
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UDAF StrongType Demo")
    val spark: SparkSession = SparkSession
      .builder()
      .enableHiveSupport()
      .config(conf)
      .config("spark.sql.warehouse.dir", "hdfs://hadoop131:9820/user/hive/warehouse")
      .getOrCreate()
    import spark.implicits._

    spark.sql("use default")
    spark.sql("create table user(name varchar(20), age int)")
    spark.sql("show tables").show()
    spark.sql("insert into table user values('dsy', 18)")
    spark.sql("insert into table user values('sarah', 19)")
    spark.sql("insert into table user values('dongsaiyuan', 20)")

    spark.stop()
  }

注意:在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址:

config("spark.sql.warehouse.dir", "hdfs://hadoop131:9820/user/hive/warehouse")

SparkSQL 核心编程
可以代码最前面增加如下代码解决:

System.setProperty("HADOOP_USER_NAME", "dsy")

注意:如果在执行时出现连接异常,检查下hive的两个服务有没有开。

上一篇:大数据之sparksql常用函数


下一篇:SparkSQL读写JDBC