1、SparkSql概述
1、什么是SparkSql?
SparkSql用于处理结构化数据,底层还是RDD
2、SparkSql的两个数据抽象: DataFrame、DataSet
1、什么是DataFrame
DataFrame可以当做一个二维表格,有schema信息<有列名、列类型>
DataFrame只关注列不关注行的类型,不管每个元素<每行>是什么类型,表现出来都是Row类型
2、什么是DataSet
DataSet可以当做一个二维表格,有schema信息<有列名、列类型>
DataSet即关注列也关注行的类型,每个的数据类型是啥,表现出来就是啥
3、DataFrame与DataSet的区别:
1、DataFrame是弱类型,DataSet是强类型
2、DataFrame是运行期安全,编译器不安全。DataSet是编译器安全,运行期也安全
4、DataFrame与DataSet的使用时机:
1、如果是将rdd转成sparksql编程,
此时如果rdd里面的元素类型是样例类,转成DataSet或者DataFrame都可以
此时如果rdd里面的元素类型元组,推荐转成DataFrame,可以通过toDF指定列名
2、如果想要使用map、flatMap这种写函数的强类型算子,推荐使用DataSet
5、RDD、DataFrame、DataSet的联系
1、RDD、DataFrame、DataSet都是弹性分布式数据集
2、RDD、DataFrame、DataSet都是惰性执行的,都需要调用action算子之后才会真正执行
3、RDD、DataFrame、DataSet都有分区
4、RDD、DataFrame、DataSet有很多共同的函数: map、flatMap、filter..
5、RDD、DataFrame、DataSet都是数据在内存与磁盘中动态存储
2、SparkSql编程
1、创建SparkSession: SparkSession.builder().master("").appName(..).getOrCreate()
2、DataFrame创建:
1、通过toDF方法
要想使用toDF方法必须导入隐式转换: import sparksession对象名.implicits._
1、集合.toDF()
2、rdd.toDF()
toDF有两个重载的方式,如果调用的是无参的toDF,此时会生成默认的列名<如果集合/rdd中的元素是样例类,会将属性名作为列名,如果是元组,列名就是_1,_2形式>
所以如果元素是元组,可以有参的toDF方法指定列名<指定的列名的个数必须与列的个数要相同>
2、通过读取文件: spark.read.csv/json/jdbc..
3、通过其他DataFrame衍生
3、DataSet创建
1、通过toDS方法
要想使用toDS方法必须导入隐式转换: import sparksession对象名.implicits._
1、集合.toDS()
2、rdd.toDS()
toDS方法生成的DataSet此时会生成默认的列名<如果集合/rdd中的元素是样例类,会将属性名作为列名,如果是元组,列名就是_1,_2形式>
2、通过读取文件: spark.read.textFile()
3、通过其他DataFrame衍生
4、SparkSql编程的两种方式:
1、SQL风格
1、将df/ds注册成表:
createTempView:: 注册成临时表
createOrReplaceTempView: 注册成临时表[如果表已经存在会替换],只能在当前SparkSession中使用,后续只在使用表的时候直接用 表名 既可以
createGlobalTempView:注册成全局表
createOrReplaceGlobalTempView: 注册成全局表,可以在多个sparkSession中使用,后续在使用的时候,必须通过 global_temp.表名 的方式使用
2、sql编写: spark.sql("sql语句")
2、DSL风格: 使用select、filter、where、groupBy等api变成
常用的DSL api:
1、过滤:
1、filter("过滤条件") // filter("age>20")
2、where("过滤条件") // where("age>20")
2、去重:
1、distinct: 只有所有列都相同才会去重
2、dropDuplicates: 当指定列相同的时候就会去重
3、列裁剪: selectExpr("字段名","函数(字段名)","字段名 as 别名")
5、RDD、DataFrame、DataSet转换
1、RDD转DataFrame: rdd.toDF/rdd.toDF(列名,列名,..)
2、DataFrame转rdd: df.rdd
3、Rdd转DataSet: rdd.toDS
4、DataSet转rdd: val rdd:RDD[DataSet元素类型] = ds.rdd
5、DataFrame转DataSet: val ds:DataSet[类型] = df.as[类型]
DataFrame转DataSet的时候,
如果as后面的类型是样例类,需要样例类的属性名要与列名一致。
如果as后面的类型是元组,需要元组的个数 = 列的个数,类型也要一致
6、DataSet转DataFrame: ds.toDF/ds.toDF(列名,列名,..)
6、Row类型的取值: row.getAs[ 列的类型 ] ( "列名" )
7、自定义函数:
1、自定义UDF函数:
1、定义普通函数
val func = (id:Int) => id+"-001"
2、注册udf函数: spark.udf.register("函数名",函数)
spark.udf.register("myfunc",func)
3、通过sql使用函数:
spark.sql("select myfunc(id) from 表名")
2、自定义udaf函数
1、弱类型udaf:
1、定义class继承UserDefinedAggregateFunction
2、重写抽象方法
def inputSchema: StructType <定义udaf参数类型>
def bufferSchema: StructType <定义中间变量的参数类型>
def dataType: DataType <定义最终结果类型>
def deterministic: Boolean <一致性>
def initialize(buffer: MutableAggregationBuffer): Unit <初始化中间变量>
def update(buffer: MutableAggregationBuffer, input: Row): Unit <每次传入组中一个值,更新中间变量>
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit <合并所有task的统计结果>
def evaluate(buffer: Row): Any <获取最终结果>
3、注册udaf:
1、创建自定义udaf对象: val obj = new xxx
2、注册: spark.udf.register("函数名",obj)
2、强类型的udaf
1、定义class继承Aggregator[IN,BUF,OUT]
IN: udaf参数类型
BUF: 中间变量类型
OUT: 最终结果类型
2、重写方法
def zero: Buff <中间变量赋初始值>
def reduce(buff: Buff, age: Int): Buff <在每个分区中先预聚合,每个传入一个元素,更新中间结果>
def merge(b1: Buff, b2: Buff): Buff <对所有分区的结果再次聚合>
def finish(reduction: Buff): Double <获取最终结果>
def bufferEncoder: Encoder[Buff] <对中间结果类型编码>
def outputEncoder: Encoder[Double] <对最终结果类型编码>
3、注册
1、创建udaf对象: val obj = new XXX
2、导入隐式转换,使用udaf函数:
import org.apache.spark.sql.functions._
val uobj = udaf(obj)
3、注册: spark.udf.register("函数名",uobj)
3、数据读取与保存
1、读取
1、文件读取:
1、spark.read
.format() --指定文件读取格式[csv/json/text/parquet/orc]
.option().option().. --指定读取的参数
.load(path) --指定加载路径的数据
在读取文件的时候,一般只有csv文件才需要配置option,csv文件常用的option:
sep: 指定字段之间的分隔符
header: 指定是否以文件的第一行作为列名
inferSchema: 指定是否自动推断字段的类型
2、spark.read[.option()].csv/json/csv/parquet/orc
2、mysql数据读取
1、spark.read
.format() --指定文件读取格式[jdbc]
.option().option().. --指定读取的参数<账号、密码、driver、表、url>
.load() --指定加载路径的数据
2、spark.read.jdbc(url,表名,参数设置): 此种方式读取jdbc的时候分区数 = 1,只能用于数据量小的场景
spark.read.jdbc(url,表名,分区条件参数,参数设置): 此种方式读取jdbc的时候分区数=分区条件参数数组的元素个数。<不常用>
val arr = Array("age<20","age>=20 and age<40","age>=40")
spark.read.jdbc("jdbc:mysql://xx:3306/test","person",arr,参数设置)
spark.read.jdbc(url,表名,mysql字段名,lowerBound,uperBound,分区数,参数设置): <工作常用>
此种方式读取的时候,分区数 = (uperBound-lowerBound) > 分区数 ? 分区数 : uperBound-lowerBound
2、数据保存:
1、df/ds.write
.mode() --指定写入模式
.format() --指定数据写入的格式[csv/json/parquet/orc/jdbc]
.option() --指定数据写入的时候需要的参数
csv文件写入的时候指定的option:
header: 写入的时候是否将列名也写入
sep: 写入的时候指定字段之间的分隔符
.save() --数据保存
2、df/ds.write.mode(..).csv/json/parquet/orc/jdbc
常用的写入模式:
SaveMode.Overwrite: 如果指定的路径/表已经存在,则覆盖历史数据<数据写入HDFS的时候使用>
SaveMode.Append: 如果指定的路径/表已经存在,则追加数据<数据写入mysql的时候使用>
如果写入mysql的时候,主键数据已经存在,此时不能使用append,需要通过foreachPartitions对数据进行更新写入
3、hive的数据读取和保存
1、读取数据:
1、在创建SparkSession的时候通过enableHiveSupport需要开启hive的支持:
SparkSession.builder().master(..).appName(..).enableHiveSupport().getOrCreate
2、直接在代码中通过spark.sql("查询hive表数据")
2、保存数据到hive表:
df/ds.write.mode(..).saveAsTable("hive表") <不常用,一般都是将数据写入HDFS>
4、多维聚合:<对多个维度按照不同的组合进行聚合>
grouping sets:
语法: select 维度1,维度2,..,聚合函数 from 表 group by 维度1,维度2,.. grouping sets( (维度1),(维度1,维度2),(..) )
grouping sets后面的字段名必须是group by后面的字段
案例:
select A,B,C,count(1) from person group by A,B,C grouping sets( (A),(A,B),(B,C),(A,B,C) )
等价于:
select A,null B,null C,count(1) from person group by A
union
select A,B,null C,count(1) from person group by A,B
union
select null A,B,C,count(1) from person group by B,C
union
select A,B,C,count(1) from person group by A,B,C