Spark08-SparkSQL之DataFrame

一、DataFrame

1、DataFrame是什么

DataFrame 是SparkSQL中一个表示关系型数据库中表的函数式抽象,其作用是让Spark 处理大规模结构化数据的时候更加容易.一般DataFrame可以处理结构化的数据,或者是半结构化的数据,因为这两类数据中都可以获取到Schema信息。也就是说DataFrame中有 Schema 信息,可以像操作表一样操作DataFrame .

DataFrame 由两部分构成, 一是 row 的集合, 每个 row 对象表示一个行, 二是描述 DataFrame 结构的 Schema.

Spark08-SparkSQL之DataFrame

DataFrame 支持 SQL 中常见的操作, 例如: selectfilterjoingroupsortjoin 等。

class DataFrame {
  @Test
  def dataFrame(): Unit ={
    //1. 创建SparkSession
    val spark = SparkSession.builder() // 更简单的创建 Session 方式
      .appName("dataFrame1").master("local[6]").getOrCreate()

    // 2. 创建 DataFrame
    import spark.implicits._
    val dataFrame: sql.DataFrame = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)).toDF()

    // 3. DataFrame 的一些操作
    dataFrame.where('age > 15)
      .select('name).show()
    }

}
case class Person(name:String, age:Int)

2、数据处理

—般处理数据都差不多是ETL三个步骤:E->抽取,T->处理、转换,L->装载,落地

Spark代码编写的套路:

  1、创建 DataFrame(或者Dataset、 RDD),制造或者读取数据

  2、通过 DataFrame(或者Dataset、 RDD)的API来进行数据处理

  3、通过 DataFrame(或者Dataset、 RDD)进行数据落地

DataFrame 如何创建?

有三种方式,toDF,createDataFrame,DataFrameReader。代码如下。

class DataFrame {/**
   * 介绍创建 DataFrame 的三种方法
   */
  @Test
  def createDF(): Unit ={
    //1. 创建SparkSession
    val spark = SparkSession.builder() // 更简单的创建 Session 方式
      .appName("dataFrame1").master("local[6]").getOrCreate()

    // 2. 创建 DataFrame
    import spark.implicits._

    val personList = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25))
    // (1) toDF
    val df1 = personList.toDF() // 作用于普通序列
    val df2 = spark.sparkContext.parallelize(personList).toDF() // 作用于 RDD 对象

    // (2) createDataFrame
    val df3 = spark.createDataFrame(personList)

    // (3) read
    val df4 = spark.read.csv("../dataset/BeijingPM20100101_20151231_noheader.csv")

    // 运行展示一下read csv 的结果
    df4.show()
  }
}

DataFrame支持什么操作?

我们通过一个案例入门DataFrame 的操作,需求是查看北京雾霾数据集上每个月的统计数量。

方法一:命令式

  @Test
  def caseDF(): Unit ={
    // 1. 创建 SparkSession
    val spark = SparkSession.builder().master("local[6]").appName("Case_Beijing").getOrCreate()

    import spark.implicits._

    // 2. 读取数据集
    val sourceDF = spark.read
      .option("header", value = true) // 设置头信息,默认数据集中的第一行
      .csv("../dataset/BeijingPM20100101_20151231.csv")
    sourceDF.show()

    // 3. 处理数据集(与处理 RDD 时的思路完全不同)
    // 3.1 选择列
    sourceDF.select('year, 'month, 'PM_Dongsi)

    // 3.2 过滤到空(NA)的PM记录
      .where('PM_Dongsi =!= "NA")

    // 3.3 分组 select year, month, count(PM_Dongsi) from ... where PM_Dongsi != 'NA' group by year, month
      .groupBy('year, 'month)

    // 3.4 聚合
      .count()

      // 得出结论
      .show()

    spark.stop()
  }

方法二:直接使用sql

@Test
  def case_sql(): Unit ={
    // 1. 创建 SparkSession
    val spark = SparkSession.builder().master("local[6]").appName("Case_Beijing").getOrCreate()

    import spark.implicits._

    // 2. 读取数据集
    val sourceDF = spark.read
      .option("header", value = true) // 设置头信息,默认数据集中的第一行
      .csv("../dataset/BeijingPM20100101_20151231.csv")
    sourceDF.show()

    // 3. 直接使用 sql 进行查询
    // 3.1 将 DataFrame 注册为临表
    sourceDF.createOrReplaceTempView("pm")

    // 3.2 执行查询
    val resultDF = spark.sql("select year, month, count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year, month")

    resultDF.show()
    spark.stop()
  }

3、总结

  1. DataFrame 是一个类似于关系型数据库表的函数式组件

  2. DataFrame 一般处理结构化数据和半结构化数据

  3. DataFrame 具有数据对象的 Schema 信息

  4. 可以使用命令式的 API 操作 DataFrame, 同时也可以使用 SQL 操作 DataFrame

  5. DataFrame 可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建

二、DataFrame 与 Dataset 的异同

1、DataFrame 就是 Dataset

根据前面的内容,可以得到如下信息

1. Dataset中可以使用列来访问数据,DataFrame也可以

2. Dataset 的执行是优化的,DataFrame 也是

3.Dataset 具有命令式 API ,同时也可以使用SQL来访问, DataFrame也可以使用这两种不同的方式访问

那么为什么两个相同的东西会同时出现在 Spark SQL 中呢?我们看这二者在Scala源码中的表示。

 Spark08-SparkSQL之DataFrame

确实,这两个组件是同一个东西, DataFrame 是 Dataset 的一种特殊情况,也就是说 DataFrame 是 Dataset[Row] 的别名

2、DataFrame 与 Dataset 表达的语义不同

第一点: DataFrame 表达的含义是一个支持函数式操作的 , 而 Dataset 表达是是一个类似 RDD 的东西, Dataset 可以处理任何对象

第二点: DataFrame 中所存放的是 Row 对象, 而 Dataset 中可以存放任何类型的对象
    val spark = SparkSession.builder().appName("df1").master("local[6]").getOrCreate()
    import spark.implicits._
    val df: sql.DataFrame = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)).toDF()
    val ds: sql.Dataset[Person] = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)).toDS()
可知,DataFrame是弱类型,Dataset是强类型。
DataFrame就是Dataset [ row ] ,Dataset的泛型可以是任意类型。
第三点: DataFrame 的操作方式和 Dataset 是一样的, 但是对于强类型操作而言, 它们处理的类型不同
  @Test
  def dataFrame2(): Unit ={
    val spark = SparkSession.builder().appName("df1").master("local[6]").getOrCreate()
    val personList = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25))

    import spark.implicits._

    val df: sql.DataFrame = personList.toDF()
    // 首先通过最后的解码器,设置程序按照 df 的 schema结构,然后row对象 => Row(row.get(0), row.getAs[Int](1) * 2)
    // 即第一位不变,第二位强转为Int然后 * 2, 这里取出对象中的元素用 get 方法, 强转用 getAs
    df.map( (row:Row) => Row(row.get(0), row.getAs[Int](1) * 2) )(RowEncoder.apply(df.schema))
      .show()

    val ds = personList.toDS()
    // 这里取出对象元素用 person.name, 即 对象.属性名
    ds.map( (person: Person) => Person(person.name, person.age * 2) )
      .show()
  }
第四点: DataFrame 只能做到运行时类型检查, Dataset 能做到编译和运行时都有类型检查

 3、Row是什么?

Row 对象表示的是一个

Row 的操作类似于 Scala 中的 Map 数据类型

  @Test
  def row(): Unit ={
    // 1、 row 如何创建,它是什么
    val p = Person("zhangsan",19)
    val row = Row("zhangsan",19)
    // 两者区别,对于p,列是有名字的,name, age
    // 对于 row, 列是没有名字的,若想有列名,必须配合schema

    // 2、如何从  row 中获取数据
    row.getString(0)
    row.getInt(1)

    // Row 也是样例类
    row match {
      case Row(name, age) => println(name, age)
    }
  }

 4、DataFrame 和 Dataset 之间的相互转换

val spark: SparkSession = new sql.SparkSession.Builder()
  .appName("hello")
  .master("local[6]")
  .getOrCreate()

import spark.implicits._

val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()
val ds_fdf: Dataset[People] = df.as[People]

val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()
val df_fds: DataFrame = ds.toDF()

5、总结

  1. DataFrame 就是 Dataset, 他们的方式是一样的, 也都支持 API 和 SQL 两种操作方式

  2. DataFrame 只能通过表达式的形式, 或者列的形式来访问数据, 只有 Dataset 支持针对于整个对象的操作

  3. DataFrame 中的数据表示为 Row, 是一个行的概念

上一篇:SparkSQL(一)——概述


下一篇:SparkSQL在字节跳动的应用实践和优化实战