【项目】数仓项目(七)

【项目】数仓项目(七)【项目】数仓项目(七)

(图片来源于网络,侵删)


一、数仓搭建 - DWD 层

  • 1)对用户行为数据解析
  • 2)对核心数据进行判空过滤
  • 3)对业务数据采用维度模型重新建模,即维度退化

1.1 DWD 层(用户行为启动表数据解析)

【项目】数仓项目(七)

1.1.1 创建启动表

1)建表语句

drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`open_ad_type` string,
`action` string,
`loading_time` string,
`detail` string,
`extend1` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_start_log/'
TBLPROPERTIES('parquet.compression'='lzo');

说明:数据采用 parquet 存储方式,是可以支持切片的,不需要再对数据创建索引

1.1.2 get_json_object 函数使用

1)输入数据 xjson

Xjson=[{"name":" 大 郎 ","sex":" 男 ","age":"25"},{"name":" 西 门 庆 ","sex":" 男","age":"47"}]

2)取出第一个 json 对象

SELECT get_json_object(xjson,"$.[0]") FROM person;

结果是:{“name”:“大郎”,“sex”:“男”,“age”:“25”}

3)取出第一个 json 的 age 字段的值

SELECT get_json_object(xjson,"$.[0].age") FROM person;

结果是:25

1.1.3 向启动表导入数据
insert overwrite table dwd_start_log
PARTITION (dt='2020-03-10')
select
get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba') brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln') lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1
from ods_start_log
where dt='2020-03-10';

3)测试

select * from dwd_start_log where dt='2020-03-10' limit 2;
1.1.4 DWD 层启动表加载数据脚本

1)vim ods_to_dwd_log.sh

在脚本中编写如下内容

#!/bin/bash
# 定义变量方便修改
APP=gmall
hive=/opt/modules/hive/bin/hive
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table "$APP".dwd_start_log
PARTITION (dt='$do_date')
select
get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba') brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln') lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1
from "$APP".ods_start_log
where dt='$do_date';
"
$hive -e "$sql"

2)增加脚本执行权限

chmod 770 ods_to_dwd_log.sh

3)脚本使用

ods_to_dwd_log.sh 2020-03-11

4)查询导入结果

select * from dwd_start_log where dt='2020-03-11' limit 2;

1.2 DWD 层(用户行为事件表数据解析)

【项目】数仓项目(七)
【项目】数仓项目(七)

1.2.1 创建基础明细表

明细表用于存储 ODS 层原始表转换过来的明细数据

【项目】数仓项目(七)
1)创建事件日志基础明细表

drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE dwd_base_event_log(
  `mid_id` string,
  `user_id` string,
  `version_code` string,
  `version_name` string,
  `lang` string,
  `source` string,
  `os` string,
  `area` string,
  `model` string,
  `brand` string,
  `sdk_version` string,
  `gmail` string,
  `height_width` string,
  `app_time` string,
  `network` string,
  `lng` string,
  `lat` string,
  `event_name` string,
  `event_json` string,
  `server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2)说明:其中 event_name 和 event_json 用来对应事件名和整个事件。这个地方将原始日志1 对多的形式拆分出来了。操作的时候我们需要将原始日志展平,需要用到 UDF 和 UDTF

1.2.2 自定义 UDF 函数(解析公共字段)

UDF 函数特点:一行进一行出。简称,一进一出
【项目】数仓项目(七)
1)创建一个 maven 工程:hivefunction

2)创建包名:com.zsy.udf

3)在 pom.xml 文件中添加如下内容

<properties>
	<hive.version>2.3.0</hive.version>
</properties>

<repositories>
	<repository>
		<id>spring-plugin</id>
		<url>https://repo.spring.io/plugins-release/</url>
	</repository>
</repositories>

<dependencies>
	<!--添加 hive 依赖-->
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-exec</artifactId>
		<version>${hive.version}</version>
	</dependency>
</dependencies>

<build>
	<plugins>
		<plugin>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>2.3.2</version>
			<configuration>
				<source>1.8</source>
				<target>1.8</target>
			</configuration>
		</plugin>
		<plugin>
			<artifactId>maven-assembly-plugin</artifactId>
			<configuration>
				<descriptorRefs>
					<descriptorRef>jar-with-dependencies</descriptorRef>
				</descriptorRefs>
			</configuration>
			<executions>
				<execution>
					<id>make-assembly</id>
					<phase>package</phase>
					<goals>
						<goal>single</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

注意 1:如果 hive 的 jar 包下载失败,可以将如下参数配置添加到 idea 中

-Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true
-Dmaven.wagon.http.ssl.ignore.validity.dates=true

【项目】数仓项目(七)
详情请点击博客

上一篇:聚会项目笔记梳理


下一篇:【Spring学习笔记-4】注入集合类List、Set、Map、Pros等