Transformation
Untyped API
前面一篇写了SELECT这个API的一些主要用法,本文打算通过一个SQL示例来展开讲述下Dataset其他常用的Untyped API,比如filter(类似于where),join,groupby 等等。
首先,描述下大概的需求:
表一,描述了一家五口人,有名字,年龄和性别,先在mysql中建表,并且插入数据
create table if not exists family_name(name varchar(10),age int, sex varchar(6));
INSERT INTO family_name
VALUES
('Brad',32,'male')
,('Anne',31,'female')
,('Eason',4,'male')
,('John',66,'male')
,('Annie',60,'female')
;
1
2
3
4
5
6
7
8
9
表二,描述了这一家人在2018年第一周的花费情况,建表以及插入数据语句如下
create table if not exists family_consume(log_date date,name varchar(10),amount decimal(18,2));
INSERT INTO family_consume
VALUES
('2018-01-01','Brad',12)
,('2018-01-01','Anne',32)
,('2018-01-01','John',22)
,('2018-01-01','Annie',12)
,('2018-01-01','Eason',421)
,('2018-01-02','Brad',23)
,('2018-01-02','Anne',44)
,('2018-01-02','John',12)
,('2018-01-02','Annie',54)
,('2018-01-02','Eason',31)
,('2018-01-03','Brad',34)
,('2018-01-03','Anne',88)
,('2018-01-03','John',12)
,('2018-01-03','Annie',1)
,('2018-01-03','Eason',0)
,('2018-01-04','Brad',44)
,('2018-01-04','Anne',231)
,('2018-01-04','John',12)
,('2018-01-04','Annie',56)
,('2018-01-04','Eason',210)
,('2018-01-05','Brad',67)
,('2018-01-05','Anne',432)
,('2018-01-05','John',1)
,('2018-01-05','Annie',32)
,('2018-01-05','Eason',43)
,('2018-01-06','Brad',43)
,('2018-01-06','Anne',12)
,('2018-01-06','John',0)
,('2018-01-06','Annie',0)
,('2018-01-06','Eason',45)
,('2018-01-07','Brad',54)
,('2018-01-07','Anne',132)
,('2018-01-07','John',122)
,('2018-01-07','Annie',29)
,('2018-01-07','Eason',170)
;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
现在的需求是,要求计算出这一家人年龄在65岁以下,在第一周花费前三的名字,年龄,性别(需要做相应转换),以及相应的消费金额
SQL语句如下(本文不讨论SQL语法以及如何满足需求,放SQL只是为了对照后面的DataSet的API)
select a.name
,a.age
,case when a.sex = 'male' then '男'
else '女' end as sex
,sum(b.amount) amount
from family_name a
join family_consume b
on a.name = b.name
where age < 65
group by a.name
,a.age
,case when a.sex = 'male' then '男'
else '女' end
order by amount desc
limit 3;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
结果如下:
| name | age | sex | amount |
+-------+------+------+--------+
| Anne | 31 | 女 | 971.00 |
| Eason | 4 | 男 | 920.00 |
| Brad | 32 | 男 | 277.00 |
1
2
3
4
5
(顺便吐槽下,可以看出,一家人果然是女人和孩子花费最大)
以上为Mysql部分,现在开始讲述如何通过DataSet/DataFrame来实现。
首先导入数据(先在hdfs上面准备csv数据文件),本文不讨论导入数据源明细,会在其他文章中讨论:
val family_name = spark.read.option("quote","\'").option("inferSchema","true").csv("hdfs://master:9999/user/hadoop-twq/spark-course/sql/family_name.csv").toDF("name","age","sex")
val family_consume = spark.read.option("quote","\'").option("inferSchema","true").csv("hdfs://master:9999/user/hadoop-twq/spark-course/sql/family_consume.csv").toDF("log_date","name","amount")
1
2
下面主要说一下前面SQL语句里面用到的关键字对应的API
过滤条件 filter => where
表别名 as
join语句
group by语句
聚合函数sum
case when 语句
排序order by语句
先看下如何筛选出来65岁以下的人
family_name.filter($"age" < 65).show()
1
就是filter后面加上筛选条件即可,上面是表达式类型的用法,换成SQL类型的,可以写为
family_name.filter("age < 65" ).show()
1
其次看下怎么关联两个Dataset,首先给DF取别名跟SQL中一样,也是使用as
而join的写法跟SQL一样,有两种
val family_lower_age = family_name.as("a").filter("age < 65").join(family_consume.as("b")).where($"a.name" === $"b.name").show()
1
上面这种类似于如下SQL语句
SELECT *
FROM family_name a
,family_consume b
WHERE a.age < 65
AND a.name = b.name
1
2
3
4
5
注意,其中的$“a.name” === $"b.name"使用表达式方式的时候,中间是3个等号。
第二种方法如下
val family_lower_age = family_name.filter("age < 65").join(family_consume,"name")
1
这个类似于SQL语句中使用JOIN 关键字,再使用ON连接,下面就不再累赘的贴出语句了。
然后再看下实现group by和sum语句,DataSet里面使用的是groupBy这个API,如下所示(使用上面生成的family_lower_age这个DataFrame)。
val family_group_sum = family_lower_age.groupBy($"name",$"age").sum("amount")
family_group_sum.show
| name|age|sum(amount)|
+-----+---+-----------+
| Brad| 32| 277|
|Eason| 4| 920|
| Anne| 31| 971|
|Annie| 60| 184|
1
2
3
4
5
6
7
8
9
可以看出已经生成了接近最终的结果,此处另外说明一个agg API,可以同时计算多个聚合函数,以及可以方便的重命名,比如上面除了计算sum(amount)外,还要计算最大值和最小值,则可以使用下面方式计算
val family_group_sum = family_lower_age
.groupBy($"name",$"age")
.agg(sum("amount") as "sum_amount"
,max("amount") as "max_amount"
, min("amount") as "min_amount")
| name|age|sum_amount|max_amount|min_amount|
+-----+---+----------+----------+----------+
| Brad| 32| 277| 67| 12|
|Eason| 4| 920| 421| 0|
| Anne| 31| 971| 432| 12|
|Annie| 60| 184| 56| 0|
1
2
3
4
5
6
7
8
9
10
11
12
上面可以看出,使用agg可以方便的计算出多个不同聚合值,并且可以方便的重命名计算后的列明,注意,agg里面重命名的时候,列名也需要加双引号""。
再后面看下case when语句,该语句是内嵌在select API中的when语句,如下:
family_lower_age
.select($"name",$"age",when($"sex" === "male","男")
.otherwise("女") as "性别")
.distinct()
.limit(5)
.show()
1
2
3
4
5
6
结果如下
| name|age| 性别|
+-----+---+---+
|Eason| 4| 男|
| Anne| 31| 女|
|Annie| 60| 女|
| Brad| 32| 男|
1
2
3
4
5
6
上面是在一个select 中选择了多个列,只是其中的sex列使用了case when语句做了一下转换。为了结果展示方便,这里还分别使用了distinct 以及 limit(5)两个API
结合之前所有的代码,可一句写出所有的需要所要的结果
family_name
.filter($"age" < 65) //过滤掉65岁以上的记录
.join(family_consume,"name") //关联family_consume,关联条件为name
.select($"name",$"age",when($"sex" === "male","男").othrwise("女") as "sex",$"amount") //选出需要的列,并且使用case when语句转换性别为中文
.groupBy($"name",$"age",$"sex") //group by name, age ,sex
.sum("amount") //sum(amount)
.withColumnRenamed("sum(amount)","amount") //重命名
.orerBy($"amount".desc) //倒顺序
.limit(3) //前三的记录
.show()
1
2
3
4
5
6
7
8
9
10
上述的语句在spark-shell中运行时,记得不能使用**.**开头,需要合并成一个语句,否则会报错。
相较于SQL,DataFrame因为使用编程的方式,可以更加方便的调试。比如,如果对最终结果有疑问,可以在中间任何地方进行调试。因为除了最终的show是action,其他都是transformation,并不会消耗内存和计算机性能。
下文开始讲述TypedAPI
————————————————
版权声明:本文为CSDN博主「Brad_Q1」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Brad_Q1/article/details/87529122