Hive函数、开窗函数、UDF、UDTF
1、系统内置函数
- 查看系统自带的函数
hive> show functions;
- 显示自带的函数的用法
//查看upper函数的用法 hive> desc function upper;
- 详细显示自带的函数的用法
//查看upper函数的详细用法 hive> desc function extended upper;
2、常用函数
2.1、关系运算
// 等值比较
= == <=>
// 不等值比较
!= <>
// 区间比较:
select * from students where id between 1500100001 and 1500100010;
// 空值/非空值判断:
is null、is not null、nvl()、isnull()
//如果 value 为 NULL,则 NVL 函数返回 default_value 的值,否则返回 value 的值,如果两个参数都为 NULL ,则返回 NULL。
NVL(value,default_value)
//
like、rlike、regexp用法
2.2、数值运算
随机一个0到1之间的数:rand()
取整函数(四舍五入):round()
向上取整:ceil()
向下取整:floor()
2.3、条件函数
//if(表达式,如果表达式成立的返回值,如果表达式不成立的返回值)
select if(1>0,1,0); //1
select if(1>0,if(-1>0,-1,1),0);//1
//coalesce返回第一个不为null的值
select COALESCE(null,'1','2'); // 1 从左往右 一次匹配 直到非空为止
select COALESCE('1',null,'2'); // 1
//case when
select score
,case when score>120 then '优秀'
when score>100 then '良好'
when score>90 then '及格'
else '不及格'
end as level
from score limit 20;
select name
,case name when "施笑槐" then "槐ge"
when "吕金鹏" then "鹏ge"
when "单乐蕊" then "蕊jie"
else "算了不叫了"
end as nickname
from students limit 10;
2.4、日期函数
//将时间戳转换成指定格式的日期
select from_unixtime(1610611142,'YYYY/MM/dd HH:mm:ss');
//unix_timestamp()获取当前日期的时间戳,并转换成指定格式
select from_unixtime(unix_timestamp(),'YYYY/MM/dd HH:mm:ss');
// '2021年01月14日' -> '2021-01-14'
select from_unixtime(unix_timestamp('2021年01月14日','yyyy年MM月dd日'),'yyyy-MM-dd');
//将指定日期格式化成指定格式2021/11/28
select date_format('2021-11-28', 'yyyy/MM/dd');
//时间加5天数
select date_add('2021-11-28',5);
//时间减5天数
select date_sub('2021-11-28',5);
//时间相减
select datediff('2021-11-28','2021-11-20');
2.5、字符串函数
//字符串连接,如果遇到有一个值为null结果将为null
concat('123','456'); // 123456
concat('123','456',null); // NULL
//指定分隔符连接,并且忽略null
select concat_ws('#','a','b','c'); // a#b#c
select concat_ws('#','a','b','c',NULL); // a#b#c 可以指定分隔符,并且会自动忽略NULL
select concat_ws("|",cast(id as string),name,cast(age as string),gender,clazz) from students limit 10;
//字符串截取,位置从1开始,起始位置也可指定负数
select substring("abcdefg",1); // abcdefg HQL中涉及到位置的时候 是从1开始计数
select substring('helloworld', -5); //world
//从第六个位置开始,截取五个字符
select substring('helloworld', 6, 5); //world
// '2021/01/14' -> '2021-01-14'
select concat_ws("-",substring('2021/01/14',1,4),substring('2021/01/14',6,2),substring('2021/01/14',9,2));
//字符串切分
select split("abcde,fgh",","); // ["abcde","fgh"]
select split("a,b,c,d,e,f",",")[2]; // c
//按照,切分并且将列转行
select explode(split("abcde,fgh",",")); // abcde
// fgh
// 解析json格式的数据
// 格式:get_json_object(json字符串,$表示根)
select get_json_object('{"name":"zhangsan","age":18,"score":[{"course_name":"math","score":100},{"course_name":"english","score":60}]}',"$.score[0].score"); // 100
2.6、Hive求WordCount
create table words(
words string
)row format delimited fields terminated by '|';
// 数据
hello,java,hello,java,scala,python
hbase,hadoop,hadoop,hdfs,hive,hive
hbase,hadoop,hadoop,hdfs,hive,hive
select word,count(*) from (select explode(split(words,',')) word from words) a group by a.word;
// 结果
hadoop 4
hbase 2
hdfs 2
hello 2
hive 4
java 2
python 1
scala 1
3、列转行
lateral view
:
用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias
lateral view explode
它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。
原始数据
姓名 语文,数学,英语
zs 80,88,90
ls 95,81,81
目标结果
zs 80 //语文
zs 88 //数学
zs 90 //英语
ls 95 //语文
ls 81 //数学
ls 81 //英语
步骤
//创建表,并导入数据
create table stu_score(
name string,
score string
)
row format delimited fields terminated by '\t';
//查询语句
//首先将score列按,进行切分,再将其拆分成多行
select name, sco from stu_score
lateral view explode(split(score,',')) t1 as sco;
4、行转列
使用的函数:concat()
concat_ws()
上面两个函数上面字符串函数有介绍
collect_set(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生array类型字段
原始数据
name province city
张三 安徽 合肥
李四 安徽 蚌埠
王五 江苏 南京
赵六 江苏 苏州
小明 安徽 合肥
小红 江苏 南京
目标结果
安徽,合肥 张三|小明
安徽,蚌埠 李四
江苏,南京 王五|小红
江苏,苏州 赵六
步骤
//建表语句
create table person(
name string,
province string,
city string
)
row format delimited fields terminated by '\t';
//加载数据
load data local inpath '/root/data/person.txt' into table person;
//处理结果
结果1
select name, concat(province,',',city) as city from person;
结果2
select city, collect_set(name) from (
select name, concat(province,',',city) as city from person
)as r1 group by city;
最终结果
select city, concat_ws('|',collect_set(name)) as names from (
select name, concat(province,',',city) as city from person
)as r1 group by city;
结果1
结果2
最终结果
5、窗口函数
好像给每一份数据 开一扇窗户 所以叫开窗函数
在sql中有一类函数叫做聚合函数,例如sum()、avg()、max()等等,这类函数可以将多行数据按照规则聚集为一行,一般来讲聚集后的行数是要少于聚集前的行数的.但是有时我们想要既显示聚集前的数据,又要显示聚集后的数据,这时我们便引入了窗口函数。
over()
:指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化
5.1、常用窗口函数
- row_number:增加一列无并列排名的序列号
用法: select xxxx, row_number() over(partition by 分组字段 order by 排序字段 desc) as rn from tb group by xxxx - dense_rank:有并列排名,并且依次递增
- rank:有并列排名,不依次递增
- percent_rank:(rank的结果-1)/(分区内数据的个数-1)
- cume_dist:计算某个窗口或分区中某个值的累积分布。
假定升序排序,则使用以下公式确定累积分布: 小于等于当前值x的行数 / 窗口或partition分区内的总行数。其中,x 等于 order by 子句中指定的列的当前行中的值。 - NTILE(n):对分区内数据再分成n组,然后打上组号
- max、min、avg、count、sum:基于每个partition分区内的数据做对应的计算
- LAG(col,n,default_val):往前第 n 行数据
- LEAD(col,n, default_val):往后第 n 行数据
- FIRST_VALUE:取分组内排序后,截止到当前行,第一个值
- LAST_VALUE:取分组内排序后,截止到当前行,最后一个值,对于并列的排名,取最后一个
测试数据
id,score,clazz,department
111,69,class1,department1
112,80,class1,department1
113,74,class1,department1
114,94,class1,department1
115,93,class1,department1
121,74,class2,department1
122,86,class2,department1
123,78,class2,department1
124,70,class2,department1
211,93,class1,department2
212,83,class1,department2
213,94,class1,department2
214,94,class1,department2
215,82,class1,department2
216,74,class1,department2
221,99,class2,department2
222,78,class2,department2
223,74,class2,department2
224,80,class2,department2
225,85,class2,department2
建表语句
create table new_score(
id int
,score int
,clazz string
,department string
) row format delimited fields terminated by ",";
//加载数据
load data local inpath '/root/data/new_score.txt' into table new_score;
5.2、测试row_number()、rank()等函数
测试row_number()
、dense_rank()
、rank()
、percent_rank()
按照班级分区、分数降序排序
select
id
,score
,clazz
,department
,row_number() over(partition by clazz order by score desc) as rn
,dense_rank() over(partition by clazz order by score desc) as dr
,rank() over (partition by clazz order by score desc) as rk
,percent_rank() over (partition by clazz order by score desc) as percent_rk
from new_score;
5.3、测试lag、lead等函数
测试lag()
、lead()
、first_value()
、last_value()
、ntile()
按照班级分区、分数降序排序
select id
,score
,clazz
,department
,lag(id,2) over (partition by clazz order by score desc) as lag_num
,lead(id,2) over (partition by clazz order by score desc) as lead_num
,first_value(id) over (partition by clazz order by score desc) as first_v_num
,last_value(id) over (partition by clazz order by score desc) as last_v_num
,ntile(3) over (partition by clazz order by score desc) as ntile_num
from new_score;
6、window字句(窗口帧)与with字句
6.1、window字句
window字句格式:
window w as (partition by 分区字段 order by 排序字段 rows|range between 起始位置 and 结束位置)
window开窗大小解释:
- CURRENT ROW:当前行
- n PRECEDING:往前 n 行数据
- n FOLLOWING:往后 n 行数据
- UNBOUNDED:起点,
UNBOUNDED PRECEDING 表示前面的起点,
UNBOUNDED FOLLOWING 表示到后面的终点
Hive 提供了两种定义窗口帧的形式:ROWS
和 RANGE
。两种类型都需要配置上界和下界。
-
例:
ROWS
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
表示选择分区起始记录到当前记录的所有行; -
例:
RANGE
SUM(close) RANGE BETWEEN 100 PRECEDING AND 200 FOLLOWING
则通过字段差值来进行选择。
如当前行的close
字段值是200
,那么这个窗口帧的定义就会选择分区中close
字段值落在100
至400
区间的记录。
如果没有定义窗口帧,则默认为 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
。
以下是所有可能的窗口帧定义组合。
(ROWS | RANGE) BETWEEN (UNBOUNDED | [num]) PRECEDING AND ([num] PRECEDING | CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
(ROWS | RANGE) BETWEEN CURRENT ROW AND (CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
(ROWS | RANGE) BETWEEN [num] FOLLOWING AND (UNBOUNDED | [num]) FOLLOWING
range between 3 PRECEDING and 11 FOLLOWING
注意:只能运用在max、min、avg、count、sum、FIRST_VALUE、LAST_VALUE这几个窗口函数上
6.2、window字句使用
测试sql,并指定开窗大小,rows between 2 PRECEDING and 2 FOLLOWING
即当前行的前两行到当前行的后两行,一共有五行
SELECT id
,score
,clazz
,SUM(score) OVER w as sum_w
,round(avg(score) OVER w,3) as avg_w
,count(score) OVER w as cnt_w
FROM new_score
window w AS (PARTITION BY clazz ORDER BY score rows between 2 PRECEDING and 2 FOLLOWING);
解释如下图
6.3、with字句
with格式
with 表名1 as (查询语句1),
表名2 as (查询语句2)
select * from 表名2;
with字句就是将查询结果定义成一个临时表,提供给下个查询使用。
7、自定义函数
7.1、UDF函数:一进一出
- 创建maven项目,添加以下依赖
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
- 编写代码,继承org.apache.hadoop.hive.ql.exec.UDF,实现evaluate方法,在evaluate方法中实现自己的逻辑
import org.apache.hadoop.hive.ql.exec.UDF;
public class HiveUDF extends UDF {
// hadoop => #hadoop$
public String evaluate(String col1) {
// 给传进来的数据 左边加上 # 号 右边加上 $
String result = "#" + col1 + "$";
return result;
}
}
- 打成jar包并上传至Linux虚拟机
- 在hive shell中,使用
add jar 路径
将jar包作为资源添加到hive环境中add jar /usr/local/soft/jars/HiveUDF2-1.0.jar;
- 使用jar包资源注册一个临时函数,fxxx1是你的函数名,'MyUDF’是主类名
create temporary function fxxx1 as 'MyUDF';
- 使用函数名处理数据
select fxx1(name) as fxx_name from students limit 10; //结果 #施笑槐$ #吕金鹏$ #单乐蕊$ #葛德曜$ #宣谷芹$ #边昂雄$ #尚孤风$ #符半双$ #沈德昌$ #羿彦昌$
7.2、UDTF函数:一进多出
例:
原数据
"key1:value1,key2:value2,key3:value3"
目标结果
key1 value1
key2 value2
key3 value3
- 方法1:使用 explode+split
with r1 as (select explode(split('key1:value1,key2:value2,key3:value3',',')) as kv)
select split(kv,':')[0] as k,split(kv,':')[1] as v from r1;
- 方法2:自定UDTF
代码:
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
public class HiveUDTF extends GenericUDTF {
// 指定输出的列名 及 类型
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
ArrayList<String> filedNames = new ArrayList<String>();
ArrayList<ObjectInspector> filedObj = new ArrayList<ObjectInspector>();
filedNames.add("col1");
filedObj.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
filedNames.add("col2");
filedObj.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(filedNames, filedObj);
}
// 处理逻辑 my_udtf(col1,col2,col3)
// "key1:value1,key2:value2,key3:value3"
// my_udtf("key1:value1,key2:value2,key3:value3")
public void process(Object[] objects) throws HiveException {
// objects 表示传入的N列
String col = objects[0].toString();
// key1:value1 key2:value2 key3:value3
String[] splits = col.split(",");
for (String str : splits) {
String[] cols = str.split(":");
// 将数据输出
forward(cols);
}
}
// 在UDTF结束时调用
public void close() throws HiveException {
}
}
首先将上述代码打成jar包,并上传至linux
在hive shell中,使用 add jar 路径将jar包作为资源添加到hive环境中
add jar /usr/local/soft/jars/HiveUDTF-1.0.jar;
使用jar包资源注册一个临时函数,my_udtf是你的函数名,'HiveUDTF'是主类名
create temporary function my_udtf as 'HiveUDTF';
处理数据
select my_udtf("key1:value1,key2:value2,key3:value3");
8、例题:连续登陆问题
在电商、物流和银行可能经常会遇到这样的需求:统计用户连续交易的总额、连续登陆天数、连续登陆开始和结束时间、间隔天数等。
8.1、数据:
注意:每个用户每天可能会有多条记录
id datestr amount
1,2019-02-08,6214.23
1,2019-02-08,6247.32
1,2019-02-09,85.63
1,2019-02-09,967.36
1,2019-02-10,85.69
1,2019-02-12,769.85
1,2019-02-13,943.86
1,2019-02-14,538.42
1,2019-02-15,369.76
1,2019-02-16,369.76
1,2019-02-18,795.15
1,2019-02-19,715.65
1,2019-02-21,537.71
2,2019-02-08,6214.23
2,2019-02-08,6247.32
2,2019-02-09,85.63
2,2019-02-09,967.36
2,2019-02-10,85.69
2,2019-02-12,769.85
2,2019-02-13,943.86
2,2019-02-14,943.18
2,2019-02-15,369.76
2,2019-02-18,795.15
2,2019-02-19,715.65
2,2019-02-21,537.71
3,2019-02-08,6214.23
3,2019-02-08,6247.32
3,2019-02-09,85.63
3,2019-02-09,967.36
3,2019-02-10,85.69
3,2019-02-12,769.85
3,2019-02-13,943.86
3,2019-02-14,276.81
3,2019-02-15,369.76
3,2019-02-16,369.76
3,2019-02-18,795.15
3,2019-02-19,715.65
3,2019-02-21,537.71
建表语句:
create table deal_tb(
id string
,datestr string
,amount string
)row format delimited fields terminated by ',';
8.2、计算思路
-
先按用户和日期分组求和,使每个用户每天只有一条数据
select id ,datestr ,sum(amount) as sum_amount from deal_tb group by id,datestr;
-
根据用户ID分组按日期排序,将日期和分组序号相减得到连续登陆的开始日期,如果开始日期相同说明连续登陆
with t1 as (select id ,datestr ,sum(amount) as sum_amount from deal_tb group by id,datestr), t2 as (select t1.id ,t1.datestr ,t1.sum_amount ,row_number() over(partition by id order by datestr) as rn from t1) select t2.id ,t2.datestr ,t2.sum_amount ,date_sub(t2.datestr,rn) as grp from t2;
-
统计用户连续交易的总额、连续登陆天数、连续登陆开始和结束时间、间隔天数
with t1 as (select id ,datestr ,sum(amount) as sum_amount from deal_tb group by id,datestr), t2 as (select t1.id ,t1.datestr ,t1.sum_amount ,row_number() over(partition by id order by datestr) as rn from t1), t3 as (select t2.id ,t2.datestr ,t2.sum_amount ,date_sub(t2.datestr,rn) as grp from t2) select t3.id ,t3.grp ,round(sum(t3.sum_amount),2) as sc_sum_amount ,count(1) as sc_days ,min(t3.datestr) as sc_start_date ,max(t3.datestr) as sc_end_date ,datediff(t3.grp,lag(t3.grp,1) over(partition by t3.id order by t3.grp)) as iv_days from t3 group by t3.id,t3.grp;