一、DataFrame
1、DataFrame是什么
DataFrame 是SparkSQL中一个表示关系型数据库中表的函数式抽象,其作用是让Spark 处理大规模结构化数据的时候更加容易.一般DataFrame可以处理结构化的数据,或者是半结构化的数据,因为这两类数据中都可以获取到Schema信息。也就是说DataFrame中有 Schema 信息,可以像操作表一样操作DataFrame .
DataFrame
由两部分构成, 一是 row
的集合, 每个 row
对象表示一个行, 二是描述 DataFrame
结构的 Schema
.
DataFrame
支持 SQL
中常见的操作, 例如: select
, filter
, join
, group
, sort
, join
等。
class DataFrame { @Test def dataFrame(): Unit ={ //1. 创建SparkSession val spark = SparkSession.builder() // 更简单的创建 Session 方式 .appName("dataFrame1").master("local[6]").getOrCreate() // 2. 创建 DataFrame import spark.implicits._ val dataFrame: sql.DataFrame = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)).toDF() // 3. DataFrame 的一些操作 dataFrame.where('age > 15) .select('name).show() } } case class Person(name:String, age:Int)
2、数据处理
—般处理数据都差不多是ETL三个步骤:E->抽取,T->处理、转换,L->装载,落地
Spark代码编写的套路:
1、创建 DataFrame(或者Dataset、 RDD),制造或者读取数据
2、通过 DataFrame(或者Dataset、 RDD)的API来进行数据处理
3、通过 DataFrame(或者Dataset、 RDD)进行数据落地
DataFrame 如何创建?
有三种方式,toDF,createDataFrame,DataFrameReader。代码如下。
class DataFrame {/** * 介绍创建 DataFrame 的三种方法 */ @Test def createDF(): Unit ={ //1. 创建SparkSession val spark = SparkSession.builder() // 更简单的创建 Session 方式 .appName("dataFrame1").master("local[6]").getOrCreate() // 2. 创建 DataFrame import spark.implicits._ val personList = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)) // (1) toDF val df1 = personList.toDF() // 作用于普通序列 val df2 = spark.sparkContext.parallelize(personList).toDF() // 作用于 RDD 对象 // (2) createDataFrame val df3 = spark.createDataFrame(personList) // (3) read val df4 = spark.read.csv("../dataset/BeijingPM20100101_20151231_noheader.csv") // 运行展示一下read csv 的结果 df4.show() } }
DataFrame支持什么操作?
我们通过一个案例入门DataFrame 的操作,需求是查看北京雾霾数据集上每个月的统计数量。
方法一:命令式
@Test def caseDF(): Unit ={ // 1. 创建 SparkSession val spark = SparkSession.builder().master("local[6]").appName("Case_Beijing").getOrCreate() import spark.implicits._ // 2. 读取数据集 val sourceDF = spark.read .option("header", value = true) // 设置头信息,默认数据集中的第一行 .csv("../dataset/BeijingPM20100101_20151231.csv") sourceDF.show() // 3. 处理数据集(与处理 RDD 时的思路完全不同) // 3.1 选择列 sourceDF.select('year, 'month, 'PM_Dongsi) // 3.2 过滤到空(NA)的PM记录 .where('PM_Dongsi =!= "NA") // 3.3 分组 select year, month, count(PM_Dongsi) from ... where PM_Dongsi != 'NA' group by year, month .groupBy('year, 'month) // 3.4 聚合 .count() // 得出结论 .show() spark.stop() }
方法二:直接使用sql
@Test def case_sql(): Unit ={ // 1. 创建 SparkSession val spark = SparkSession.builder().master("local[6]").appName("Case_Beijing").getOrCreate() import spark.implicits._ // 2. 读取数据集 val sourceDF = spark.read .option("header", value = true) // 设置头信息,默认数据集中的第一行 .csv("../dataset/BeijingPM20100101_20151231.csv") sourceDF.show() // 3. 直接使用 sql 进行查询 // 3.1 将 DataFrame 注册为临表 sourceDF.createOrReplaceTempView("pm") // 3.2 执行查询 val resultDF = spark.sql("select year, month, count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year, month") resultDF.show() spark.stop() }
3、总结
-
DataFrame
是一个类似于关系型数据库表的函数式组件 -
DataFrame
一般处理结构化数据和半结构化数据 -
DataFrame
具有数据对象的 Schema 信息 -
可以使用命令式的
API
操作DataFrame
, 同时也可以使用SQL
操作DataFrame
-
DataFrame
可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建
二、DataFrame 与 Dataset 的异同
1、DataFrame 就是 Dataset
根据前面的内容,可以得到如下信息
1. Dataset中可以使用列来访问数据,DataFrame也可以
2. Dataset 的执行是优化的,DataFrame 也是
3.Dataset 具有命令式 API ,同时也可以使用SQL来访问, DataFrame也可以使用这两种不同的方式访问
那么为什么两个相同的东西会同时出现在 Spark SQL 中呢?我们看这二者在Scala源码中的表示。
确实,这两个组件是同一个东西, DataFrame 是 Dataset 的一种特殊情况,也就是说 DataFrame 是 Dataset[Row] 的别名
2、DataFrame 与 Dataset 表达的语义不同
第一点: DataFrame
表达的含义是一个支持函数式操作的 表
, 而 Dataset
表达是是一个类似 RDD
的东西, Dataset
可以处理任何对象
- 第二点:
DataFrame
中所存放的是Row
对象, 而Dataset
中可以存放任何类型的对象
val spark = SparkSession.builder().appName("df1").master("local[6]").getOrCreate() import spark.implicits._ val df: sql.DataFrame = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)).toDF() val ds: sql.Dataset[Person] = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)).toDS()
- 可知,DataFrame是弱类型,Dataset是强类型。
- DataFrame就是Dataset [ row ] ,Dataset的泛型可以是任意类型。
- 第三点:
DataFrame
的操作方式和Dataset
是一样的, 但是对于强类型操作而言, 它们处理的类型不同
@Test def dataFrame2(): Unit ={ val spark = SparkSession.builder().appName("df1").master("local[6]").getOrCreate() val personList = Seq(Person("zhangsan", 14), Person("lisi", 16), Person("wangwu", 25)) import spark.implicits._ val df: sql.DataFrame = personList.toDF() // 首先通过最后的解码器,设置程序按照 df 的 schema结构,然后row对象 => Row(row.get(0), row.getAs[Int](1) * 2) // 即第一位不变,第二位强转为Int然后 * 2, 这里取出对象中的元素用 get 方法, 强转用 getAs df.map( (row:Row) => Row(row.get(0), row.getAs[Int](1) * 2) )(RowEncoder.apply(df.schema)) .show() val ds = personList.toDS() // 这里取出对象元素用 person.name, 即 对象.属性名 ds.map( (person: Person) => Person(person.name, person.age * 2) ) .show() }
- 第四点:
DataFrame
只能做到运行时类型检查,Dataset
能做到编译和运行时都有类型检查
3、Row是什么?
Row
对象表示的是一个 行
Row
的操作类似于 Scala
中的 Map
数据类型
@Test def row(): Unit ={ // 1、 row 如何创建,它是什么 val p = Person("zhangsan",19) val row = Row("zhangsan",19) // 两者区别,对于p,列是有名字的,name, age // 对于 row, 列是没有名字的,若想有列名,必须配合schema // 2、如何从 row 中获取数据 row.getString(0) row.getInt(1) // Row 也是样例类 row match { case Row(name, age) => println(name, age) } }
4、DataFrame 和 Dataset 之间的相互转换
val spark: SparkSession = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getOrCreate() import spark.implicits._ val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF() val ds_fdf: Dataset[People] = df.as[People] val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS() val df_fds: DataFrame = ds.toDF()
5、总结
-
DataFrame
就是Dataset
, 他们的方式是一样的, 也都支持API
和SQL
两种操作方式 -
DataFrame
只能通过表达式的形式, 或者列的形式来访问数据, 只有Dataset
支持针对于整个对象的操作 -
DataFrame
中的数据表示为Row
, 是一个行的概念