有效线索主题看板
1. 学习目标
了解有效线索主题看板需求
了解Hive索引的用法
掌握Row Group Index的用法
掌握Bloom Filter Index的用法
能够采集有效线索全量数据
能够使用Hive进行并行操作
掌握Hive常用的判断函数
能够编写有效线索指标的DWD清洗转换SQL
能够编写有效线索指标的DWM中间层SQL
能够编写有效线索指标的DWS业务层SQL
能够导出有效线索指标结果到MySQL
掌握增量数据分析的过程
2. 主题需求
2.1 有效线索转化率
说明:统计期内,访客咨询产生的有效线索的占比。有效线索量 / 咨询量,有效线索指的是拿到电话且电话有效。
展现:线状图。双轴:有效线索量、有效线索转化率。
条件:年、月、线上线下
维度:年、月、线上线下
指标:有效线索转化率=有效线索量/咨询量
粒度:天
数据来源:客户管理系统的customer_clue线索表、customer_relationship意向表、customer_appeal申诉表;咨询系统的web_chat_ems访问咨询表
SQL:
--咨询量(暂时以2019年7月的数据为例):
SELECT
  count(1)
FROM web_chat_ems_2019_07
WHERE
  msg_count >= 1
  AND create_time >= '2019-07-01'
  AND create_time <= '2019-07-15 23:59:59';

--有效线索量:
SELECT
  count(1)
FROM customer_clue cc
LEFT JOIN customer_relationship cr ON cc.customer_relationship_id = cr.id
WHERE
  cc.clue_state IN (
    'VALID_NEW_CLUES',      --新客户新线索
    'VALID_PUBLIC_NEW_CLUE' --老客户新线索
  )
  AND cc.customer_relationship_id NOT IN (
    SELECT ca.customer_relationship_first_id
    FROM customer_appeal ca --投诉表,投诉成功的数据为无效线索
    WHERE ca.appeal_status = 1
      AND ca.customer_relationship_first_id != 0
  )
  AND cr.origin_type IN ('NETSERVICE', 'PRESIGNUP') --线上(排除挖掘录入量)
  AND !cc.deleted
  AND cc.create_date_time >= '2019-07-01'
  AND cc.create_date_time <= '2019-07-15 23:59:59';
2.2 有效线索转化率时间段趋势
说明:统计期内,1-24h之间,每个时间段的有效线索转化率。横轴:1-24h,间隔为1h,纵轴:每个时间段的有效线索转化率。
展现:线状图
条件:天、线上线下
维度:天、线上线下
指标:某小时的总有效线索转化率
粒度:区间内小时段(区间内同一个时间点的总有效线索转化率)
数据来源:客户管理系统的customer_clue线索表、customer_relationship意向表、customer_appeal申诉表;咨询系统的web_chat_ems访问咨询表
SQL:同上
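"同上"指复用2.1的咨询量与有效线索量SQL。若要得到各时间段的数据,可在其基础上按小时分组,下面以咨询量为例给出示意(有效线索量同理):
--按小时统计咨询量(示意,沿用上文的web_chat_ems_2019_07表)
SELECT
  HOUR(create_time) AS hourinfo,
  count(1) AS consult_nums
FROM web_chat_ems_2019_07
WHERE msg_count >= 1
  AND create_time >= '2019-07-01'
  AND create_time <= '2019-07-15 23:59:59'
GROUP BY HOUR(create_time);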
2.3 有效线索量
说明:统计期内,新增的咨询客户中,有效线索的数量。
展现:线状图。
条件:年、月、线上线下
维度:年、月、线上线下
指标:有效线索的数量
粒度:天
数据来源:客户管理系统的customer_clue线索表、customer_relationship意向表、customer_appeal申诉表
SQL:同上
2.4 原始数据结构
有效线索指标的原始数据为客户管理系统的customer_clue线索表和customer_relationship意向客户表。
customer_clue是线索事实表,customer_relationship表主要是用来判断数据来源为线上还是线下。
这两张表在意向客户指标的ODS层已经抽取过,此处可以直接复用。
customer_appeal表是线索申诉表,主要用来判断客户线索被投诉无效。
测试数据:已包含在意向客户主题测试sql中,无需重复导入。
CREATE TABLE `customer_appeal` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `customer_relationship_first_id` int(11) NOT NULL COMMENT '第一条客户关系id',
  `employee_id` int(11) DEFAULT NULL COMMENT '申诉人',
  `employee_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT '申诉人姓名',
  `employee_department_id` int(11) DEFAULT NULL COMMENT '申诉人部门',
  `employee_tdepart_id` int(11) DEFAULT NULL COMMENT '申诉人所属部门',
  `appeal_status` int(1) NOT NULL COMMENT '申诉状态,0:待稽核 1:无效 2:有效',
  `audit_id` int(11) DEFAULT NULL COMMENT '稽核人id',
  `audit_name` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '稽核人姓名',
  `audit_department_id` int(11) DEFAULT NULL COMMENT '稽核人所在部门',
  `audit_department_name` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '稽核人部门名称',
  `audit_date_time` datetime DEFAULT NULL COMMENT '稽核时间',
  `create_date_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间(申诉时间)',
  `update_date_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '删除标志位',
  `tenant` int(11) NOT NULL DEFAULT '0',
  PRIMARY KEY (`id`),
  KEY `id` (`id`,`appeal_status`) USING BTREE,
  KEY `idx_customer_relationship_first_id` (`customer_relationship_first_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2012358 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
3. 建模分析
3.1 指标和维度
根据主题需求,我们来进行指标和维度的提取:
有效线索转化率,公式:有效线索量/咨询量。其中分母咨询量,在我们之前的指标中已经统计过,在这里可以直接使用。因此只需要统计分子有效线索量即可。有效线索指的是拿到电话且电话有效。维度虽然只有年、月、线上线下,但是因为数据粒度具体到天,所以维度统计时也要增加天。维度:年、月、天、线上线下。
有效线索转化率时间段趋势,主要是针对小时段区间内的数据进行统计,跨天数据无需去重。因此维度统计时需要统计到小时维度。维度:天、小时、线上线下。
有效线索量,即有效线索转化率的分子,数据粒度为天。但此处特别指明是新增客户的有效线索,因此统计维度中需要包含此线索所属的客户是新客户还是老客户。维度:年、月、天、线上线下、新旧客户。
总结:
- 指标:有效线索量;
- 维度:年、月、天、小时、线上线下、新旧客户。
3.2 分层设计
- 最终需要统计的数据维度:年、月、天、小时、线上线下、新旧客户;
- 将维度分为三类:时间维度(年、月、天、小时)、数据来源(线上线下)和新旧客户类型;
- ODS层原始数据包括:主表有效线索、意向客户表(用来判断是新客户还是老客户);
- 同一条有效线索数据只会录入一次,不存在去重问题,因此可以使用DWM中间层先进行维度关联并做少量聚合,供DWS层调用以提高计算速度;
- DWS层在DWM层的基础上进行统计,得出数据集市;
- 因为使用的是帆软BI自定义可视化展现,所以不再提供细分的APP层,直接将DWS数据集市导出到OLAP应用的MySQL中即可。
4. 实现
4.1 建模
4.1.1 指标和维度
- 指标:有效线索量;
- 维度:
  - 时间维度:年、月、天、小时
  - 数据来源:线上、线下
  - 客户类型:新客户线索、老客户线索
4.1.2 事实表和维度表
事实表:customer_clue线索表
维表:
- customer_relationship意向客户表,主要为了判断数据来源为线上还是线下,也是意向客户指标的事实表。
- customer_appeal线索申诉表,主要为了判断线索数据是否有效。
4.1.3 Hive索引
Hive支持索引,但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。
Hive索引可以建立在表中的某些列上,以提升一些操作的效率,例如减少MapReduce任务中需要读取的数据块的数量。
在可以预见到分区数据非常庞大的情况下,分桶和索引常常是优于分区的。而分桶由于SMB Join对关联键要求严格,所以并不是总能生效。
4.1.3.1 Hive原始索引
Hive的索引目的是提高Hive表指定列的查询速度。
没有索引时,对于类似 WHERE tab1.col1 = 10 的查询,Hive会加载整张表或分区,然后处理所有数据行;如果在字段col1上存在索引,则只需加载和处理文件的一部分。
在每次建立、更新数据后,Hive索引不会自动更新,需要手动进行更新(重建索引以构建索引表),会触发一个mr job。
Hive索引使用过程繁杂,而且性能一般,在Hive3.0中已被删除,在工作环境中不推荐优先使用,在分区数量过多或查询字段不是分区字段时,索引可以作为补充方案同时使用。推荐使用ORC文件格式的索引类型进行查询。
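下面是Hive原始索引(Hive 3.0之前可用)的基本用法示意,表名tab1和字段col1为假设:
--在假设的表tab1的col1列上创建COMPACT索引,延迟重建
CREATE INDEX idx_tab1_col1 ON TABLE tab1 (col1)
AS 'COMPACT' WITH DEFERRED REBUILD;
--手动重建索引(生成索引表),会触发一次MR Job
ALTER INDEX idx_tab1_col1 ON tab1 REBUILD;
--删除索引
DROP INDEX idx_tab1_col1 ON tab1;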
4.1.3.2 Row Group Index
一个ORC文件包含一个或多个stripes(groups of row data),每个stripe中包含了每个column的min/max值的索引数据,当查询中有<,>,=的操作时,会根据min/max值,跳过扫描不包含的stripes。
而其中为每个stripe建立的包含min/max值的索引,就称为Row Group Index行组索引,也叫min-max Index大小对比索引,或者Storage Index。
在建立ORC格式表时,指定表参数'orc.create.index'='true'之后,便会建立Row Group Index。需要注意的是,为了有效利用Row Group Index,向表中加载数据时必须对需要使用索引的字段进行排序,否则min/max会失去意义。另外,这种索引主要用于数值型字段的查询过滤优化上。
查询时需设置hive.optimize.index.filter为true才能启用索引过滤(可在会话中set,也可写入配置文件后重启Hive使其全局生效)。
创建表/插入数据:
CREATE TABLE lxw1234_orc2 stored AS ORC
TBLPROPERTIES (
  'orc.compress'='SNAPPY',
  -- 开启行组索引
  'orc.create.index'='true'
)
AS
SELECT CAST(siteid AS INT) AS id, pcid
FROM lxw1234_text
-- 插入的数据保持排序
DISTRIBUTE BY id SORT BY id;
查询:
set hive.optimize.index.filter=true;
SELECT COUNT(1) FROM lxw1234_orc2 WHERE id >= 1382 AND id <= 1399;
4.1.3.3 Bloom Filter Index
在建表时,通过表参数'orc.bloom.filter.columns'='pcid'来指定为哪些字段建立BloomFilter索引。这样在生成数据时,会在每个stripe中为该字段建立BloomFilter的数据结构;当查询条件中包含对该字段的等值过滤(=)时,先从BloomFilter中判断该stripe是否包含该值,如果不包含,则跳过该stripe。
创建:
CREATE TABLE lxw1234_orc2 stored AS ORC
TBLPROPERTIES (
  'orc.compress'='SNAPPY',
  'orc.create.index'='true',
  -- pcid字段开启BloomFilter索引
  'orc.bloom.filter.columns'='pcid'
)
AS
SELECT CAST(siteid AS INT) AS id, pcid
FROM lxw1234_text
DISTRIBUTE BY id SORT BY id;
查询:
SET hive.optimize.index.filter=true;
SELECT COUNT(1) FROM lxw1234_orc2
WHERE id >= 0 AND id <= 1000
AND pcid IN ('0005E26F0DCCDB56F9041C', 'A');
只有在数据量较大时,使用索引才能带来性能优势。
4.1.4 分层
ODS层可以复用意向客户指标,无需重复创建。
因为线索数据不会重复(无需distinct去重),所以可以采用DWM中间层过渡。
4.1.4.1 ODS
customer_clue线索表和customer_relationship意向客户表复用意向客户指标的ODS层。
写入时压缩生效:
set hive.exec.orc.compression.strategy=COMPRESSION;
4.1.4.1.1 customer_appeal线索申诉表
Sqoop抽取数据时不支持写入分桶表,但支持写入带索引的表,因此该表只建立索引、不做分桶。
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_ods.`customer_appeal` (
  `id` int COMMENT 'customer_appeal_id',
  `customer_relationship_first_id` int COMMENT '第一条客户关系id',
  `employee_id` int COMMENT '申诉人',
  `employee_name` STRING COMMENT '申诉人姓名',
  `employee_department_id` int COMMENT '申诉人部门',
  `employee_tdepart_id` int COMMENT '申诉人所属部门',
  `appeal_status` int COMMENT '申诉状态,0:待稽核 1:无效 2:有效',
  `audit_id` int COMMENT '稽核人id',
  `audit_name` STRING COMMENT '稽核人姓名',
  `audit_department_id` int COMMENT '稽核人所在部门',
  `audit_department_name` STRING COMMENT '稽核人部门名称',
  `audit_date_time` STRING COMMENT '稽核时间',
  `create_date_time` STRING COMMENT '创建时间(申诉时间)',
  `update_date_time` STRING COMMENT '更新时间',
  `deleted` STRING COMMENT '删除标志位',
  `tenant` int COMMENT '租户id'
) comment '客户申诉表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='SNAPPY','orc.create.index'='true','orc.bloom.filter.columns'='appeal_status,customer_relationship_first_id');
4.1.4.2 DWD
过滤已删除数据(deleted)和线索状态clue_state为空的数据;关联意向表时,customer_relationship_id如果为空则转换为-1。
通过关联customer_clue线索表和customer_relationship意向表,可以转换得到新老客户、线上线下两个维度。这两个ODS表在意向客户主题已经建立过;此处回顾SMB Join的用法:类似的关联查询可以通过分桶提高查询效率,在两张表的关联字段上分别建立分桶,并保证它们的排序和分桶数量一致。
CREATE TABLE IF NOT EXISTS itcast_dwd.itcast_clue_dwd (
  `id` STRING COMMENT '线索id',
  `customer_relationship_id` int COMMENT '客户关系id',
  `origin_type_stat` STRING COMMENT '数据来源:0.线下;1.线上',
  `for_new_user` STRING COMMENT '0:未知;1:新客户线索;2:旧客户线索',
  `deleted` STRING COMMENT '是否删除',
  `create_date_time` BIGINT COMMENT '创建时间',
  `hourinfo` STRING COMMENT '小时信息'
) comment '客户线索dwd表'
PARTITIONED BY(yearinfo STRING,monthinfo STRING,dayinfo STRING)
clustered by(customer_relationship_id) sorted by(customer_relationship_id) into 10 buckets
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orcfile
TBLPROPERTIES ('orc.compress'='SNAPPY','orc.create.index'='true','orc.bloom.filter.columns'='customer_relationship_id');
4.1.4.3 DWM
DWM层过滤投诉数据,并以小时进行统计。
分区并不是越多越好:经统计后数据量变小,可以只以年作为分区。
CREATE TABLE IF NOT EXISTS itcast_dwm.itcast_clue_dwm (
  `clue_nums` STRING COMMENT '根据id聚合',
  `origin_type_stat` STRING COMMENT '数据来源:0.线下;1.线上',
  `for_new_user` STRING COMMENT '0:未知;1:新客户线索;2:旧客户线索',
  `hourinfo` STRING COMMENT '小时信息',
  `dayinfo` STRING COMMENT '天信息',
  `monthinfo` STRING COMMENT '月信息'
) comment '客户线索dwm表'
PARTITIONED BY(yearinfo STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orcfile
TBLPROPERTIES ('orc.compress'='SNAPPY');
4.1.4.4 DWS
针对不同的时间维度进行统计,方便OLAP系统使用。
CREATE TABLE IF NOT EXISTS itcast_dws.itcast_clue_dws (
  `clue_nums` INT COMMENT '根据id聚合',
  `origin_type_stat` STRING COMMENT '数据来源:0.线下;1.线上',
  `for_new_user` STRING COMMENT '0:未知;1:新客户线索;2:旧客户线索',
  `hourinfo` STRING COMMENT '小时信息',
  `dayinfo` STRING COMMENT '天信息',
  `monthinfo` STRING COMMENT '月信息',
  `time_type` STRING COMMENT '聚合时间类型:1、按小时聚合;2、按天聚合;3、按周聚合;4、按月聚合;5、按年聚合;',
  `time_str` STRING COMMENT '时间明细'
) comment '客户线索dws表'
PARTITIONED BY(yearinfo STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orcfile
TBLPROPERTIES ('orc.compress'='SNAPPY');
4.1.4.5 APP
如果用户需要具体的报表展示,可以针对不同的报表页面设计APP层结构,然后导出至OLAP系统的MySQL中。本系统使用FineReport,通过宽表进行灵活展现,因此APP层不再细化,直接将DWS层数据导出至MySQL即可。
4.2 全量流程
4.2.1 数据采集
4.2.1.1 customer_clue线索表、customer_relationship表
ODS复用意向客户指标,不需要重复采集数据。
4.2.1.2 customer_appeal表
SQL:
select `id`,
`customer_relationship_first_id`,
`employee_id`,
`employee_name`,
`employee_department_id`,
`employee_tdepart_id`,
`appeal_status`,
`audit_id`,
`audit_name`,
`audit_department_id`,
`audit_department_name`,
`audit_date_time`,
`create_date_time`,
`update_date_time`,
`deleted`,
`tenant`,
DATE_SUB(curdate(),INTERVAL 1 DAY) as start_time
from customer_appeal
Sqoop:
sqoop import \
--connect jdbc:mysql://192.168.52.150:3306/scrm \
--username root \
--password 123456 \
--query 'select `id`,`customer_relationship_first_id`,`employee_id`,`employee_name`,`employee_department_id`,`employee_tdepart_id`,`appeal_status`,`audit_id`,`audit_name`,`audit_department_id`,`audit_department_name`,`audit_date_time`,`create_date_time`,`update_date_time`,`deleted`,`tenant`,DATE_SUB(curdate(),INTERVAL 1 DAY) as start_time from customer_appeal where $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table customer_appeal \
-m 100 \
--split-by id
4.2.2 数据清洗转换
4.2.2.1 分析
一、清洗:
过滤已删除数据(deleted)、线索状态clue_state为空的数据;
二、转换:
关联意向表的customer_relationship_id字段如果为空则转换为-1。
customer_clue线索表、customer_relationship意向表可以获取并转换得到新老客户、线上线下。
customer_clue线索表获取clue_state信息,将clue_state状态转换为新老客户:如果clue_state状态为VALID_NEW_CLUES,则为新客户,为VALID_PUBLIC_NEW_CLUE,则为老客户,否则为无效数据。
此处回顾SMB Join的用法:customer_clue线索表的customer_relationship_id字段与customer_relationship表的id字段进行关联。customer_relationship是意向客户主表,将其origin_type来源渠道字段转换为线上/线下:如果origin_type是NETSERVICE或PRESIGNUP类型,即为1线上,否则为0线下。
在意向客户主题已经建立过这两个ODS表,类似的关联查询可以通过分桶提高查询效率:我们在两张表的关联字段上分别建立分桶,并保证它们的排序和分桶数量一致。
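转换主要用到Hive常用的判断函数nvl和if,其取值逻辑示意如下:
--nvl:第一个参数为NULL时返回第二个参数,否则返回第一个参数
SELECT nvl(NULL, -1);    --返回 -1
--if:条件成立返回第二个参数,否则返回第三个参数
SELECT if('NETSERVICE'='NETSERVICE' or 'NETSERVICE'='PRESIGNUP', '1', '0');    --返回 '1'
--嵌套if可实现多分支:新客户线索返回1,老客户线索返回2,否则返回0
SELECT if('VALID_NEW_CLUES'='VALID_NEW_CLUES', 1, if('VALID_NEW_CLUES'='VALID_PUBLIC_NEW_CLUE', 2, 0));    --返回 1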
4.2.2.2 代码
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
--分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
--并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
INSERT INTO itcast_dwd.itcast_clue_dwd PARTITION (yearinfo, monthinfo, dayinfo)
SELECT
  clue.id,
  nvl(clue.customer_relationship_id, -1) customer_relationship_id,
  if(rs.origin_type='NETSERVICE' or rs.origin_type='PRESIGNUP', '1', '0') as origin_type_stat,
  if(clue.clue_state='VALID_NEW_CLUES', 1, if(clue.clue_state='VALID_PUBLIC_NEW_CLUE', 2, 0)) as for_new_user,
  clue.deleted,
  clue.create_date_time,
  substr(clue.create_date_time, 12, 2) as hourinfo,
  substr(clue.create_date_time, 1, 4) as yearinfo,
  substr(clue.create_date_time, 6, 2) as monthinfo,
  substr(clue.create_date_time, 9, 2) as dayinfo
FROM itcast_ods.customer_clue clue
LEFT JOIN itcast_ods.customer_relationship rs on clue.customer_relationship_id = rs.id
where clue.clue_state is not null AND clue.deleted = 'false';
4.2.2.3 测试
customer_clue可以采用分桶采样的方式进行测试,以提升执行效率。注意tablesample关键字所在的位置。
通过执行计划,可以看到分桶后的Join查询,使用了SMB Join进行优化。
INSERT INTO itcast_dwd.itcast_clue_dwd PARTITION (yearinfo, monthinfo, dayinfo)
SELECT
  ……
FROM itcast_ods.customer_clue TABLESAMPLE(BUCKET 1 OUT OF 10 ON customer_relationship_id) clue
LEFT JOIN itcast_ods.customer_relationship rs on clue.customer_relationship_id = rs.id
where clue.clue_state is not null AND clue.deleted = 'false';
4.2.3 统计分析
4.2.3.1 分析
在DWD层通过关联转换获取到客户线索是否有效,以及线索的来源是线上还是线下。
DWM层会在DWD层的基础之上,判断线索是否被客服投诉;
因为不涉及去重问题,此处简单地按照最细粒度的维度打包统计,便于上层的数据集市按需取数。其中时间维度使用最小粒度的小时维度。
DWS根据需要的维度,分别从DWM获取数据后进行二次聚合,因为DWM已经打包统计过一次,数据较少,所以DWS的统计效率会比较高。
4.2.3.2 代码
4.2.3.2.1 DWM
4.2.3.2.1.1 实现
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
--分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
--并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16;
INSERT into itcast_dwm.itcast_clue_dwm partition (yearinfo)
SELECT
  count(dwd.id) as clue_nums,
  dwd.origin_type_stat,
  dwd.for_new_user,
  dwd.hourinfo,
  dwd.dayinfo,
  dwd.monthinfo,
  dwd.yearinfo
from itcast_dwd.itcast_clue_dwd dwd
WHERE dwd.customer_relationship_id NOT IN (
  SELECT a.customer_relationship_first_id
  from itcast_ods.customer_appeal a
  WHERE a.appeal_status = 1 and a.customer_relationship_first_id != 0
)
GROUP BY dwd.yearinfo, dwd.monthinfo, dwd.dayinfo, dwd.hourinfo, dwd.origin_type_stat, dwd.for_new_user;
4.2.3.2.1.2 验证索引性能
创建无索引表并导入数据:
CREATE TABLE IF NOT EXISTS itcast_dwd.itcast_clue_dwd_test (
  `id` STRING COMMENT '线索id',
  `customer_relationship_id` STRING COMMENT '客户关系id',
  `origin_type_stat` STRING COMMENT '数据来源:0.线下;1.线上',
  `for_new_user` STRING COMMENT '0:未知;1:新客户线索;2:旧客户线索',
  `deleted` STRING COMMENT '是否删除',
  `create_date_time` BIGINT COMMENT '创建时间',
  `hourinfo` STRING COMMENT '小时信息'
) comment '客户线索dwd测试表(无索引)'
PARTITIONED BY(yearinfo STRING,monthinfo STRING,dayinfo STRING)
clustered by(customer_relationship_id) sorted by(customer_relationship_id) into 10 buckets
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orcfile
TBLPROPERTIES ('orc.compress'='SNAPPY');
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_ods.`customer_appeal_test` (
  `id` int COMMENT 'customer_appeal_id',
  `customer_relationship_first_id` int COMMENT '第一条客户关系id',
  `employee_id` int COMMENT '申诉人',
  `employee_name` STRING COMMENT '申诉人姓名',
  `employee_department_id` int COMMENT '申诉人部门',
  `employee_tdepart_id` int COMMENT '申诉人所属部门',
  `appeal_status` int COMMENT '申诉状态,0:待稽核 1:无效 2:有效',
  `audit_id` int COMMENT '稽核人id',
  `audit_name` STRING COMMENT '稽核人姓名',
  `audit_department_id` int COMMENT '稽核人所在部门',
  `audit_department_name` STRING COMMENT '稽核人部门名称',
  `audit_date_time` STRING COMMENT '稽核时间',
  `create_date_time` STRING COMMENT '创建时间(申诉时间)',
  `update_date_time` STRING COMMENT '更新时间',
  `deleted` STRING COMMENT '删除标志位',
  `tenant` int COMMENT '租户id'
) comment '客户申诉表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc
TBLPROPERTIES ('orc.compress'='SNAPPY');
INSERT INTO itcast_ods.customer_appeal_test PARTITION (start_time) SELECT * from itcast_ods.customer_appeal;
INSERT INTO itcast_dwd.itcast_clue_dwd_test PARTITION (yearinfo, monthinfo, dayinfo)
SELECT
  clue.id,
  clue.customer_relationship_id,
  if(rs.origin_type='NETSERVICE' or rs.origin_type='PRESIGNUP', '1', '0') as origin_type_stat,
  if(clue.clue_state='VALID_NEW_CLUES', 1, if(clue.clue_state='VALID_PUBLIC_NEW_CLUE', 2, 0)) as for_new_user,
  clue.deleted,
  clue.create_date_time,
  substr(clue.create_date_time, 12, 2) as hourinfo,
  substr(clue.create_date_time, 1, 4) as yearinfo,
  substr(clue.create_date_time, 6, 2) as monthinfo,
  substr(clue.create_date_time, 9, 2) as dayinfo
FROM itcast_ods.customer_clue TABLESAMPLE(BUCKET 1 OUT OF 10 ON customer_relationship_id) clue
LEFT JOIN itcast_ods.customer_relationship rs on clue.customer_relationship_id = rs.id
where clue.clue_state is not null AND clue.deleted = 'false';
explain
SELECT
  count(dwd.id) as clue_nums,
  dwd.origin_type_stat,
  dwd.for_new_user,
  dwd.hourinfo,
  dwd.dayinfo,
  dwd.monthinfo,
  dwd.yearinfo
from itcast_dwd.itcast_clue_dwd_test dwd
WHERE dwd.customer_relationship_id NOT IN (
  SELECT a.customer_relationship_first_id
  from itcast_ods.customer_appeal_test a
  WHERE a.appeal_status = 1 and a.customer_relationship_first_id != 0
)
GROUP BY dwd.yearinfo, dwd.monthinfo, dwd.dayinfo, dwd.hourinfo, dwd.origin_type_stat, dwd.for_new_user;
通过执行计划可以明显看到,加上索引以后,查询所读取的数据大小缩小了10倍以上,数据量越大提升越大。
4.2.3.2.2 DWS
DWS根据需要的维度,分别从DWM获取数据后进行二次聚合,因为DWM已经打包统计过一次,数据较少,所以DWS的统计效率会比较高。
小时数据和DWM层的数据是一致的,可以直接拿来使用。而年月日数据,则需要group by以后执行sum求和操作。
小时:
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
--并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16;
--小时
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
  clue_nums,
  origin_type_stat,
  for_new_user,
  hourinfo,
  dayinfo,
  monthinfo,
  '1' as time_type,
  concat(dwm.yearinfo,'-',dwm.monthinfo,'-',dwm.dayinfo,' ', dwm.hourinfo) as time_str,
  yearinfo
from itcast_dwm.itcast_clue_dwm dwm;
年月日数据:
--天
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
  sum(clue_nums) as clue_nums,
  origin_type_stat,
  for_new_user,
  '-1' as hourinfo,
  dayinfo,
  monthinfo,
  '2' as time_type,
  concat(dwm.yearinfo,'-',dwm.monthinfo,'-',dwm.dayinfo) as time_str,
  yearinfo
from itcast_dwm.itcast_clue_dwm dwm
GROUP BY dwm.yearinfo, dwm.monthinfo, dwm.dayinfo, dwm.origin_type_stat, dwm.for_new_user;
--月
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
  sum(clue_nums) as clue_nums,
  origin_type_stat,
  for_new_user,
  '-1' as hourinfo,
  '-1' as dayinfo,
  monthinfo,
  '4' as time_type,
  concat(dwm.yearinfo,'-',dwm.monthinfo) as time_str,
  yearinfo
from itcast_dwm.itcast_clue_dwm dwm
GROUP BY dwm.yearinfo, dwm.monthinfo, dwm.origin_type_stat, dwm.for_new_user;
--年
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
  sum(clue_nums) as clue_nums,
  origin_type_stat,
  for_new_user,
  '-1' as hourinfo,
  '-1' as dayinfo,
  '-1' as monthinfo,
  '5' as time_type,
  dwm.yearinfo as time_str,
  yearinfo
from itcast_dwm.itcast_clue_dwm dwm
GROUP BY dwm.yearinfo, dwm.origin_type_stat, dwm.for_new_user;
4.2.4 导出数据
4.2.4.1 创建MySQL表
CREATE TABLE `itcast_clue` (
  `clue_nums` INT(11) COMMENT '有效线索量',
  `origin_type_stat` varchar(32) COMMENT '数据来源:0.线下;1.线上',
  `for_new_user` varchar(32) COMMENT '0:未知;1:新客户线索;2:旧客户线索',
  `hourinfo` varchar(32) COMMENT '小时信息',
  `dayinfo` varchar(32) COMMENT '日信息',
  `monthinfo` varchar(32) COMMENT '月信息',
  `time_type` varchar(32) COMMENT '聚合时间类型:1、按小时聚合;2、按天聚合;3、按周聚合;4、按月聚合;5、按年聚合;',
  `time_str` varchar(32) COMMENT '时间明细',
  `yearinfo` varchar(32) COMMENT '年信息'
);
4.2.4.2 Sqoop导出脚本
sqoop export \
--connect "jdbc:mysql://192.168.52.150:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table itcast_clue \
--hcatalog-database itcast_dws \
--hcatalog-table itcast_clue_dws \
-m 100
4.3 增量流程
4.3.1 数据采集
4.3.1.1 customer_relationship表
ODS复用意向客户指标,不需要重复采集数据。
4.3.1.2 customer_appeal表
此表数据较少,因此可以直接全部覆盖同步,同全量过程。
4.3.2 数据清洗转换
4.3.2.1 分析
customer_clue表是一个拉链表,会保存数据的历史状态。因为业务方将更新周期限制在30天内,所以只需查询30天内有更新的数据即可。
因此在统计时,我们只需要对上个月1日至今的数据进行统计。
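上个月1日可以用date命令直接计算得到(与后文脚本中的写法一致):
#计算上个月1日,例如今天为2019-11-15时,输出2019-10-01
Last_Month_DATE=$(date -d "$(date +%Y%m)01 last month" +%Y-%m-01)
echo ${Last_Month_DATE}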
4.3.2.2 代码
4.3.2.2.1 SQL:
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
--分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
--并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
INSERT INTO itcast_dwd.itcast_clue_dwd PARTITION (yearinfo, monthinfo, dayinfo)
SELECT
  clue.id,
  clue.customer_relationship_id,
  if(rs.origin_type='NETSERVICE' or rs.origin_type='PRESIGNUP', '1', '0') as origin_type_stat,
  if(clue.clue_state='VALID_NEW_CLUES', 1, if(clue.clue_state='VALID_PUBLIC_NEW_CLUE', 2, 0)) as for_new_user,
  clue.deleted,
  clue.create_date_time,
  substr(clue.create_date_time, 12, 2) as hourinfo,
  substr(clue.create_date_time, 1, 4) as yearinfo,
  substr(clue.create_date_time, 6, 2) as monthinfo,
  substr(clue.create_date_time, 9, 2) as dayinfo
FROM itcast_ods.customer_clue clue
LEFT JOIN itcast_ods.customer_relationship rs on clue.customer_relationship_id = rs.id
where clue.clue_state is not null
  AND clue.deleted = 'false'
  AND start_time = '${TD_DATE}'; --例如2019-11-01
4.3.2.2.2 Shell脚本:
#! /bin/bash
#采集日期
if [[ $1 == "" ]];then
TD_DATE=`date -d '1 days ago' "+%Y-%m-%d"`
else
TD_DATE=$1
fi
#hive命令路径,按实际环境调整(此处假设与sqoop一样位于/usr/bin)
HIVE_HOME=/usr/bin/hive
${HIVE_HOME} -S -e "
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
--分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
--并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
INSERT INTO itcast_dwd.itcast_clue_dwd PARTITION (yearinfo, monthinfo, dayinfo)
SELECT
clue.id,
clue.customer_relationship_id,
if(rs.origin_type='NETSERVICE' or rs.origin_type='PRESIGNUP', '1', '0') as origin_type_stat,
if(clue.clue_state='VALID_NEW_CLUES', 1, if(clue.clue_state='VALID_PUBLIC_NEW_CLUE', 2, 0)) as for_new_user,
clue.deleted,
clue.create_date_time,
substr(clue.create_date_time, 12, 2) as hourinfo,
substr(clue.create_date_time, 1, 4) as yearinfo,
substr(clue.create_date_time, 6, 2) as monthinfo,
substr(clue.create_date_time, 9, 2) as dayinfo
FROM itcast_ods.customer_clue clue
LEFT JOIN itcast_ods.customer_relationship rs on clue.customer_relationship_id = rs.id
where clue.clue_state is not null AND clue.deleted = 'false' AND substr(clue.create_date_time, 1, 10) >= '${TD_DATE}'; --例如2019-11-01
"
4.3.3 统计分析
增量统计时,只需要统计上个月1日至今的数据。
4.3.3.1 DWM
4.3.3.1.1 SQL
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入表时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
--分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
--并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16;
INSERT into itcast_dwm.itcast_clue_dwm partition (yearinfo)
SELECT
  count(dwd.id) as clue_nums,
  dwd.origin_type_stat,
  dwd.for_new_user,
  dwd.hourinfo,
  dwd.dayinfo,
  dwd.monthinfo,
  dwd.yearinfo
from itcast_dwd.itcast_clue_dwd dwd
WHERE dwd.customer_relationship_id NOT IN (
  SELECT a.customer_relationship_first_id
  from itcast_ods.customer_appeal a
  WHERE a.appeal_status = 1 and a.customer_relationship_first_id != 0
)
AND concat_ws('-',dwd.yearinfo,dwd.monthinfo,dwd.dayinfo) >= '2019-11-01'
GROUP BY dwd.yearinfo, dwd.monthinfo, dwd.dayinfo, dwd.hourinfo, dwd.origin_type_stat, dwd.for_new_user;
4.3.3.1.2 Shell脚本
#! /bin/bash
#上个月1日
Last_Month_DATE=$(date -d "$(date +%Y%m)01 last month" +%Y-%m-01)
#hive命令路径,按实际环境调整(此处假设与sqoop一样位于/usr/bin)
HIVE_HOME=/usr/bin/hive
${HIVE_HOME} -S -e "
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入表时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
--分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
--并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16;
INSERT into itcast_dwm.itcast_clue_dwm partition (yearinfo)
SELECT
count(dwd.id) as clue_nums,
dwd.origin_type_stat,
dwd.for_new_user,
dwd.hourinfo,
dwd.dayinfo,
dwd.monthinfo,
dwd.yearinfo
from itcast_dwd.itcast_clue_dwd dwd
WHERE
dwd.customer_relationship_id NOT IN
(
SELECT a.customer_relationship_first_id from itcast_ods.customer_appeal a
WHERE a.appeal_status = 1 and a.customer_relationship_first_id != 0
)
AND concat_ws('-',dwd.yearinfo,dwd.monthinfo,dwd.dayinfo)>= '$Last_Month_DATE'
GROUP BY dwd.yearinfo, dwd.monthinfo, dwd.dayinfo, dwd.hourinfo, dwd.origin_type_stat, dwd.for_new_user;
"
4.3.3.2 DWS
统计上个月1日至今的数据。
4.3.3.2.1 SQL
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
--并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16;
--小时
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
  clue_nums,
  origin_type_stat,
  for_new_user,
  hourinfo,
  dayinfo,
  monthinfo,
  '1' as time_type,
  concat(dwm.yearinfo,'-',dwm.monthinfo,'-',dwm.dayinfo,' ', dwm.hourinfo) as time_str,
  yearinfo
from itcast_dwm.itcast_clue_dwm dwm
where CONCAT_WS('-', dwm.yearinfo, dwm.monthinfo, dwm.dayinfo) >= '${Last_Month_DATE}';
--天
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
  sum(clue_nums) as clue_nums,
  origin_type_stat,
  for_new_user,
  '-1' as hourinfo,
  dayinfo,
  monthinfo,
  '2' as time_type,
  concat(dwm.yearinfo,'-',dwm.monthinfo,'-',dwm.dayinfo) as time_str,
  yearinfo
from itcast_dwm.itcast_clue_dwm dwm
where CONCAT_WS('-', dwm.yearinfo, dwm.monthinfo, dwm.dayinfo) >= '${Last_Month_DATE}'
GROUP BY dwm.yearinfo, dwm.monthinfo, dwm.dayinfo, dwm.origin_type_stat, dwm.for_new_user;
--月
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
  sum(clue_nums) as clue_nums,
  origin_type_stat,
  for_new_user,
  '-1' as hourinfo,
  '-1' as dayinfo,
  monthinfo,
  '4' as time_type,
  concat(dwm.yearinfo,'-',dwm.monthinfo) as time_str,
  yearinfo
from itcast_dwm.itcast_clue_dwm dwm
where CONCAT_WS('-', dwm.yearinfo, dwm.monthinfo) >= '${V_Month}'
GROUP BY dwm.yearinfo, dwm.monthinfo, dwm.origin_type_stat, dwm.for_new_user;
--年
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
  sum(clue_nums) as clue_nums,
  origin_type_stat,
  for_new_user,
  '-1' as hourinfo,
  '-1' as dayinfo,
  '-1' as monthinfo,
  '5' as time_type,
  dwm.yearinfo as time_str,
  yearinfo
from itcast_dwm.itcast_clue_dwm dwm
where dwm.yearinfo >= '${V_Year}'
GROUP BY dwm.yearinfo, dwm.origin_type_stat, dwm.for_new_user;
4.3.3.2.2 Shell脚本
#! /bin/bash
#上个月1日
Last_Month_DATE=$(date -d "$(date +%Y%m)01 last month" +%Y-%m-01)
#根据Last_Month_DATE计算年、季度、月、日
V_PARYEAR=`date --date="$Last_Month_DATE" +%Y`
V_PARMONTH=`date --date="$Last_Month_DATE" +%m`
V_PARDAY=`date --date="$Last_Month_DATE" +%d`
V_month_for_quarter=`date --date="$Last_Month_DATE" +%-m`
V_PARQUARTER=$(((${V_month_for_quarter}-1)/3+1))
#计算所需要的日期字符串
V_Month="${V_PARYEAR}-${V_PARMONTH}"
V_QUARTER="${V_PARYEAR}"_Q"${V_PARQUARTER}"
V_Year="${V_PARYEAR}"
#hive命令路径,按实际环境调整(此处假设与sqoop一样位于/usr/bin)
HIVE_HOME=/usr/bin/hive
${HIVE_HOME} -S -e "
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
--并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16;
--小时
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
clue_nums,
origin_type_stat,
for_new_user,
hourinfo,
dayinfo,
monthinfo,
'1' as time_type,
concat(dwm.yearinfo,'-',dwm.monthinfo,'-',dwm.dayinfo,' ', dwm.hourinfo) as time_str,
yearinfo
from itcast_dwm.itcast_clue_dwm dwm
where CONCAT_WS('-', dwm.yearinfo, dwm.monthinfo, dwm.dayinfo)>='${Last_Month_DATE}';
--天
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
sum(clue_nums) as clue_nums,
origin_type_stat,
for_new_user,
'-1' as hourinfo,
dayinfo,
monthinfo,
'2' as time_type,
concat(dwm.yearinfo,'-',dwm.monthinfo,'-',dwm.dayinfo) as time_str,
yearinfo
from itcast_dwm.itcast_clue_dwm dwm
where CONCAT_WS('-', dwm.yearinfo, dwm.monthinfo, dwm.dayinfo)>='${Last_Month_DATE}'
GROUP BY dwm.yearinfo, dwm.monthinfo, dwm.dayinfo, dwm.origin_type_stat, dwm.for_new_user;
--月
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
sum(clue_nums) as clue_nums,
origin_type_stat,
for_new_user,
'-1' as hourinfo,
'-1' as dayinfo,
monthinfo,
'4' as time_type,
concat(dwm.yearinfo,'-',dwm.monthinfo) as time_str,
yearinfo
from itcast_dwm.itcast_clue_dwm dwm
where CONCAT_WS('-', dwm.yearinfo, dwm.monthinfo)>='${V_Month}'
GROUP BY dwm.yearinfo, dwm.monthinfo, dwm.origin_type_stat, dwm.for_new_user;
--年
INSERT INTO itcast_dws.itcast_clue_dws PARTITION(yearinfo)
SELECT
sum(clue_nums) as clue_nums,
origin_type_stat,
for_new_user,
'-1' as hourinfo,
'-1' as dayinfo,
'-1' as monthinfo,
'5' as time_type,
dwm.yearinfo as time_str,
yearinfo
from itcast_dwm.itcast_clue_dwm dwm
where dwm.yearinfo>='${V_Year}'
GROUP BY dwm.yearinfo, dwm.origin_type_stat, dwm.for_new_user;
"
4.3.4 导出数据
#! /bin/bash
SQOOP_HOME=/usr/bin/sqoop
HOST=192.168.52.150
USERNAME="root"
PASSWORD="123456"
PORT=3306
DBNAME="scrm_bi"
MYSQL=/usr/local/mysql_5723/bin/mysql
#上个月1日
if [[ $1 == "" ]];then
   Last_Month_DATE=$(date -d "$(date +%Y%m)01 last month" +%Y-%m-01)
else
   Last_Month_DATE=$1
fi
#先删除MySQL中当年已导出的数据,避免重复
${MYSQL} -h${HOST} -P${PORT} -u${USERNAME} -p${PASSWORD} -D${DBNAME} -e "delete from itcast_clue where yearinfo = '${Last_Month_DATE:0:4}'"
${SQOOP_HOME} export \
--connect "jdbc:mysql://${HOST}:${PORT}/${DBNAME}?useUnicode=true&characterEncoding=utf-8" \
--username ${USERNAME} \
--password ${PASSWORD} \
--table itcast_clue \
--hcatalog-database itcast_dws \
--hcatalog-table itcast_clue_dws \
--hcatalog-partition-keys yearinfo \
--hcatalog-partition-values ${Last_Month_DATE:0:4} \
-m 100