使用Hive进行OSS数据处理的一个最佳实践

本文主要介绍如何使用Hive来处理保存在OSS上的数据源,并通过E-MapReduce计算,最终的结果保存在OSS上,并能够每天自动的进行Hive的分区数据的调度

处理条件:

数据源:我们假设在OSS上我们的数据是按照一定的目录格式来保存的,比如时间,按照类似2016/06/01这样的年/月/日的方式存放。而原始数据内容都是一些非格式化的数据,完全没有经过处理。
类似如下的一个格式:

123|service control exceed 100. others content|192.168.0.1|2016-05-31

结果数据:我们需要把每个目录下的数据经过处理,写到OSS上类似2016/06/01的一个结果目录下

处理过程:

创建元数据表

CREATE EXTERNAL TABLE logoss (logcontent string) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path';

通过这一步,我们有了一张Hive的分区表,Hive只是在它的元数据库中记录了这个表的信息,这个时候还没有数据的处理。而数据也还在我们的OSS上躺着。

接着把需要的分区都加入到表中,这里我假设我们有很多个分区

ALTER TABLE logoss ADD PARTITION (year='2016', month='05', day='31') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/05/31' PARTITION (year='2016', month='06', day='01') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/01' PARTITION (year='2016', month='06', day='02') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/02' PARTITION (year='2016', month='06', day='03') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/03';

接下来我们select数据看一下,执行如下

select * from logoss limit 100;

我们就会看到我们的分区中的内容了。

处理原始数据

我们要把原来OSS上的原始数据,经过处理然后写到一个HDFS上的表,然后用这个HDFS的表进行后续的一系列处理。这里把所有的中间步骤都在HDFS上走,这样速度会快很多。

首先建立一个基于HDFS的Hive表,目前数据也还是空

CREATE TABLE loghdfs (id string, content string, ip string, oridate string) partitioned by (year string, month string, day string) stored AS textfile;

然后将OSS的数据进行处理并写入到HDFS的表中,这里我们使用IF NOT EXISTS,为了防止这个分区已经存在被我们覆盖掉,如果你希望数据直接覆盖,可以去掉这个条件判断。

INSERT OVERWRITE TABLE loghdfs PARTITION (year='2016', month='05', day='31') IF NOT EXISTS select split(logcontent,'\\|')[0] as id, split(logcontent,'\\|')[1] as content, split(logcontent,'\\|')[2] as ip, split(logcontent,'\\|')[3] as oridate FROM logoss;

业务处理

好了,到了这一步,我们就已经有了一个hdfs上的表了,我们可以对这个表进行任意的后续处理,
比如groupby 所有的ip,然后看他们的总数值

CREATE TABLE userip as select ip, count(id) from loghdfs group by ip;

中间可以进行类似的各种操作,由你的业务决定。
当所有的操作都完成以后,如果要把数据写到OSS上,那么来到最后一步

写回OSS

首先我们会创建一个对应OSS路径的Hive表,与第一步很类似

CREATE EXTERNAL TABLE resultoss (ip string, count int) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path';

最后把我们的业务数据写入到对应的分区中去

INSERT OVERWRITE TABLE resultoss PARTITION (year='2016', month='05', day='31') IF NOT EXISTS select ip, count FROM userip;

这样我们的结果数据就写到了OSS上对应的目录下,类似这样的路径

/path/year=2016/month=05/day=31/

如何自动化

看了上面的这个过程,会发现这中间这个时间的分区需要我们手工写在里面,实在是太麻烦了,完全没有办法自动跑啊,那么下面我们就来更加进化一下。

job上配置自动时间

我们首先在E-MapReduce控制台上编辑的时候使用hivevar来指定时间变量,如下

-hivevar year='2016' -hivevar month='05' -hivevar day='31' -f ossref://mypath/job.hql

然后,我们需要把这个里面的常量变成每天自动变化的时间,我们使用E-MapReduce提供的时间变量
如下

-hivevar year=' ${yyyy-1d}' -hivevar month=' ${MM-1d}' -hivevar day=' ${dd-1d}' -f ossref://mypath/job.hql

时间配置的说明请参考这里

完整的作业配置及代码

使用Hive进行OSS数据处理的一个最佳实践

现在我们看看修改完成以后的完整的代码,中间的分区时间都是用变量进行了替换

CREATE EXTERNAL TABLE logoss (logcontent string) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/';

ALTER TABLE logoss ADD PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/${hivevar:year}/${hivevar:month}/${hivevar:day}';

CREATE TABLE loghdfs (id string, content string, ip string, oridate string) partitioned by (year string, month string, day string) stored AS textfile;

INSERT OVERWRITE TABLE loghdfs PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') IF NOT EXISTS select split(logcontent,'\\|')[0] as id, split(logcontent,'\\|')[1] as content, split(logcontent,'\\|')[2] as ip, split(logcontent,'\\|')[3] as oridate FROM logoss;

CREATE TABLE userip as select ip, count(id) as count from loghdfs group by ip;

CREATE EXTERNAL TABLE resultoss (ip string, count int) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/outpath/';

INSERT OVERWRITE TABLE resultoss PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') IF NOT EXISTS select ip, count FROM userip;

然后你可以把这个作业加到一个周期执行的执行计划中,每天运行一次,就可以完全的自动每天跑数据啦。

上一篇:通过可视化更好的了解你的Spark应用


下一篇:E-MapReduce 数据湖 Meetup 8.7上海站延期