Big Data Project: User Behavior Analysis in Practice [Repost]

Introduction to Big Data Architecture

Typical enterprise big data architecture diagram

Introduction to the Lambda architecture (for awareness)

General idea: build the big data system as multiple layers, typically three: a batch layer, a speed (real-time) layer, and a serving layer.
https://blog.csdn.net/u013368491/article/details/71271864

Big data team structure

  • Data platform lead
  • Data platform architect
  • Big data development (ETL development, data warehouse development)
  • Deep learning / AI engineers
  • BI

Exercise: build a company's big data cluster from scratch
  1. How do you determine the cluster size? Assume each server has 8 TB of disk.
  2. Which distribution: Apache, CDH or HDP?
  3. Physical servers or cloud hosts?
  4. Plan the big data service components (produce a table).

Preparation before building the platform

  • How to determine the cluster size (mainly based on data volume)

    Assume each log record is about 1 KB and there are 100 million records per day, with no server expansion for half a year: 100000000 / 1024 / 1024 ≈ 100 GB per day, roughly 18 TB for half a year. With 3 replicas that is about 54 TB; reserving another 20% buffer and estimating 8 TB of disk per server, about 9 servers are needed (see the sizing sketch at the end of this section).

  • How to choose between Apache, CDH and HDP?

    The goal is to understand how technology selection is done in a company and the general reasoning.

    1. Apache: operations are cumbersome, and you have to verify compatibility between components yourself. (Generally used by large companies with strong engineering teams and dedicated operations staff.)

    2. CDH: the most widely used distribution in China. CM is not open source, but in practice that does not matter for small and medium-sized companies (recommended).

    3. HDP: open source and suitable for secondary development, but not as stable as CDH.

  • Big data service component plan, example as follows (not imported yet due to formatting issues) TODO

  • Physical servers or cloud hosts?

    Hardware cost:
    1. Physical servers: with 128 GB RAM, a 20-core / 40-thread physical CPU, 8 TB HDD and 2 TB SSD, a single machine costs a bit over 40,000 RMB, plus hosting fees; a physical machine typically lasts about 5 years.
    2. Cloud hosts: taking Alibaba Cloud as an example, a roughly comparable configuration costs about 50,000 RMB per year.
    Operations cost:
    1. Physical servers: require dedicated operations staff.
    2. Cloud hosts: much of the operations work is already handled by Alibaba Cloud, so day-to-day operations are relatively easy.
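The back-of-the-envelope sizing above can be written down as a small sketch. It only reproduces the arithmetic described in this section; the record size, daily volume, retention, replica count, buffer ratio and per-server disk size are the assumptions stated above, and the class name is purely illustrative.

    object ClusterSizing {
      def main(args: Array[String]): Unit = {
        // Assumptions from the text: 1 KB per record, 100 million records per day,
        // half a year of retention, 3 HDFS replicas, 20% buffer, 8 TB of disk per server
        val dailyGB      = 100000000L / 1024.0 / 1024.0      // ≈ 95 GB/day, rounded to ~100 GB above
        val halfYearTB   = 18.0                               // ~100 GB/day * 180 days, rounded up
        val withReplicas = halfYearTB * 3                     // ≈ 54 TB with 3 replicas
        val withBuffer   = withReplicas * 1.2                 // plus a 20% buffer ≈ 64.8 TB
        val servers      = math.ceil(withBuffer / 8.0).toInt  // 8 TB per server -> about 9 servers

        println(f"daily ≈ $dailyGB%.0f GB, total ≈ $withBuffer%.1f TB, servers needed: $servers")
      }
    }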

Things to watch out for during cluster installation

  • Prefer offline installation

  • Use a non-root user and configure passwordless sudo

  • Confirm the HDFS storage directories and make sure they sit on the disks with the most space

  • Back up the metadata (critical: if the metadata is corrupted, the whole cluster may stop working; at minimum, back up two copies to other servers every day after midnight)

  • Configure rack awareness

  • Run benchmarks

  • Parameter tuning

1. dfs.namenode.handler.count = 20 * log2(Cluster Size); for example, for a 20-node cluster this works out to roughly 80-90 (20 * log2(20) ≈ 86)  
    The Hadoop RPC server consists of a single RPC queue per port and multiple handler (worker) threads that dequeue and process requests. If the number of handlers is insufficient, then the RPC queue starts building up and eventually overflows. You may start seeing task failures and eventually job failures and unhappy users.
It is recommended that the RPC handler count be set to 20 * log2(Cluster Size) with an upper limit of 200.
2. dfs.namenode.service.handler.count = half of the value above  
    There is no precise calculation for the Service RPC handler count however the default value of 10 is too low for most production clusters. We have often seen this initialized to 50% of the dfs.namenode.handler.count in busy clusters and this value works well in practice.
3. Keep dfs.namenode.edits.dir separate from dfs.namenode.name.dir where possible, to minimize write latency
4. dfs.namenode.accesstime.precision=0   
    The setting dfs.namenode.accesstime.precision controls how often the NameNode will update the last accessed time for each file. It is specified in milliseconds. If this value is too low, then the NameNode is forced to write an edit log transaction to update the file's last access time for each read request and its performance will suffer.  
    The default value of this setting is a reasonable 3600000 milliseconds (1 hour). We recommend going one step further and setting it to zero so last access time updates are turned off. Add the following to your hdfs-site.xml.
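To make the rule of thumb above concrete, here is a small sketch (not part of the original material) that computes the two handler counts for a few cluster sizes, using the 20 * log2(N) formula, the 200 upper limit and the 50% ratio quoted above.

    object NameNodeHandlerTuning {
      // dfs.namenode.handler.count: 20 * log2(clusterSize), capped at 200
      def handlerCount(clusterSize: Int): Int =
        math.min(200, math.round(20 * math.log(clusterSize) / math.log(2)).toInt)

      // dfs.namenode.service.handler.count: commonly ~50% of the value above
      def serviceHandlerCount(clusterSize: Int): Int = handlerCount(clusterSize) / 2

      def main(args: Array[String]): Unit = {
        Seq(10, 20, 50, 100).foreach { n =>
          println(s"$n nodes -> handler.count=${handlerCount(n)}, service.handler.count=${serviceHandlerCount(n)}")
        }
      }
    }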

Four hands-on projects

  1. User behavior analysis
  2. Real-time dashboard: count newly added VIP users in real time
  3. Real-time application: timely operations on users with abnormal orders
  4. User profiling

User Behavior Analysis (Offline)

Project introduction

User behavior analysis means that, given basic traffic data for a website or app,
you collect statistics on and analyze that data to find patterns in how users visit the site or app,
combine those patterns with the online marketing strategy to spot potential problems in current marketing activities,
and provide a basis for adjusting or redesigning the marketing strategy.

Data flow diagram

Required metrics

  • Course learning feedback
  • Statistics by app version
  • Channel access
  • Visit count distribution: our buckets are 1-2 visits (inclusive), 3-4 visits (inclusive), and more than 4 visits
  • Funnel analysis: open app -> start video -> complete video -> start homework -> complete homework
  • Retention analysis

Log data format, tab-separated (\t)

45660	45660	M	1	0	ios	huawei	wifi	59.48.116.0	18701445660	1	0	0	0	1.0	startHomework	1554134400
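To make the 17 tab-separated fields concrete, here is a small parsing sketch. The field names follow the Hive schema defined later in this article; the case class and helper are illustrative, assume the numeric fields are well formed, and are not part of the project code itself.

    case class UserBehavior(
      uid: String, username: String, gender: String, level: Int, isVip: Int,
      os: String, channel: String, netConfig: String, ip: String, phone: String,
      videoId: Int, videoLength: Int, startVideoTime: Long, endVideoTime: Long,
      version: String, eventKey: String, eventTime: Long)

    object UserBehaviorParser {
      // Parse one tab-separated line; returns None when the line does not have exactly 17 fields
      def parse(line: String): Option[UserBehavior] = {
        val f = line.split("\t", -1)
        if (f.length != 17) None
        else Some(UserBehavior(f(0), f(1), f(2), f(3).toInt, f(4).toInt, f(5), f(6), f(7), f(8), f(9),
          f(10).toInt, f(11).toInt, f(12).toLong, f(13).toLong, f(14), f(15), f(16).toLong))
      }
    }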

Project requirement 1: data collection

There are 20 business servers. Design an architecture that uses Flume to collect data in real time from the business servers' logs into HDFS, making sure no data is lost along the way. Design the architecture and write the Flume agent configuration.


Data Collection – Flume

Source

  • Advantages of Taildir Source over Exec Source and Spooling Directory Source (a good talking point when introducing yourself in interviews)
    Before version 1.7, the only source that could collect logs in real time was Exec Source, but it may lose data (see the official documentation).
    To get near-real-time collection while keeping data safe, people had to roll a new file every half minute or so and move it into the directory watched by a Spooling Directory Source.
    From 1.7 onward, Taildir Source collects data in real time while keeping it safe, implementing something like resumable transfer internally.

  • How big should batchSize be? With events around 1 KB, 500-1000 is appropriate.

Channel

  • Difference between FileChannel and MemoryChannel (interview question)
  • MemoryChannel: understand that events are stored in the JVM heap
  • FileChannel:
    1. Configure multiple dataDirs to increase throughput
    2. checkpointDir and backupCheckpointDir should be placed on different disks (you can check which disk a Linux path lives on with df -h <path>)

Sink

  • The default values of hdfs.rollInterval, hdfs.rollSize and hdfs.rollCount produce lots of small files (interview question: how small files affect the cluster)
  • An everyday analogy for the small-file problem: designing games for the annual company party
  • How hdfs.round, hdfs.roundValue and hdfs.roundUnit control file partitioning

Example illustrating the parameters above

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = exec
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp
    a1.sources.r1.command = tail -F /home/hadoop/guolong.txt
    a1.sources.r1.channels = c1
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.sinks.k1.type=hdfs
    a1.sinks.k1.hdfs.path=hdfs://atguigu:8020/guolong/%Y%m%d
    a1.sinks.k1.hdfs.fileType=DataStream
    a1.sinks.k1.hdfs.writeFormat=TEXT
    # Roll a new file every 600 seconds
    a1.sinks.k1.hdfs.rollInterval=600
    # Roll a new file every 128 MB
    a1.sinks.k1.hdfs.rollSize=134217728
    a1.sinks.k1.hdfs.rollCount=0
    # Write up to 1000 events to HDFS per batch
    a1.sinks.k1.hdfs.batchSize=1000
    a1.sinks.k1.hdfs.threadsPoolSize=16
    a1.sinks.k1.channel=c1
    a1.sinks.k1.hdfs.filePrefix=guolong.%Y%m%d%H%M
    a1.sinks.k1.hdfs.idleTimeout=600
    
    a1.sinks.k1.hdfs.round=true
    a1.sinks.k1.hdfs.roundValue=10
    a1.sinks.k1.hdfs.roundUnit= minute

Notes:
Test launch command:

bin/flume-ng agent -n a1 (agent name) -c conf -f conf/example (config file name) -Dflume.root.logger=DEBUG,console

With hdfs.rollInterval=600, hdfs.rollSize=134217728, hdfs.roundValue=10 and hdfs.roundUnit=minute acting together, the behavior is as follows:

1. The .tmp file rolls into a final file once it reaches 128 MB
2. The .tmp file rolls into a final file once it has been open for more than 10 minutes
For example: if the sink receives data at 2018-01-01 05:23, the following .tmp file is created:
/guolong/20180101/guolong.201801010520.tmp
Even if its content has not reached 128 MB, it will still roll into a final file at 05:33.
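A tiny sketch of the timestamp rounding that produces the 0520 part of the path above. It only mimics the roundValue=10 / roundUnit=minute arithmetic so you can see which bucket an event falls into; it is not Flume code.

    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter

    object RoundToTenMinutes {
      // Mimic hdfs.round=true, roundValue=10, roundUnit=minute: floor the timestamp to a 10-minute bucket
      def bucket(ts: LocalDateTime): String = {
        val rounded = ts.withMinute(ts.getMinute / 10 * 10).withSecond(0).withNano(0)
        rounded.format(DateTimeFormatter.ofPattern("yyyyMMddHHmm"))
      }

      def main(args: Array[String]): Unit = {
        // An event arriving at 2018-01-01 05:23 lands in bucket 201801010520,
        // i.e. /guolong/20180101/guolong.201801010520.tmp
        println(bucket(LocalDateTime.of(2018, 1, 1, 5, 23)))
      }
    }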

Flume tiers

  • Interview question: production environments usually use 2 tiers or more, for the following two reasons
  1. With a single tier and a very large number of log servers, there are many Flume agents writing to HDFS at the same time, which means many HDFS clients and too much pressure on HDFS
  2. With a single tier, some business configuration can only live in that tier; if the configuration changes later, there are too many places to modify, which hurts maintainability
  • Collection tier
  1. Use supervisor so the agent restarts automatically if it dies
  2. A 512 MB JVM is usually enough
  3. Deployed together with the business servers
  • Aggregation tier
  1. Use load_balance
  2. JVM usually set to 4 GB
  3. Deployed on dedicated servers (4 cores / 8 threads, 16 GB RAM)
  • JVM tuning
  1. Set -Xmx equal to -Xms to reduce the performance impact of heap resizing
  • Based on the two-tier architecture diagram above, analyze how Flume guarantees at-least-once processing when:
  1. A collection-tier agent dies
  2. A collection-tier server dies
  3. Some aggregation-tier agents die
  • Exercises
  1. Under what circumstances can duplicate data appear? One scenario is enough
  2. The aggregation tier is writing to HDFS and a large backlog has built up in the channel. What are the options?

Project requirement 2: data cleaning and loading
  • Code conventions
  1. Comments are mandatory and must be clear
  2. No variable names like "xiaoming_test"; names should be self-explanatory
  • Assume the data has already been landed in the HDFS directory /user/hive/warehouse/ods.db/origin_user_behavior/${day}. It needs to be cleaned with Spark Core, with the following requirements:
  1. Mask phone numbers: 187xxxx2659
  2. Filter duplicate rows (a row is a duplicate when uid, event_key and event_time are all identical)
  • The final data is stored in the dwd.user_behavior partitioned table, partitioned by dt (day), stored as ORC, with a total of xxxx rows
  • The Hive columns are as follows:
    uid STRING comment "unique user id",
    username STRING comment "user nickname",
    gender STRING comment "gender",
    level TINYINT comment "1 = primary school, 2 = junior high, 3 = senior high",
    is_vip TINYINT comment "0 = not a VIP, 1 = VIP",
    os STRING comment "operating system: ios, android, etc.",
    channel STRING comment "download channel: auto, toutiao, huawei",
    net_config STRING comment "current network type",
    ip STRING comment "IP address",
    phone STRING comment "phone number",
    video_id INT comment "video id",
    video_length INT comment "video length in seconds",
    start_video_time BIGINT comment "timestamp in seconds when the video was started",
    end_video_time BIGINT comment "timestamp in seconds when the video was exited",
    version STRING comment "app version",
    event_key STRING comment "event type",
    event_time BIGINT comment "timestamp in seconds when the event occurred"

Data cleaning and loading

  • Cleaning logic
  1. Mask phone numbers: 187xxxx2659
  2. Strip \n from user-defined fields such as username, otherwise they introduce line breaks in the HDFS files
  3. Filter duplicate rows (implemented in SQL)

Overall flow (run by the scheduling system; a sketch of steps 2-4 follows the list)

    1. Clean the data with Spark Core and write it to /user/hive/warehouse/tmp.db/user_behavior_${day}
    2. Create the temporary table tmp.user_behavior_${day} and load the cleaned data from step 1
    3. Using the Hive engine and the row_number window function, insert the data from tmp.user_behavior_${day} into dwd.user_behavior
    4. Drop the temporary table tmp.user_behavior_${day}
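A hedged sketch of how steps 2-4 could be driven from a single Spark job. It assumes the DDL and the row_number insert shown later in this article are stored as .sql template files next to the job (the file names here are made up); the original course drives the same steps through the Hive CLI and a scheduler instead.

    import org.apache.spark.sql.SparkSession
    import scala.io.Source

    object UserBehaviorLoadFlow {
      // Read a SQL template and substitute the ${day} placeholder
      def render(path: String, day: String): String =
        Source.fromFile(path).mkString.replace("${day}", day)

      def main(args: Array[String]): Unit = {
        val day = args(0)  // e.g. 20190402
        val spark = SparkSession.builder().appName("UserBehaviorLoadFlow").enableHiveSupport().getOrCreate()

        // Step 2: create the temporary table over the cleaned files
        spark.sql(render("create_tmp_user_behavior.sql", day))
        // Step 3: dedup with row_number() and insert into dwd.user_behavior
        spark.sql(render("insert_dwd_user_behavior.sql", day))
        // Step 4: drop the temporary table
        spark.sql(s"drop table if exists tmp.user_behavior_$day")

        spark.stop()
      }
    }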

Data cleaning code

    inputPath:/user/hive/warehouse/ods.db/origin_user_behavior/${day}
    outputPath:/user/hive/warehouse/tmp.db/user_behavior_${day}  
    
    package com.atguigu.user_behavior
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Clean the user behavior data
      * 1. Validate the record format: a record must split into exactly 17 fields
      * 2. Mask phone numbers into the form 123xxxx4567
      * 3. Strip "\n" from username, otherwise it breaks lines when writing to HDFS
      */
    object UserBehaviorCleaner {
      def main(args : Array[String]): Unit ={
    
        if(args.length != 2){
          println("Usage:please input inputPath and outputPath")
          System.exit(1)
        }
    
        // Get the input and output paths
        val inputPath = args(0)
        val outputPath = args(1)
    
        val conf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
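        // Note: a master set in code (local[2]) takes precedence over the --master flag of spark-submit,
        // so remove setMaster (or make it configurable) before running this job on YARN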
        val sc = new SparkContext(conf)
    
        // Build an RDD from the input path
        val eventRDD: RDD[String] = sc.textFile(inputPath)
    
        // Clean the data. Don't put heavy business logic inside the operators; wrap it in methods instead
        eventRDD.filter(event => checkEventValid(event))  // validate the record format
          .map( event => maskPhone(event))  // mask the phone number
          .map(event => repairUsername(event)) // strip "\n" from username so it does not break lines
          .coalesce(3)
          .saveAsTextFile(outputPath)
    
        sc.stop()
      }
    
      /**
        * username is user-defined and may contain "\n", which breaks lines when writing to HDFS
        * @param event
        */
      def repairUsername(event : String)={
        val fields = event.split("\t")
    
        // Extract the nickname
        val username = fields(1)
    
        // Replace "\n" when the nickname is not empty
        if(username != "" && !"Null".equals(username)){
          fields(1) = username.replace("\n","")
        }
    
        fields.mkString("\t")
      }
    
      /**
        * Mask the phone number
        * @param event
        */
      def maskPhone(event : String): String ={
        val fields: Array[String] = event.split("\t")

        // Extract the phone number
        val phone = fields(9)

        // Mask the middle digits when the phone number is not empty
        if(phone != null && !"".equals(phone)){
          fields(9) = phone.substring(0,3) + "xxxx" + phone.substring(7,11)
        }
    
        fields.mkString("\t")
      }
    
    
      /**
        * Validate the record format; only records that split into exactly 17 fields are valid
        * @param event
        */
      def checkEventValid(event : String) ={
        val fields = event.split("\t")
        fields.length == 17
      }
    }

Deployment

  • Interview question: the difference between yarn cluster and yarn client mode
  • How should executor memory and executor cores be set?
  • Extension question: the types of YARN resource schedulers, and why CDH and HDP use different default schedulers
spark-submit --master local[2] --class com.xxx.user_behavior.UserBehaviorCleaner UserBehaviorCleaner.jar \
hdfs://xxx:8020/user/hive/warehouse/ods.db/origin_user_behavior/${day} \
hdfs://xxx:8020/user/hive/warehouse/tmp.db/user_behavior_${day}
  • Full Spark on YARN submit command
spark-submit --master yarn --deploy-mode cluster --num-executors 8 --executor-cores 4 --executor-memory 12G \
--class com.atguigu.user_behavior.UserBehaviorCleaner UserBehaviorCleaner.jar \
hdfs://xxx:8020/user/hive/warehouse/ods.db/origin_user_behavior/${day} \
hdfs://xxx:8020/user/hive/warehouse/tmp.db/user_behavior_${day}

Remote debugging

Scenario: at work you will often run into cases where a job works fine locally but produces wrong data on the server.
  1. IDEA setup: Run --> Edit Configurations, add a Remote configuration

  2. Add the --driver-java-options parameter to the submit script

spark-submit --master local[2] --driver-java-options "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=18888" --class com.xxx.user_behavior.UserBehaviorCleaner UserBehaviorCleaner.jar hdfs://xxx:8020/user/hive/warehouse/ods.db/origin_user_behavior/${day} hdfs://xxx:8020/user/hive/warehouse/tmp.db/user_behavior_${day}  
  3. Submit the job on the server; it will block, waiting for the debugger to attach
  4. In IDEA, click the debug button of the corresponding Remote configuration

Load the data cleaned by Spark Core into a temporary table

    create table if not exists tmp.user_behavior_${day}(
    uid STRING comment "unique user id",
    username STRING comment "user nickname",
    gender STRING comment "gender",
    level TINYINT comment "1 = primary school, 2 = junior high, 3 = senior high",
    is_vip TINYINT comment "0 = not a VIP, 1 = VIP",
    os STRING comment "operating system: ios, android, etc.",
    channel STRING comment "download channel: auto, toutiao, huawei",
    net_config STRING comment "current network type",
    ip STRING comment "IP address",
    phone STRING comment "phone number",
    video_id INT comment "video id",
    video_length INT comment "video length in seconds",
    start_video_time BIGINT comment "timestamp in seconds when the video was started",
    end_video_time BIGINT comment "timestamp in seconds when the video was exited",
    version STRING comment "app version",
    event_key STRING comment "event type",
    event_time BIGINT comment "timestamp in seconds when the event occurred")
    row format delimited fields terminated by "\t" 
    location "/user/hive/warehouse/tmp.db/user_behavior_${day}";
    
Questions to think about:
1. Why do level and is_vip use TINYINT instead of INT?
2. Why is the partition column dt stored as an int such as 20190408 rather than the string '2019-04-08'?

Note: the start_video_time and end_video_time fields are only sent when event_key is endVideo.

ORC-format external table, with the same schema as above

    create external table if not exists dwd.user_behavior(
    uid STRING comment "unique user id",
    username STRING comment "user nickname",
    gender STRING comment "gender",
    level TINYINT comment "1 = primary school, 2 = junior high, 3 = senior high",
    is_vip TINYINT comment "0 = not a VIP, 1 = VIP",
    os STRING comment "operating system: ios, android, etc.",
    channel STRING comment "download channel: auto, toutiao, huawei",
    net_config STRING comment "current network type",
    ip STRING comment "IP address",
    phone STRING comment "phone number",
    video_id INT comment "video id",
    video_length INT comment "video length in seconds",
    start_video_time BIGINT comment "timestamp in seconds when the video was started",
    end_video_time BIGINT comment "timestamp in seconds when the video was exited",
    version STRING comment "app version",
    event_key STRING comment "event type",
    event_time BIGINT comment "timestamp in seconds when the event occurred") partitioned by(dt INT)
    row format delimited fields terminated by "\t" stored as ORC
  • Interview question:
    Choosing among Textfile, Parquet and ORC (interview highlight: after switching to ORC, storage dropped by about 90% and queries became 3-5x faster)

  • How are external and internal (managed) tables used in companies?

  1. Tables in the data warehouse all use external tables
  2. Intermediate result tables generated by analysis use internal tables, usually named with a date suffix so it is obvious that they are intermediate tables and when they were created. Such tables are dropped at the end of the script once the day's run is done

Import the data from tmp.user_behavior_${day} into the ORC table, deduplicating with a window function

  insert overwrite table dwd.user_behavior partition(dt=${day})
  select 
  uid,
  username,
  gender,
  level,
  is_vip,
  os,
  channel,
  net_config,
  ip,
  phone,
  video_id,
  video_length,
  start_video_time,
  end_video_time,
  version,
  event_key,
  event_time  
  from (
  select 
  uid,
  username,
  gender,
  level,
  is_vip,
  os,
  channel,
  net_config,
  ip,
  phone,
  video_id,
  video_length,
  start_video_time,
  end_video_time,
  version,
  event_key,
  event_time,
  row_number() OVER (PARTITION BY uid,event_key,event_time ORDER BY event_time) u_rank 
  from tmp.user_behavior_${day} 
  ) temp where u_rank = 1
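For comparison, here is a sketch of how the same deduplication could be done directly in SparkSQL with dropDuplicates instead of the row_number() window function. Table and column names follow the schema above; this is an alternative for illustration, not the approach the course uses.

    import org.apache.spark.sql.SparkSession

    object DedupWithDropDuplicates {
      def main(args: Array[String]): Unit = {
        val day = args(0)  // e.g. 20190402
        val spark = SparkSession.builder()
          .appName("DedupWithDropDuplicates")
          .enableHiveSupport()
          .getOrCreate()

        // Keep one row per (uid, event_key, event_time), mirroring the u_rank = 1 filter above
        spark.table(s"tmp.user_behavior_$day")
          .dropDuplicates(Seq("uid", "event_key", "event_time"))
          .createOrReplaceTempView("user_behavior_dedup")

        spark.sql(
          s"""insert overwrite table dwd.user_behavior partition(dt=$day)
             |select uid, username, gender, level, is_vip, os, channel, net_config, ip, phone,
             |       video_id, video_length, start_video_time, end_video_time, version, event_key, event_time
             |from user_behavior_dedup""".stripMargin)

        spark.stop()
      }
    }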

Metric implementation

event_key values

  • startApp: open the app
  • closeApp: close the app
  • registerAccount: register an account
  • startVideo: start watching a video
  • endVideo: stop watching a video
  • startHomework: start homework
  • completeHomework: complete homework
  • shareVideo: share a video
  • enterOrderPage: enter the order detail page
  • completeOrder: pay for and complete the order, becoming a VIP

Note: each event_key represents one type of user behavior
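Since these event keys appear in every query below, a small constants object (just an illustrative convenience, not part of the original code) helps avoid typos when the SQL strings are built inside Spark jobs:

    object EventKeys {
      val StartApp         = "startApp"
      val CloseApp         = "closeApp"
      val RegisterAccount  = "registerAccount"
      val StartVideo       = "startVideo"
      val EndVideo         = "endVideo"
      val StartHomework    = "startHomework"
      val CompleteHomework = "completeHomework"
      val ShareVideo       = "shareVideo"
      val EnterOrderPage   = "enterOrderPage"
      val CompleteOrder    = "completeOrder"
    }

    // Example: spark.sql(s"... where event_key = '${EventKeys.StartVideo}' ...")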


Course learning feedback (overview.png), difficulty: 2 stars

Metrics

Number of users who watched a video today

Number of users who completed a video today

mysql schema
CREATE TABLE app_cource_study_report (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `watch_video_cnt` int(11) DEFAULT NULL,
  `complete_video_cnt` int(11) DEFAULT NULL,
  `dt` int(11) DEFAULT NULL,
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `app_cource_study_report_dt` (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Always include created_at and updated_at columns when creating the table; they make it much easier to track down problems after re-running a script

hive schema

create table if not exists tmp.app_cource_study_analysis_${day}(
    watch_video_count INT,
    complete_video_count INT,
    dt INT
) row format delimited fields terminated by "\t";
SQL implementation (overview.png)
insert overwrite table tmp.app_cource_study_analysis_${day} 
select sum(watch_video_count),sum(complete_video_count),dt from (
select count(distinct uid) as watch_video_count,0 as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = "startVideo" group by dt 
union all
select 0 as watch_video_count,count(distinct uid) as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = "endVideo" 
and (end_video_time - start_video_time) >= video_length group by dt) tmp group by dt

Key points:
When counting users, deduplicate by the unique user id uid

Why use insert overwrite rather than insert into?

Interview question: the difference between union and union all

Hive implementation, written as a script

app_course_study_analysis.sh

#! /bin/bash

day=$1

# Validate the input argument
if [ ${#day} -ne 8 ];then
	echo "Please input date,eg:20190402"
	exit 1
fi

# Create the temporary table
hive -e "
create table if not exists tmp.app_cource_study_analysis_${day}(
        watch_video_count INT,
        complete_video_count INT,
        dt INT
) row format delimited fields terminated by '\t';"

# Insert data into the temporary table
hive -e "
insert overwrite table tmp.app_cource_study_analysis_${day} 
select sum(watch_video_count),sum(complete_video_count),dt from (
select count(distinct uid) as watch_video_count,0 as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = 'startVideo' group by dt 
union 
select 0 as watch_video_count,count(distinct uid) as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = 'endVideo' 
and (end_video_time - start_video_time) >= video_length group by dt) tmp group by dt
"

SparkSQL implementation

package com.atguigu.user_behavior

import org.apache.spark.sql.SparkSession

object AppCourseStudyAnalysis {
  def main(args: Array[String]): Unit = {

    // Validate the date argument
    if(args.length != 1 || args(0).length != 8){
      println("Usage:Please input date,eg:20190402")
      System.exit(1)
    }
    val day = args(0)

    // Get a SparkSession with Hive support (note: .master("local[2]") below is for local testing; remove it when submitting to YARN)
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .enableHiveSupport()
      .master("local[2]")
      .getOrCreate()

    import spark.sql

    // Create the temporary table
    sql(s"""
         |create table if not exists tmp.app_cource_study_analysis_${day}(
         |watch_video_count INT,
         |complete_video_count INT,dt INT)
         |row format delimited fields terminated by '\t'
        """.stripMargin)

    // Insert the analysis result into the temporary table
    sql(
      s"""
        |insert overwrite table tmp.app_cource_study_analysis_${day}
        |select sum(watch_video_count),sum(complete_video_count),dt from (
        |select count(distinct uid) as watch_video_count,0 as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = 'startVideo' group by dt
        |union
        |select 0 as watch_video_count,count(distinct uid) as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = 'endVideo'
        |and (end_video_time - start_video_time) >= video_length group by dt) tmp group by dt
      """.stripMargin)

    spark.stop()
  }
}

Question to think about:
SparkSQL could write the analysis result directly to MySQL; here we write it to Hive with SparkSQL and then export it to MySQL with Sqoop. Which approach is better? (A sketch of the direct-write option follows.)
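For reference, a sketch of the direct-write option. The JDBC URL, credentials and table name are reused from the Sqoop command below purely for illustration; this is not the approach the project takes.

    import java.util.Properties
    import org.apache.spark.sql.{SaveMode, SparkSession}

    object WriteReportToMysql {
      def main(args: Array[String]): Unit = {
        val day = args(0)
        val spark = SparkSession.builder().appName("WriteReportToMysql").enableHiveSupport().getOrCreate()

        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "123456")
        props.setProperty("driver", "com.mysql.jdbc.Driver")

        // Read the Hive result table and append it to the MySQL report table over JDBC
        spark.table(s"tmp.app_cource_study_analysis_$day")
          .toDF("watch_video_cnt", "complete_video_cnt", "dt")
          .write
          .mode(SaveMode.Append)
          .jdbc("jdbc:mysql://192.168.137.10:3306/user_behavior", "app_cource_study_report", props)

        spark.stop()
      }
    }

Note that a plain JDBC append will hit the unique index on dt when the job is re-run, unlike the Sqoop upsert mode discussed later, which is part of the trade-off this question is asking about.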

Sqoop export (simple version; covered in more depth later)

sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root --password 123456 --table app_cource_study_report --columns watch_video_cnt,complete_video_cnt,dt --fields-terminated-by "\t" --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" --input-null-string '\\N'


Project requirement 3: compute business metrics and import the results into MySQL

  1. Statistics by app version
  2. Channel access statistics
  3. Visit count distribution
  4. Funnel statistics
  5. 7-day retention

Statistics by app version (version.png), difficulty: 1 star

Metrics

Traffic by version

iOS traffic

Android traffic

Traffic from other operating systems

hive schema

create table if not exists tmp.app_version_analysis_${day}(
    os STRING,
    version STRING,
    access_count INT,
    dt INT
)  row format delimited fields terminated by "\t" 

SQL implementation

insert overwrite table tmp.app_version_analysis_${day} 
select os,version,count(1) as access_count,dt from dwd.user_behavior where dt = ${day} group by os,version,dt;

Key point:
Avoid select * and count(*) in SQL statements


Channel access (channel.png), difficulty: 1 star

Metrics

New users yesterday

New users from channels yesterday

Organic new users yesterday

hive schema

create table if not exists tmp.app_channel_analysis_${day}(
    channel STRING,
    new_user_cnt INT,
    dt INT
)  row format delimited fields terminated by "\t"

SQL implementation

insert overwrite table tmp.app_channel_analysis_${day} 
select channel,count(distinct uid),dt from dwd.user_behavior where dt = ${day} and event_key = "registerAccount" group by channel,dt;

Visit count distribution (access_number.png), difficulty: 3 stars

Metrics

1-2 visits (2 included)

3-4 visits (4 included)

More than 4 visits

hive schema

create table if not exists tmp.app_access_cnt_ranger_analysis_${day}(
    le_two INT,
    le_four INT,
    gt_four INT,
    dt INT
) row format delimited fields terminated by "\t";

SQL implementation

Step 1: compute each user's visit count, grouped per user
drop table if exists tmp.user_access_cnt_${day};
create table if not exists tmp.user_access_cnt_${day} as select uid,count(1) as access_cnt,dt from dwd.user_behavior where dt = ${day} group by uid,dt;

Step 2: bucket users by visit count and insert into the final table
insert overwrite table tmp.app_access_cnt_ranger_analysis_${day} 
select sum(le_two) as le_two,sum(le_four) as le_four,sum(gt_four) as gt_four,dt from 
(select count(1) as le_two,0 as le_four,0 as gt_four,dt from tmp.user_access_cnt_${day} where  access_cnt <= 2 group by dt 
union all 
select 0 as le_two,count(1) as le_four,0 as gt_four,dt from tmp.user_access_cnt_${day} where  access_cnt > 2 and access_cnt <= 4 group by dt 
union all 
select 0 as le_two,0 as le_four,count(1) as gt_four,dt from tmp.user_access_cnt_${day} where  access_cnt > 4 group by dt) tmp 
group by dt; 
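The two-step version above scans the temporary table three times. As an alternative sketch (same tables and the same three buckets, expressed through SparkSQL so it can live inside a Spark job), the distribution can also be computed with conditional aggregation in a single pass:

    import org.apache.spark.sql.SparkSession

    object AccessCntRangeAnalysis {
      def main(args: Array[String]): Unit = {
        val day = args(0)
        val spark = SparkSession.builder().appName("AccessCntRangeAnalysis").enableHiveSupport().getOrCreate()

        // One pass over dwd.user_behavior: count visits per uid, then bucket with conditional sums
        spark.sql(
          s"""insert overwrite table tmp.app_access_cnt_ranger_analysis_$day
             |select sum(case when access_cnt <= 2 then 1 else 0 end)                    as le_two,
             |       sum(case when access_cnt > 2 and access_cnt <= 4 then 1 else 0 end) as le_four,
             |       sum(case when access_cnt > 4 then 1 else 0 end)                     as gt_four,
             |       dt
             |from (select uid, count(1) as access_cnt, dt
             |      from dwd.user_behavior where dt = $day group by uid, dt) t
             |group by dt""".stripMargin)

        spark.stop()
      }
    }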

Funnel analysis (funnel.png), difficulty: 4 stars

Metrics

Open app -> start video -> complete video -> start homework -> complete homework

Only users who have watched a video can complete it and then start homework (which they may also skip), and starting homework does not guarantee finishing it, so users drop off at every step

hive schema

create table if not exists tmp.app_study_funnel_analysis_${day}(
    start_app_cnt INT,
    start_video_cnt INT,
    complete_video_cnt INT,
    start_homework_cnt INT,
    complete_homework INT,
    dt INT
) row format delimited  fields terminated by "\t";

SQL implementation

insert overwrite table tmp.app_study_funnel_analysis_${day}  
select count(distinct t1.uid) as start_app_cnt,count(distinct t2.uid) as start_video_cnt,count(distinct t3.uid) as complete_video_cnt,count(distinct t4.uid) as start_homework,count(distinct t5.uid) as complete_homework,t1.dt from 
(select uid,dt from dwd.user_behavior where dt = ${day} and event_key = "startApp") t1
left join 
(select uid from dwd.user_behavior where dt = ${day} and event_key = "startVideo") t2 
on t1.uid = t2.uid 
left join 
(select uid from dwd.user_behavior where dt = ${day} and event_key = "endVideo" and (end_video_time - start_video_time) >= video_length) t3 
on t2.uid = t3.uid 
left join 
(select uid from dwd.user_behavior where dt = ${day} and event_key = "startHomework") t4
on t3.uid = t4.uid 
left join 
(select uid from dwd.user_behavior where dt = ${day} and event_key = "completeHomework") t5
on t4.uid = t5.uid  group by t1.dt

Questions to think about:
1. How would you build a time-ordered funnel where the whole sequence must complete within 2 hours?
2. What if each step must happen within 15 minutes of the previous one?

Retention analysis (retained.png), difficulty: 5 stars

Metric: 7-day retention

hive schema

create table tmp.seven_days_retained_analysis_${day}(
    register_day INT,
    zero_interval_retained_rate DOUBLE,
    one_interval_retained_rate DOUBLE,
    two_interval_retained_rate DOUBLE,
    three_interval_retained_rate DOUBLE,
    four_interval_retained_rate DOUBLE,
    five_interval_retained_rate DOUBLE,
    six_interval_retained_rate DOUBLE,
    dt INT
) row format delimited fields terminated by "\t";

SQL implementation

// Get the registration date of every user who registered within the last 7 days
select uid,dt as register_day,event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} and event_key = "registerAccount"

// Get each day's list of active users within the last 7 days
select uid,dt as active_day,max(event_time) as event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} group by uid,dt

// Join the two to produce uid, register_day, active_day, interval (how many days after registration the user was active)
select t1.uid,t1.register_day,t2.active_day,datediff(from_unixtime(t2.event_time,"yyyy-MM-dd"),from_unixtime(t1.event_time,"yyyy-MM-dd")) as day_interval from 
(select uid,dt as register_day,event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} and event_key = "registerAccount") t1 
left join 
(select uid,dt as active_day,max(event_time) as event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} group by uid,dt) t2  
on t1.uid = t2.uid  

Result format:
001	20190301	20190301 	0
001 20190301	20190303	2
002	20190302	20190303	1

// From the result above, build a temporary table of retained user counts
drop table if exists tmp.user_retained_${startDay}_${endDay};create table if not exists  tmp.user_retained_${startDay}_${endDay} as 
select register_day,day_interval,count(1) as retained from (
select t1.uid,t1.register_day,t2.active_day,datediff(from_unixtime(t2.event_time,"yyyy-MM-dd"),from_unixtime(t1.event_time,"yyyy-MM-dd")) as day_interval from 
(select uid,dt as register_day,event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} and event_key = "registerAccount") t1 
left join 
(select uid,dt as active_day,max(event_time) as event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} group by uid,dt) t2  
on t1.uid = t2.uid) tmp group by register_day,day_interval  

Sample result:
20190402        0       27000
20190402        1       19393
20190402        2       14681
20190402        3       9712
20190402        4       5089
20190402        5       1767
20190402        6       1775

// Compute the 7-day retention rates
drop table if exists tmp.user_retained_rate_${startDay}_${endDay};create table if not exists tmp.user_retained_rate_${startDay}_${endDay} as 
select register_day,day_interval,round(retained / register_cnt,4) as retained_rate,current_dt from (
select t1.register_day,t1.day_interval,t1.retained,t2.register_cnt,${endDay} as current_dt from 
(select register_day,day_interval,retained from tmp.user_retained_${startDay}_${endDay}) t1 
left join 
(select dt,count(1) as register_cnt from dwd.user_behavior where dt between ${startDay} and ${endDay} and event_key = "registerAccount" group by dt) t2 
on t1.register_day = t2.dt 
group by t1.register_day,t1.day_interval ,t1.retained,t2.register_cnt) tmp

Sample result
20190402        0       1.0     20190402
20190402        1       0.7183  20190402
20190402        2       0.5437  20190402
20190402        3       0.3597  20190402
20190402        4       0.1885  20190402
20190402        5       0.0654  20190402
20190402        6       0.0657  20190402
20190403        0       1.0     20190402
20190403        1       0.7183  20190402
20190403        2       0.5437  20190402
20190403        3       0.3597  20190402
20190403        4       0.1885  20190402
20190403        5       0.0654  20190402

// One more step: pivot the day_interval rows into columns
insert overwrite table tmp.seven_days_retained_analysis_${day} 
select 
register_day,
max(case when day_interval = 0 then retained_rate else 0 end) as zero_interval_retained_rate,
max(case when day_interval = 1 then retained_rate else 0  end) as one_interval_retained_rate,
max(case when day_interval = 2 then retained_rate else 0 end) as two_interval_retained_rate,
max(case when day_interval = 3 then retained_rate else 0 end) as three_interval_retained_rate,
max(case when day_interval = 4 then retained_rate else 0 end) as four_interval_retained_rate,
max(case when day_interval = 5 then retained_rate else 0 end) as five_interval_retained_rate,
max(case when day_interval = 6 then retained_rate else 0 end) as six_interval_retained_rate,
current_dt 
from tmp.user_retained_rate_${startDay}_${endDay} group by register_day,current_dt;
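The retention SQL is parameterized by ${startDay} and ${endDay}. Here is a small sketch of how a driver could derive them from the run date; the 7-day window and the yyyyMMdd format come from the queries above, while the helper itself is only illustrative (it assumes the window ends on the run date).

    import java.time.LocalDate
    import java.time.format.DateTimeFormatter

    object RetainedWindow {
      private val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")

      // For a run date like 20190408, the 7-day window is [20190402, 20190408]
      def window(day: String): (String, String) = {
        val end = LocalDate.parse(day, fmt)
        (end.minusDays(6).format(fmt), end.format(fmt))
      }

      def main(args: Array[String]): Unit = {
        val (startDay, endDay) = window(args(0))
        println(s"startDay=$startDay endDay=$endDay")  // substitute into the SQL above
      }
    }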

Exercise: all of the SQL above could be written as one big statement. Why break it into so many steps?

Sqoop export in depth

Question: if the previously exported data was wrong and you re-run the Sqoop script, what happens?
Answer: MySQL ends up with additional rows that have the same dt but different results, which leaves the data in a mess!
How to solve it:
1. Use upsert mode for the Sqoop export
sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root --password 123456 --table app_cource_study_report --columns watch_video_cnt,complete_video_cnt,dt --fields-terminated-by "\t" --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" --update-key dt --update-mode allowinsert --input-null-string '\\N'

Prerequisites for this mode:
1. The column passed to --update-key must have a unique index
create unique index app_cource_study_report_dt on app_cource_study_report (dt);
2. For updated_at to refresh automatically, the column must be defined with ON UPDATE CURRENT_TIMESTAMP
ALTER TABLE app_cource_study_report MODIFY updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP NOT NULL;

If you run the Sqoop export again, you may notice that updated_at does not change even though the data was written. This is a common misconception: updated_at only changes when a record's values are actually modified. Test it again with the following statement:
UPDATE app_cource_study_report SET watch_video_cnt = 88888 
  • Interview question: a Sqoop job is essentially just the map phase of a MapReduce job
  • Interview question: how does Sqoop make sure exported data is not lost? Explain with a business scenario
    Use the --staging-table approach
    sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root --password 123456 --table app_cource_study_report --columns watch_video_cnt,complete_video_cnt,dt --fields-terminated-by "\t" --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" --staging-table app_cource_study_report_tmp --clear-staging-table --input-null-string '\\N'

Unfortunately, upsert mode and --staging-table mode are incompatible, so only one can be used; --staging-table is generally the more common choice

  • Interview question: what is actually stored underneath for the NULLs you see in Hive?
    1. First consider the difference between the string "Null" and a real NULL in the database, for example when counting
    2. Check the Sqoop documentation for the --null-string and --null-non-string options used during import; the --input-null-string and --input-null-non-string options used during export then follow naturally (by default, Hive text tables store NULL as \N)

Behavior analysis project summary

  1. Learned how Flume is configured in companies, why two tiers are used, and how Flume guarantees no data loss (interview highlight)
  2. Implemented the statistics with both Hive and SparkSQL, and saw how efficient ORC is (interview highlight)
  3. Gained an in-depth understanding of Sqoop, especially --staging-table (interview highlight)