2021-02-11 Big Data Course Notes, Day 22

Day 3: Offline Project (Part 3) - Processing New-User Data

Time dimension
Browser dimension
Platform dimension
KPI (module) dimension
The new-user metric is computed for every combination of these four dimensions.

Course Outline

Project module design
New-user metric: mapper development
New-user metric: reducer development
New-user metric: Runner development
Storing MapReduce results in MySQL
New-user metric: run results

HBase

uuid, servertime, browser, platform, kpi


Event: launch

	time
	browser
	platform
	kpi (module)


time                      r
time, browser             r
time, platform            r
time, browser, platform   r


group by time
group by time, browser

Phone analogy

Brand: two brands
Model: three models

Questions:
1. How many phones are in stock in total?
2. How many phones of a given brand are in stock?
3. How many phones of a given brand and model are in stock?

phone1   brand1  type1
phone2   brand2  type2
phone3   brand2  type3
phone4   brand1  type2

phone1   brand1  type1

phone1   brand1   type1
phone1   brand2   type1
phone2   brand1   type3

phone1   brand1  type1

phone1   
phone1   type1
phone1  brand1  type1

brand1  phone1
type1    phone1
brand1 type1  phone1

phone1

Class diagram of the dimension combinations (figure omitted)

Project Module Design

map
data fission (one input record fans out into multiple records)
dimension combination

reduce
aggregation and counting

1. Read the data from HBase
2. Combine the dimensions
3. Aggregate in the reducer
4. Store the data in MySQL
a) TableMapReduceUtil.initTableReducerJob();
b) implement your own OutputFormat that writes to MySQL

New-User Metric: Mapper Development

TransformerOutputFormat.class is the custom class used to insert data into MySQL.

Four dimensions: time, browser, platform, module (KPI).
The input must be filtered down to LAUNCH_EVENT records.
Combine the four dimensions and emit a key/value pair for each combination.

How many dimension combinations are there?
Multiply together the number of values in each dimension.
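A minimal, hypothetical mapper sketch of this "data fission" step. The column family "log", the column names, and the plain Text key built by string concatenation are assumptions made for illustration only; the course project uses its own StatsUserDimension/TimeOutputValue classes instead.

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class NewUserMapperSketch extends TableMapper<Text, Text> {
    private static final byte[] CF = Bytes.toBytes("log"); // assumed column family

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // keep only launch events (en = e_l)
        String en = Bytes.toString(value.getValue(CF, Bytes.toBytes("en")));
        if (!"e_l".equals(en)) {
            return;
        }
        String uuid = Bytes.toString(value.getValue(CF, Bytes.toBytes("u_ud")));
        String sTime = Bytes.toString(value.getValue(CF, Bytes.toBytes("s_time")));
        String platform = Bytes.toString(value.getValue(CF, Bytes.toBytes("pl")));
        String browser = Bytes.toString(value.getValue(CF, Bytes.toBytes("browser")));
        if (uuid == null || sTime == null) {
            return;
        }
        String day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(Long.parseLong(sTime)));
        Text uid = new Text(uuid);
        // "data fission": one input row fans out into one record per dimension combination
        context.write(new Text(day), uid);                                   // time
        context.write(new Text(day + ":" + browser), uid);                   // time + browser
        context.write(new Text(day + ":" + platform), uid);                  // time + platform
        context.write(new Text(day + ":" + platform + ":" + browser), uid);  // time + platform + browser
    }
}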

New-User Metric: Reducer Development

Because we are counting users, the logs must be de-duplicated by uuid, since the same person may have clicked multiple times.
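A minimal sketch of the de-duplicating count, using the same hypothetical Text key/value types as the mapper sketch above rather than the project's own classes:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NewUserReducerSketch extends Reducer<Text, Text, Text, IntWritable> {
    private final Set<String> unique = new HashSet<>();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        unique.clear();
        for (Text uuid : values) {
            unique.add(uuid.toString()); // the same user may have produced several launch events
        }
        context.write(key, new IntWritable(unique.size()));
    }
}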

New-User Metric: Runner Development

1. Add filters to the Scan, plus startKey/stopKey
2. Add the condition en=e_l (launch events)
3. Specify the column names to fetch
MultipleColumnPrefixFilter
4. Specify the table name
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, EventLogConstants.HBASE_NAME_EVENT_LOGS.getBytes());
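A minimal sketch of how a Runner could assemble such a Scan and wire the job, under the same assumptions as the mapper and reducer sketches above (table name "eventlog", column family "log", placeholder row-key timestamps); the project's actual TransformerBaseRunner and EventLogConstants are not reproduced here, and MysqlOutputFormatSketch is sketched in the MySQL section below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class NewUserRunnerSketch {
    private static final byte[] CF = Bytes.toBytes("log"); // assumed column family

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "new-install-user-sketch");
        job.setJarByClass(NewUserRunnerSketch.class);

        Scan scan = new Scan();
        // 1. row keys in the event table start with the day's timestamp,
        //    so the date range becomes startRow/stopRow (placeholder values)
        scan.setStartRow(Bytes.toBytes("1502380800000"));
        scan.setStopRow(Bytes.toBytes("1502467200000"));

        FilterList filters = new FilterList();
        // 2. keep only launch events: en = e_l
        filters.addFilter(new SingleColumnValueFilter(CF, Bytes.toBytes("en"),
                CompareOp.EQUAL, Bytes.toBytes("e_l")));
        // 3. fetch only the columns the job needs
        filters.addFilter(new MultipleColumnPrefixFilter(new byte[][] {
                Bytes.toBytes("en"), Bytes.toBytes("u_ud"), Bytes.toBytes("s_time"),
                Bytes.toBytes("pl"), Bytes.toBytes("browser")}));
        scan.setFilter(filters);

        // 4. tell TableInputFormat which table this Scan targets
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("eventlog"));

        TableMapReduceUtil.initTableMapperJob("eventlog", scan,
                NewUserMapperSketch.class, Text.class, Text.class, job);
        job.setReducerClass(NewUserReducerSketch.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(MysqlOutputFormatSketch.class); // writes to MySQL, see the MySQL section
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}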

TestDataMaker is used to generate test data for HBase.

In service.impl, update the MySQL connection string.
In transformer-env.xml, update the MySQL connection string.
In com.sxt.transformer.service.impl.DimensionConverterImpl, update the database connection string.

Storing MapReduce Results in MySQL

The RecordWriter inside the TransformerOutputFormat class writes the output to MySQL.
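A minimal sketch of the idea, not the project's actual TransformerOutputFormat: an OutputFormat whose RecordWriter batches each reduce output row into MySQL over JDBC. The connection string, table, and column names are placeholders; the real project reads its SQL and connection settings from its configuration files.

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MysqlOutputFormatSketch extends OutputFormat<Text, IntWritable> {

    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context) throws IOException {
        try {
            // placeholder connection string; the real project reads it from its config
            Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://node1:3306/report", "hive", "hive123");
            return new MysqlRecordWriter(conn);
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void checkOutputSpecs(JobContext context) {
        // nothing to validate in this sketch
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
        // no-op committer: rows go straight to MySQL in the RecordWriter
        return new OutputCommitter() {
            @Override public void setupJob(JobContext jobContext) { }
            @Override public void setupTask(TaskAttemptContext taskContext) { }
            @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) { return false; }
            @Override public void commitTask(TaskAttemptContext taskContext) { }
            @Override public void abortTask(TaskAttemptContext taskContext) { }
        };
    }

    private static class MysqlRecordWriter extends RecordWriter<Text, IntWritable> {
        private final Connection conn;
        private PreparedStatement ps;

        MysqlRecordWriter(Connection conn) throws IOException {
            this.conn = conn;
            try {
                // placeholder SQL and columns
                this.ps = conn.prepareStatement(
                        "INSERT INTO stats_new_user (dimension_key, new_users) VALUES (?, ?)");
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }

        @Override
        public void write(Text key, IntWritable value) throws IOException {
            try {
                ps.setString(1, key.toString());
                ps.setInt(2, value.get());
                ps.addBatch(); // batched to cut down on round trips
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException {
            try {
                ps.executeBatch();
                conn.close();
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }
    }
}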

New-User Metric: Run Results

Active User Statistics

Notes on the architecture:
The nginx log files must be rotated and Flume should monitor a directory, to keep the nginx-side log file from growing too large.
The ETL step is already packaged by the company.

1. Active users are counted from PV events.

2. Dimensions needed:
time, platform, module, browser
3. Use StatsUserDimension as the key and TimeOutputValue as the output value.
If possible, define separate dimension classes and a wrapper class that bundles the dimension information.
4. The configuration in output-collector.xml and query-mapping.xml must be specified.
output-collector.xml names your own collector implementation, i.e. how to bind values to the PreparedStatement and call addBatch().
query-mapping.xml holds the SQL statements: how to query or insert parent-table rows, how to obtain the parent-table ID, and the SQL used to insert into MySQL.
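As a rough illustration of what such a collector does, the snippet below binds one output row to a PreparedStatement and batches it. The method signature and column layout are invented for this note (the real project defines its own collector interface and takes the SQL from query-mapping.xml); the PreparedStatement/addBatch part is plain JDBC.

import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical collector: binds one reducer output row to the statement and batches it. */
public class ActiveUserCollectorSketch {
    /**
     * @param ps statement created from an assumed SQL such as
     *           "INSERT INTO stats_user (platform_dimension_id, date_dimension_id, active_users) VALUES (?, ?, ?)"
     */
    public void collect(PreparedStatement ps, int platformId, int dateId, int activeUsers)
            throws SQLException {
        ps.setInt(1, platformId);
        ps.setInt(2, dateId);
        ps.setInt(3, activeUsers);
        ps.addBatch(); // executed later with ps.executeBatch(): one round trip for many rows
    }
}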

How do you add a new module to the project?

1. Develop the Runner, Mapper, and Reducer
2. Add any additional single-dimension classes
3. Add the dimension-combination classes
4. Add the corresponding collector class
5. Modify the configuration files
a) add the corresponding SQL statements to query-mapping.xml
b) add the corresponding collector class (loaded via reflection) to output-collector.xml

Day 5: Offline Analysis of Big Data Website Logs

1 Hive and HBase Integration
https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration

Notes:

Version information
Avro Data Stored in HBase Columns
As of Hive 0.9.0 the HBase integration requires at least HBase 0.92, earlier versions of Hive were working with HBase 0.89/0.90

Hive 0.9.0 is compatible with HBase 0.92.

Version information
Hive 1.x will remain compatible with HBase 0.98.x and lower versions. Hive 2.x will be compatible with HBase 1.x and higher. (See HIVE-10990 for details.) Consumers wanting to work with HBase 1.x using Hive 1.x will need to compile Hive 1.x stream code themselves.

Hive 1.x remains compatible with HBase 0.98.x and earlier.

HIVE-705 introduced native Hive/HBase integration. HiveQL statements can access HBase tables, including SELECT and INSERT, and Hive can even perform joins and unions between Hive tables and HBase tables.

Required jar (shipped with Hive):

hive-hbase-handler-x.y.z.jar

On the Hive server side (Hive remote mode: server + client; metastore + hive):

Start the metastore as usual: hive --service metastore
Start the client CLI: hive

To operate on HBase tables from Hive, the columns must be mapped.

CREATE external TABLE hbase_table_1(key string, value string) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
TBLPROPERTIES ("hbase.table.name" = "xyz",
 "hbase.mapred.output.outputtable" = "xyz");

The hbase.columns.mapping property is required.
The hbase.table.name property is optional; it specifies the corresponding HBase table name and allows the Hive table to use a different name. In the example above the Hive table is hbase_table_1 and the HBase table is xyz. If it is not specified, the Hive and HBase table names are assumed to be the same.
The hbase.mapred.output.outputtable property is optional, but it is required when inserting data into the table; its value is passed to hbase.mapreduce.TableOutputFormat.

For the cf1:val entry in the hbase.columns.mapping of the Hive table definition: after the table is created, HBase shows only cf1 and not val, because val lives at the row level while cf1 (the column family) is table-level metadata in HBase.

Concrete steps:
Hive:

CREATE TABLE hbase_table_1(key int, value string) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
TBLPROPERTIES ("hbase.table.name" = "xyz", "hbase.mapred.output.outputtable" = "xyz");

HBase:

list
desc 'xyz'

In Hive:

insert into hbase_table_1 values(1,'zhangsan');

In HBase:

scan 'xyz'

Creating an external table requires that the corresponding table already exists in HBase.

In HBase:

create 'tb_user', 'info'

In Hive:

create external table hive_tb_user1 (
key int,
name string,
age int,
sex string,
likes array<string>
)
row format
delimited
collection items terminated by '-'
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping"=":key,info:name,info:age,info:sex,info:likes")
tblproperties("hbase.table.name"="tb_user", "hbase.mapred.output.outputtable"="tb_user");

from tb_test
insert into table hive_tb_user1
select 1,'zhangsan',25,'female',array('climbing','reading','shopping') limit 1;

In HBase:

scan 'tb_user'
put 'tb_user', 1, 'info:likes', 'like1-like2-like3-like4'

Hive and HBase:
the following configuration must be added on the Hive server side,
in hive-site.xml:

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>node2,node3,node4</value>
</property>

hive --service metastore

On the client side, simply start hive:
hive

1. Creating a Hive internal (managed) table requires that no corresponding table exists in HBase yet.
2. Creating a Hive external table requires that the corresponding table already exists in HBase.
3. The column mapping is declared with

a)	WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:id,cf:username,cf:age")

4. stored by names the class that handles storage: when Hive writes data, this class puts it into HBase storage, and when Hive reads, it maps the HBase data back onto the Hive schema.
a) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
5. Specify which HBase table the Hive table corresponds to; outputtable determines which HBase table the data is written to when Hive inserts.

TBLPROPERTIES ("hbase.table.name" = "my_table", "hbase.mapred.output.outputtable" = "my_table");

6. If the HBase table name and the Hive table name are identical, tblproperties can be omitted.

2 Sqoop: Introduction, Installation, and Data Import

Sqoop: a tool for moving data between relational databases (Oracle, MySQL, PostgreSQL, etc.) and Hadoop.
Website: http://sqoop.apache.org/
Versions (the two are completely incompatible; sqoop1 is the most widely used):
sqoop1: 1.4.x
sqoop2: 1.99.x
Similar product:
DataX: Alibaba's data exchange tool
Sqoop's architecture is extremely simple; it is the simplest framework in the Hadoop ecosystem.
sqoop1 connects to Hadoop directly from the client; each task is parsed into a corresponding MapReduce job for execution.

In that MapReduce job, InputFormat and OutputFormat configure the input and output.
2.1 Sqoop import (figure omitted)
2.2 Sqoop export (figure omitted)
2.3 Sqoop installation and testing

Unpack the tarball
Configure the environment variables

SQOOP_HOME
PATH

Add the database driver jar
Configure sqoop-env.sh (no changes are needed)
Comment out lines 134-147 of bin/configure-sqoop to silence the unnecessary warnings.
Test:

sqoop version
	sqoop list-databases --connect jdbc:mysql://node4:3306/ --username root --password 123456
	sqoop help
	sqoop help command

Run directly on the command line:

sqoop list-databases --connect jdbc:mysql://node1:3306 --username hive --password hive123
Or put the sqoop options into a file:
sqoop1.txt
######################
list-databases
--connect
jdbc:mysql://node4:3306
--username
hive
--password
hive123
######################
Run it:
sqoop --options-file sqoop1.txt
[root@node4 sqoop-1.4.6]# sqoop help list-databases
usage: sqoop list-databases [GENERIC-ARGS] [TOOL-ARGS]

Common arguments:
   --connect <jdbc-uri>                         Specify JDBC connect
                                                string
   --connection-manager <class-name>            Specify connection manager
                                                class name
   --connection-param-file <properties-file>    Specify connection
                                                parameters file
   --driver <class-name>                        Manually specify JDBC
                                                driver class to use
   --hadoop-home <hdir>                         Override
                                                $HADOOP_MAPRED_HOME_ARG
   --hadoop-mapred-home <dir>                   Override
                                                $HADOOP_MAPRED_HOME_ARG
   --help                                       Print usage instructions
-P                                              Read password from console
   --password <password>                        Set authentication
                                                password
   --password-alias <password-alias>            Credential provider
                                                password alias
   --password-file <password-file>              Set authentication
                                                password file path
   --relaxed-isolation                          Use read-uncommitted
                                                isolation for imports
   --skip-dist-cache                            Skip copying jars to
                                                distributed cache
   --username <username>                        Set authentication
                                                username
   --verbose                                    Print more information
                                                while working

Generic Hadoop command-line arguments:
(must preceed any tool-specific arguments)
Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]

To export from Hive to MySQL, sqoop must be installed on the Hive host (for example, where the Hive client is located).

$CONDITIONS

[root@server3 ~]# sqoop help import
usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]

Common arguments:
   --connect <jdbc-uri>                         Specify JDBC connect
                                                string
   --connection-manager <class-name>            Specify connection manager
                                                class name
   --connection-param-file <properties-file>    Specify connection
                                                parameters file
   --driver <class-name>                        Manually specify JDBC
                                                driver class to use
   --hadoop-home <hdir>                         Override
                                                $HADOOP_MAPRED_HOME_ARG
   --hadoop-mapred-home <dir>                   Override
                                                $HADOOP_MAPRED_HOME_ARG
   --help                                       Print usage instructions
-P                                              Read password from console
   --password <password>                        Set authentication
                                                password
   --password-alias <password-alias>            Credential provider
                                                password alias
   --password-file <password-file>              Set authentication
                                                password file path
   --relaxed-isolation                          Use read-uncommitted
                                                isolation for imports
   --skip-dist-cache                            Skip copying jars to
                                                distributed cache
   --username <username>                        Set authentication
                                                username
   --verbose                                    Print more information
                                                while working

Import control arguments:
   --append                                                   Imports data
                                                              in append
                                                              mode
   --as-avrodatafile                                          Imports data
                                                              to Avro data
                                                              files
   --as-parquetfile                                           Imports data
                                                              to Parquet
                                                              files
   --as-sequencefile                                          Imports data
                                                              to
                                                              SequenceFile
                                                              s
   --as-textfile                                              Imports data
                                                              as plain
                                                              text
                                                              (default)
   --autoreset-to-one-mapper                                  Reset the
                                                              number of
                                                              mappers to
                                                              one mapper
                                                              if no split
                                                              key
                                                              available
   --boundary-query <statement>                               Set boundary
                                                              query for
                                                              retrieving
                                                              max and min
                                                              value of the
                                                              primary key
   --columns <col,col,col...>                                 Specify which columns of the table to import
                                                              
   --compression-codec <codec>                                Compression
                                                              codec to use
                                                              for import
   --delete-target-dir                 Imports data  in delete mode
   --direct                                                   Use direct
                                                              import fast
                                                              path
   --direct-split-size <n>                                    Split the
                                                              input stream
                                                              every 'n'
                                                              bytes when
                                                              importing in
                                                              direct mode
-e,--query <statement>                                        Import
                                                              results of
                                                              SQL
                                                              'statement'
   --fetch-size <n>                                           Set number
                                                              'n' of rows
                                                              to fetch
                                                              from the
                                                              database
                                                              when more
                                                              rows are
                                                              needed
   --inline-lob-limit <n>                                     Set the
                                                              maximum size
                                                              for an
                                                              inline LOB
-m,--num-mappers <n>                                          Use 'n' map
                                                              tasks to
                                                              import in
                                                              parallel
   --mapreduce-job-name <name>                                Set name for
                                                              generated
                                                              mapreduce
                                                              job
   --merge-key <column>                                       Key column
                                                              to use to
                                                              join results
   --split-by <column-name>                                   Column of
                                                              the table
                                                              used to
                                                              split work
                                                              units
   --table <table-name>                                       Table to
                                                              read
   --target-dir <dir>                                         HDFS plain
                                                              table
                                                              destination
   --validate                                                 Validate the
                                                              copy using
                                                              the
                                                              configured
                                                              validator
   --validation-failurehandler <validation-failurehandler>    Fully
                                                              qualified
                                                              class name
                                                              for
                                                              ValidationFa
                                                              ilureHandler
   --validation-threshold <validation-threshold>              Fully
                                                              qualified
                                                              class name
                                                              for
                                                              ValidationTh
                                                              reshold
   --validator <validator>                                    Fully
                                                              qualified
                                                              class name
                                                              for the
                                                              Validator
   --warehouse-dir <dir>                                      HDFS parent
                                                              for table
                                                              destination
   --where <where clause>                                     WHERE clause
                                                              to use
                                                              during
                                                              import
-z,--compress                                                 Enable
                                                              compression

Incremental import arguments:
   --check-column <column>        Source column to check for incremental
                                  change
   --incremental <import-type>    Define an incremental import of type
                                  'append' or 'lastmodified'
   --last-value <value>           Last imported value in the incremental
                                  check column

Output line formatting arguments:
   --enclosed-by <char>               Sets a required field enclosing
                                      character
   --escaped-by <char>                Sets the escape character
   --fields-terminated-by <char>      Sets the field separator character
   --lines-terminated-by <char>       Sets the end-of-line character
   --mysql-delimiters                 Uses MySQL's default delimiter set:
                                      fields: ,  lines: \n  escaped-by: \
                                      optionally-enclosed-by: '
   --optionally-enclosed-by <char>    Sets a field enclosing character

Input parsing arguments:
   --input-enclosed-by <char>               Sets a required field encloser
   --input-escaped-by <char>                Sets the input escape
                                            character
   --input-fields-terminated-by <char>      Sets the input field separator
   --input-lines-terminated-by <char>       Sets the input end-of-line
                                            char
   --input-optionally-enclosed-by <char>    Sets a field enclosing
                                            character


Code generation arguments:
   --bindir <dir>                        Output directory for compiled
                                         objects
   --class-name <name>                   Sets the generated class name.
                                         This overrides --package-name.
                                         When combined with --jar-file,
                                         sets the input class.
   --input-null-non-string <null-str>    Input null non-string
                                         representation
   --input-null-string <null-str>        Input null string representation
   --jar-file <file>                     Disable code generation; use
                                         specified jar
   --map-column-java <arg>               Override mapping for specific
                                         columns to java types
   --null-non-string <null-str>          Null non-string representation
   --null-string <null-str>              Null string representation
   --outdir <dir>                        Output directory for generated
                                         code
   --package-name <name>                 Put auto-generated classes in
                                         this package

Generic Hadoop command-line arguments:
(must preceed any tool-specific arguments)
Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]


At minimum, you must specify --connect and --table
Arguments to mysqldump and other subprograms may be supplied
after a '--' on the command line.

Command-line import:

Import data from MySQL into HDFS:

sqoop import --connect jdbc:mysql://node4/log_results --username hivehive --password hive --as-textfile --table dimension_browser --columns id,browser_name,browser_version --target-dir /sqoop/test1 --delete-target-dir -m 1

Write the options into a file and run it:

sqoop2.txt

import
--connect
jdbc:mysql://node4/log_results
--username
hivehive
--password
hive
--as-textfile
--table
dimension_browser
--columns
id,browser_name,browser_version
--target-dir
/sqoop/test1
--delete-target-dir
-m
1

Command line:

sqoop --options-file sqoop2.txt

You can also supply a SQL query for the import:

sqoop3.txt

import
--connect
jdbc:mysql://node4/log_results
--username
hivehive
--password
hive
--as-textfile
#--query is the same as -e
-e
select id, browser_name, browser_version from dimension_browser where $CONDITIONS
--target-dir
/sqoop/test2
--delete-target-dir
-m
1

Command line:

sqoop --options-file sqoop3.txt

Specify the field delimiter of the output files:
sqoop4.txt

import
--connect
jdbc:mysql://node1/log_results
--username
hive
--password
hive123
--as-textfile
-e
select id, browser_name, browser_version from dimension_browser where $CONDITIONS
--target-dir
/sqoop/test2-1
--delete-target-dir
-m
1
--fields-terminated-by
\t

Command line:

sqoop --options-file sqoop4.txt

Import into HDFS and create the table in Hive. The default field delimiter is the comma, so it does not have to be specified:
sqoop5.txt

import
--connect
jdbc:mysql://node1/log_results
--username
hivehive
--password
hive
--as-textfile
#--query is the same as -e
-e
select id, browser_name, browser_version from dimension_browser where $CONDITIONS
--hive-import
--create-hive-table
--hive-table
hive_browser_dim
--target-dir
/my/tmp
-m
1
--fields-terminated-by
,

Command line:

sqoop --options-file sqoop5.txt

Export:

hdfs://mycluster/sqoop/data/mydata.txt

1,zhangsan,hello world
2,lisi,are you ok
3,wangwu,fine thanks
4,zhaoliu,what are you doing
5,qunqi,just say hello

sqoop6.txt

export
--connect
jdbc:mysql://node1/log_results
--username
hivehive
--password
hive
--columns
id,myname,myversion
--export-dir
/user/hive/warehouse/hive_browser_dim/
-m
1
--table
mybrowserinfo
--input-fields-terminated-by
,

sqoop6-1.txt
Since the file's fields are separated by ',' by default and sqoop also uses ',' by default when exporting, the delimiter option can be omitted:

export
--connect
jdbc:mysql://node1/log_results
--username
hivehive
--password
hive
--columns
id,myname,myversion
--export-dir
/user/hive/warehouse/hive_browser_dim/
-m
1
--table
mybrowserinfo1

Command line:

sqoop --options-file sqoop6-1.txt

Command line:

sqoop --options-file sqoop6.txt

With a comma delimiter, no delimiter option is needed.

For Hive's default delimiter, the sqoop options file must specify the delimiter \001:

sqoop11.txt

export
--connect
jdbc:mysql://node1/log_results
--username
hive
--password
hive123
--columns
id,name,msg
--export-dir
/user/hive/warehouse/tb_log2
-m
1
--table
tb_loglog
--input-fields-terminated-by
\001

SQL Analysis of User Browse Depth

Four ways to pivot rows into columns:
	join
	union
	DECODE (Oracle)
	case when

Requirement:
Group users by how many pages they visited and count how many users fall into each group.

From the perspective of counting users.

The stats_view_depth table in MySQL.

1. Create the Hive table mapped to the HBase eventlog table

CREATE EXTERNAL TABLE event_logs(
key string, pl string, en string, s_time bigint, p_url string, u_ud string, u_sd string
) ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':key,log:pl,log:en,log:s_time,log:p_url,log:u_ud,log:u_sd')
tblproperties('hbase.table.name'='eventlog');

2. Create the Hive table corresponding to the MySQL table

The analysis result of the HQL is saved into this Hive table and then exported to MySQL with sqoop.

CREATE TABLE `stats_view_depth` (
  `platform_dimension_id` bigint ,
  `data_dimension_id` bigint , 
  `kpi_dimension_id` bigint , 
  `pv1` bigint , 
  `pv2` bigint , 
  `pv3` bigint , 
  `pv4` bigint , 
  `pv5_10` bigint , 
  `pv10_30` bigint , 
  `pv30_60` bigint , 
  `pv60_plus` bigint , 
  `created` string
) row format delimited fields terminated by '\t';

3. Create a temporary table in Hive

The intermediate result of the HQL analysis is stored in this temporary table.

CREATE TABLE `stats_view_depth_tmp`(`pl` string, `date` string, `col` string, `ct` bigint);

pl   platform
date date
col  column name; its values correspond to the columns of the MySQL table: pv1, pv2, pv4, ...
ct   the count for that column

col corresponds to the pv-prefixed columns in MySQL.

4. Write the UDFs

Two dimensions: platform dimension and date dimension.

package com.sxt.transformer.hive;


import com.sxt.common.DateEnum;
import com.sxt.transformer.model.dim.base.DateDimension;
import com.sxt.transformer.model.dim.base.PlatformDimension;
import com.sxt.transformer.service.IDimensionConverter;
import com.sxt.transformer.service.impl.DimensionConverterImpl;
import com.sxt.util.TimeUtil;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * UDF for the platform dimension
 * 
 * @author root
 *
 */
public class PlatformDimensionUDF extends UDF {
    private IDimensionConverter converter = new DimensionConverterImpl();

    /**
     * Returns the dimension id for the given platform name
     * 
     * @param platform
     * @return
     */
    public IntWritable evaluate(Text platform) {
        PlatformDimension dimension = new PlatformDimension(platform.toString());

        try {
            int id = this.converter.getDimensionIdByValue(dimension);
            return new IntWritable(id);
        } catch (IOException e) {
            throw new RuntimeException("failed to get dimension id");
        }
    }
}
package com.sxt.transformer.hive;

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import com.sxt.common.DateEnum;
import com.sxt.transformer.model.dim.base.DateDimension;
import com.sxt.transformer.service.IDimensionConverter;
import com.sxt.transformer.service.impl.DimensionConverterImpl;
import com.sxt.util.TimeUtil;

/**
 * UDF for the date dimension
 * 
 * @author root
 *
 */
public class DateDimensionUDF extends UDF {
    private IDimensionConverter converter = new DimensionConverterImpl();

    /**
     * Returns the dimension id for the given date (format: yyyy-MM-dd)
     * 
     * @param day
     * @return
     */
    public IntWritable evaluate(Text day) {
        DateDimension dimension = DateDimension.buildDate(TimeUtil.parseString2Long(day.toString()), DateEnum.DAY);
        try {
            int id = this.converter.getDimensionIdByValue(dimension);
            System.out.println(day.toString());
            System.out.println(id);
            return new IntWritable(id);
        } catch (IOException e) {
            throw new RuntimeException("failed to get dimension id for " + day.toString());
        }
    }
}

5. Upload

Package the project jar (figure omitted).
Upload hivejar.jar to the /sxt/transformer folder on HDFS.

6. Create the Hive functions

#create function platformFunc as 'com.sxt.transformer.hive.PlatformDimensionUDF' using jar 'hdfs://mycluster/sxt/transformer/hivejar.jar';  
create function dateFunc as 'com.sxt.transformer.hive.DateDimensionUDF' using jar 'hdfs://mycluster/sxt/transformer/hivejar.jar';  

7. Write the HQL (browse depth from the user perspective) <note: the time range is supplied externally>

from (
  select 
    pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd') as day, u_ud, 
    (case when count(p_url) = 1 then "pv1" 
      when count(p_url) = 2 then "pv2" 
      when count(p_url) = 3 then "pv3" 
      when count(p_url) = 4 then "pv4" 
      when count(p_url) >= 5 and count(p_url) <10 then "pv5_10" 
      when count(p_url) >= 10 and count(p_url) <30 then "pv10_30" 
      when count(p_url) >=30 and count(p_url) <60 then "pv30_60"  
      else 'pv60_plus' end) as pv 
  from event_logs 
  where 
    en='e_pv' 
    and p_url is not null 
    and pl is not null 
    and s_time >= unix_timestamp('2017-08-23','yyyy-MM-dd')*1000 
    and s_time < unix_timestamp('2017-08-24','yyyy-MM-dd')*1000
  group by 
    pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd'), u_ud
) as tmp
insert overwrite table stats_view_depth_tmp 
  select pl,day,pv,count(distinct u_ud) as ct where u_ud is not null group by pl,day,pv;

How do we know which pv bucket (e.g. pv10_30) a visitor belongs to?
Aggregation:
query the data from the HBase table, group by u_ud, and count that user's pv events;
the case when expression then assigns the visitor to the corresponding pv bucket.

89155407 pv3
62439313 pv5_10
41469129 pv10_30
37005838 pv30_60
08257218 pv3

This gives, for every visitor, the pv bucket they belong to.

Then aggregate over all visitors in the same bucket, counting the u_ud values, to get how many users are in that bucket.

`pl` string, `date` string, `col` string, `ct` bigint
website  2019-11-18  pv10  300
website  2019-11-18  pv10  400
website  2019-11-18  pv10  500
website  2019-11-18  pv10  300
website  2019-11-18  pv5_10  20
website  2019-11-18  pv10_30  40
website  2019-11-18  pv30_60  10
website  2019-11-18  pv60_plus  120

Overall, how many users are in each pv bucket?

How many users are pv1?
Aggregation.

Row-to-column pivot -> final result

- Convert the multiple rows of the temporary table into a single row

Row-to-column example:

std prj score
S1 M 100
S1 E 98
S1 Z 80
S2 M 87
S2 E 88
S2 Z 89

The pivoted result:

std M E Z
S1 100 98 80
S2 87 88 89
select std, score from my_score where prj='M';
select std, score from my_score where prj='E';
select std, score from my_score where prj='Z';
select std, t1.score, t2.score, t3.score from t1 join t2 on t1.std=t2.std
 join t3 on t1.std=t3.std;
SELECT t1.std, t1.score, t2.score, t3.score
from 
(select std, score from my_score where prj='M') t1
 join
(select std, score from my_score where prj='E') t2
 on t1.std=t2.std
 join (select std, score from my_score where prj='Z') t3
 on t1.std=t3.std;

Using union all:

select tmp.std, sum(tmp.M), sum(tmp.E), sum(tmp.Z) from (
	select std, score as 'M', 0 as 'E', 0 as 'Z' from tb_score where prj='M' UNION ALL
	select std, 0 as 'M', score as 'E', 0 as 'Z' from tb_score where prj='E' UNION ALL
	select std, 0 as 'M', 0 as 'E', score as 'Z' from tb_score where prj='Z'
) tmp group by tmp.std;

Applying the same union all approach to stats_view_depth_tmp:

with tmp as 
(
select pl,`date` as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select pl,`date` as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select pl,`date` as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all

select 'all' as pl,`date` as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select 'all' as pl,`date` as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus'
)
from tmp
insert overwrite table stats_view_depth 
select 2,3,6,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),'2017-01-10' group by pl,date1;

Write UDFs to supply the 2,3,6 values; the 2,3,6 here are placeholder values.

with tmp as 
(
select pl,`date` as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select pl,`date` as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select pl,`date` as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all

select 'all' as pl,`date` as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select 'all' as pl,`date` as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus'
)
from tmp
insert overwrite table stats_view_depth 
select platformFunc(pl),dateFunc(date1), 6 ,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),date1 group by pl,date1;

8. Write the sqoop script (user perspective)

sqoop --options-file sqoop_1
Contents of the file sqoop_1:

export

--connect
jdbc:mysql://node3:3306/log_results
--username
loguser
--password
log123
--table
stats_view_depth
--export-dir
/user/hive/warehouse/stats_view_depth/*
--input-fields-terminated-by
"\t"
--update-mode
allowinsert
--update-key
platform_dimension_id,data_dimension_id,kpi_dimension_id

Browse depth from the session perspective

9. Write the HQL (browse depth from the session perspective) <note: the time range is supplied externally>

from (
select pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd') as day, u_sd,
 (case when count(p_url) = 1 then "pv1" 
 when count(p_url) = 2 then "pv2" 
 when count(p_url) = 3 then "pv3" 
 when count(p_url) = 4 then "pv4" 
 when count(p_url) >= 5 and count(p_url) <10 then "pv5_10" 
 when count(p_url) >= 10 and count(p_url) <30 then "pv10_30" 
 when count(p_url) >=30 and count(p_url) <60 then "pv30_60"  
 else 'pv60_plus' end) as pv 
from event_logs 
where en='e_pv' and p_url is not null and pl is not null and s_time >= unix_timestamp('2016-06-10','yyyy-MM-dd')*1000 and s_time < unix_timestamp('2016-06-11','yyyy-MM-dd')*1000
group by pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd'), u_sd
) as tmp
insert overwrite table stats_view_depth_tmp 
select pl,day,pv,count(distinct u_sd) as ct where u_sd is not null group by pl,day,pv;
with tmp as 
(
select pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all

select 'all' as pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select 'all' as pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select 'all' as pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus'
)
from tmp
insert overwrite table stats_view_depth select 3,4,6,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),'2016-06-10' group by pl,date;

10. Write the sqoop script (session perspective)

sqoop --options-file sqoop_2
Contents of the file sqoop_2:
export
--connect
jdbc:mysql://hh:3306/report
--username
hive
--password
hive
--table
stats_view_depth
--export-dir
/hive/bigdater.db/stats_view_depth/*
--input-fields-terminated-by
"\\01" 
--update-mode
allowinsert
--update-key
platform_dimension_id,data_dimension_id,kpi_dimension_id

Writing the shell script

view_depth_run.sh
#!/bin/bash

startDate=''
endDate=''

until [ $# -eq 0 ]
do
	if [ $1'x' = '-sdx' ]; then
		shift
		startDate=$1
	elif [ $1'x' = '-edx' ]; then
		shift
		endDate=$1
	fi
	shift
done

if [ -n "$startDate" ] && [ -n "$endDate" ]; then
	echo "use the arguments of the date"
else
	echo "use the default date"
	startDate=$(date -d last-day +%Y-%m-%d)
	endDate=$(date +%Y-%m-%d)
fi
echo "run of arguments. start date is:$startDate, end date is:$endDate"
echo "start run of view depth job "

## insert overwrite
echo "start insert user data to hive tmp table"
hive  -e "from (select pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd') as day, u_ud, (case when count(p_url) = 1 then 'pv1' when count(p_url) = 2 then 'pv2' when count(p_url) = 3 then 'pv3' when count(p_url) = 4 then 'pv4' when count(p_url) >= 5 and count(p_url) <10 then 'pv5_10' when count(p_url) >= 10 and count(p_url) <30 then 'pv10_30' when count(p_url) >=30 and count(p_url) <60 then 'pv30_60'  else 'pv60_plus' end) as pv from event_logs where en='e_pv' and p_url is not null and pl is not null and s_time >= unix_timestamp('$startDate','yyyy-MM-dd')*1000 and s_time < unix_timestamp('$endDate','yyyy-MM-dd')*1000 group by pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd'), u_ud) as tmp insert overwrite table stats_view_depth_tmp select pl,day,pv,count(distinct u_ud) as ct where u_ud is not null group by pl,day,pv"

echo "start insert user data to hive table"
hive  -e "with tmp as (select pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all select pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all select pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all select 'all' as pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all select 'all' as pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all select 'all' as pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' ) from tmp insert overwrite table stats_view_depth select platform_convert(pl),date_convert(date),5,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),date group by pl,date"

echo "start insert session date to hive tmp table"
hive  -e "from (select pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd') as day, u_sd, (case when count(p_url) = 1 then 'pv1' when count(p_url) = 2 then 'pv2' when count(p_url) = 3 then 'pv3' when count(p_url) = 4 then 'pv4' when count(p_url) >= 5 and count(p_url) <10 then 'pv5_10' when count(p_url) >= 10 and count(p_url) <30 then 'pv10_30' when count(p_url) >=30 and count(p_url) <60 then 'pv30_60'  else 'pv60_plus' end) as pv from event_logs where en='e_pv' and p_url is not null and pl is not null and s_time >= unix_timestamp('$startDate','yyyy-MM-dd')*1000 and s_time < unix_timestamp('$endDate','yyyy-MM-dd')*1000 group by pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd'), u_sd ) as tmp insert overwrite table stats_view_depth_tmp select pl,day,pv,count(distinct u_sd) as ct where u_sd is not null group by pl,day,pv"

## insert into 
echo "start insert session data to hive table"
hive --database bigdater -e "with tmp as (select pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all select pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all select pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all select 'all' as pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all select 'all' as pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all select 'all' as pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' ) from tmp insert into table stats_view_depth select platform_convert(pl),date_convert(date),6,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),'2015-12-13' group by pl,date"

## sqoop
echo "run the sqoop script,insert hive data to mysql table"
sqoop export --connect jdbc:mysql://hh:3306/report --username hive --password hive --table stats_view_depth --export-dir /hive/bigdater.db/stats_view_depth/* --input-fields-terminated-by "\\01" --update-mode allowinsert --update-key platform_dimension_id,data_dimension_id,kpi_dimension_id
echo "complete run the view depth job"

Project Tuning

I. Goals of tuning

Make full use of the machines' capacity so that MR jobs finish faster, and, on a limited set of machines, support running as many MR jobs as possible.

II. Overview of tuning

From the internal execution model of an MR job we know that it consists of a mapper phase and a reducer phase: the mapper phase covers reading the data, the map processing, and writing the output (sort & merge); the reducer phase covers fetching the mapper output, merging it (sort & merge), the reduce processing, and writing the output. Of these seven sub-stages, the ones with the most room for tuning are the map output, the reducer-side merge, and the number of reducers. Performance tuning in general covers CPU, memory, disk I/O, and network, but the MR execution flow tells us that the main levers are memory, disk I/O, and network: tuning an MR job mostly means reducing network transfer and disk I/O. This course therefore covers five areas: server tuning, code tuning, mapper tuning, reducer tuning, and runner tuning.

III. Server tuning

Server tuning mainly covers server parameter tuning and JVM tuning. Since this project uses HBase to store the raw data being analyzed, HBase also needs some tuning. Besides parameter tuning, some JVM tuning is needed, just as for any other Java program.

HDFS tuning

1. dfs.datanode.failed.volumes.tolerated: the number of failed disks a datanode tolerates; default 0, meaning the datanode tolerates no disk failure. With multiple disks mounted, this value can be raised.
2. dfs.replication: replication factor, default 3.
3. dfs.namenode.handler.count: number of concurrent handler threads on the namenode, default 10.
4. dfs.datanode.handler.count: number of concurrent handler threads between datanodes, default 10.
5. dfs.datanode.max.transfer.threads: number of concurrent threads the datanode uses for streaming data, default 4096.
	Usually set to 85%~90% of the Linux file-handle limit. Check the limit with ulimit -a and change it with vim /etc/security/limits.conf; do not set it too high.
	At the end of that file add:
		* soft nofile 65535
		* hard nofile 65535
		Note: the handle limit must not be too large; any value below 1000000 works, and -1 is generally not used.
		Troubleshooting: if the limit is set very high and an "unable to load session" message appears on the next login, fix it in single-user mode.
			Single-user mode:
				At boot press 'a' to enter the selection screen, press 'e' to edit, select the second line ('kernel...'), press 'e' to modify it, append a space plus "single" at the end, press Enter to return, then press 'b' to boot into single-user mode. After it boots, restore the file, save, exit, and reboot.
6. io.file.buffer.size: buffer size for reading/writing data, default 4096. It usually does not need changing; if it does, use an integer multiple of 4096 (a multiple of the physical page size).

HBase tuning

1. Set the regionserver heap size; default 1 GB, 4 GB recommended.
	Edit HBASE_HEAPSIZE=4g in conf/hbase-env.sh.
2. hbase.regionserver.handler.count: number of concurrent client handler threads, default 10. Rule of thumb: use a smaller value when puts and scans dominate and a larger value when gets and deletes dominate, to avoid memory problems caused by frequent GC.
3. Customize HBase's split and compaction behavior. By default HBase splits a region automatically when it reaches hbase.hregion.max.filesize (10 GB); 20~500 regions per regionserver is considered optimal. Compaction is a very important HBase housekeeping mechanism, but it consumes a lot of memory and CPU, so on heavily loaded machines it is usually disabled and triggered manually instead.
4. hbase.balancer.period: the HBase load-balancer period, default 300000 (5 minutes). On heavily loaded clusters it can be increased.
5. hfile.block.cache.size: fraction of heap used for the HFile block cache, default 0.4. Increase it on read-heavy systems and decrease it on write-heavy systems, but do not set it to 0.
6. hbase.regionserver.global.memstore.upperLimit: upper bound on the fraction of heap the memstores may use, default 0.4. When it is reached, a flush writes the contents to disk.
7. hbase.regionserver.global.memstore.lowerLimit: lower bound on the memstore heap fraction, default 0.38. After a flush, the memstores' usage must not exceed this value.
8. hbase.hregion.memstore.flush.size: when a memstore grows beyond this value, a flush is triggered. Default 134217728 (128 MB).
9. hbase.hregion.memstore.block.multiplier: the memstore blocking multiplier, default 4. When a memstore exceeds 4 * hbase.hregion.memstore.flush.size, writes are blocked, which can eventually lead to an OOM error.
MapReduce tuning

1. mapreduce.task.io.sort.factor: number of streams merged at once when the MR job merge-sorts spill files, default 10.
2. mapreduce.task.io.sort.mb: memory used for the merge sort / the map output buffer, default 100 MB.
3. mapreduce.map.sort.spill.percent: buffer-fill threshold at which the map output is spilled to disk, default 0.80.
4. mapreduce.reduce.shuffle.parallelcopies: number of threads the reducer uses to copy map output, default 5.
5. mapreduce.reduce.shuffle.input.buffer.percent: fraction of the reduce heap used to buffer copied map output, default 0.70. Increasing it reduces map-output spills to disk on the reduce side and can improve performance.
6. mapreduce.reduce.shuffle.merge.percent: threshold at which the reduce-side shuffle starts merging and spilling to disk, default 0.66. If resources allow, increasing it reduces the number of spills and improves performance; use it together with mapreduce.reduce.shuffle.input.buffer.percent.
7. mapreduce.task.timeout: how long a task may go without reporting progress before it is considered failed, default 600000 (10 minutes); 0 disables the check.
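A minimal sketch of setting a few of these properties in code before submitting a job; the concrete numbers are illustrative, not recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MrTuningSketch {
    public static Job buildJob() throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.task.io.sort.factor", 20);            // merge more streams at once
        conf.setInt("mapreduce.task.io.sort.mb", 200);                // bigger map-side sort buffer
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.85f);     // spill a little later
        conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);   // more copy threads
        conf.setLong("mapreduce.task.timeout", 600000L);              // keep the default 10 minutes
        return Job.getInstance(conf, "tuning-sketch");
    }
}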

IV. Code tuning

Code tuning mainly means hoisting objects that are created over and over inside the mapper and reducer out of the hot path; it is the same as tuning ordinary Java code.

V. Mapper tuning

Mapper tuning has a single goal: reduce the amount of map output. This is done by adding a combine stage and by compressing the map output.
The combiner:
	A custom combiner extends the Reducer class. Its characteristics:
	it uses the map output key/value pairs as both its input and output pairs, and it merges part of the data on the map node to reduce network output.
	It is best suited to numeric map output that is easy to aggregate.
Compression settings:
	When submitting the job, enable compression and specify the codec.
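A minimal sketch of both techniques, assuming a job whose map output is (Text, IntWritable) counts so that the combiner can simply pre-sum them; note that SnappyCodec requires the native Snappy library to be available on the cluster.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class MapperTuningSketch {

    /** Combiner: same key/value types as the map output, pre-sums counts on the map node. */
    public static class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            ctx.write(key, new IntWritable(sum));
        }
    }

    public static void configure(Job job) {
        job.setCombinerClass(SumCombiner.class);
        // compress the intermediate map output to cut shuffle traffic
        Configuration conf = job.getConfiguration();
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);
    }
}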

VI. Reducer tuning

Reducer tuning is done through parameter tuning and by setting the number of reducers.
Choosing the number of reducers:
	Requirement: one reducer and multiple reducers must produce the same result; more reducers must not change the outcome.
	Rule of thumb: for an MR job on a Hadoop cluster, once the map phase reaches 100%, you want to see the reduce phase reach 33% as early as possible; check with hadoop job -status job_id or the web UI.
		Reason: the number of map processes is determined by the RecordReaders returned by the InputFormat, while the reducer consists of three parts: fetching the mapper output, merging all of it, and the reduce processing. The first part depends on the maps finishing, so when the data volume is large and a single reducer cannot meet the performance requirement, increasing the number of reducers solves the problem.
	Advantage: makes full use of the cluster.
	Drawback: some MR jobs cannot benefit from multiple reducers, for example a top-N job.

VII. Runner tuning

Runner tuning simply means setting job parameters when submitting the job; this can generally be done either in code or in the XML configuration files.
Items 1~8 are used in ActiveUserRunner (the before and configure methods); item 9 is explained in TransformerBaseRunner (the initScans method).

1. mapred.child.java.opts: JVM options for the child (container) processes, effective for both map and reduce. Default: -Xmx200m
2. mapreduce.map.java.opts: JVM options for the map-phase child processes; empty by default, in which case mapred.child.java.opts is used.
3. mapreduce.reduce.java.opts: JVM options for the reduce-phase child processes; empty by default, in which case mapred.child.java.opts is used.
4. mapreduce.job.reduces: number of reducers, default 1. Can also be changed with job.setNumReduceTasks.
5. mapreduce.map.speculative: whether to enable speculative execution for the map phase, default true. In practice false is usually better. Can be set with job.setMapSpeculativeExecution.
6. mapreduce.reduce.speculative: whether to enable speculative execution for the reduce phase, default true. In practice false is usually better. Can be set with job.setReduceSpeculativeExecution.
7. mapreduce.map.output.compress: whether to compress the map output, default false. Set it to true when network transfer needs to be reduced.
8. mapreduce.map.output.compress.codec: the map output codec, default org.apache.hadoop.io.compress.DefaultCodec; SnappyCodec is recommended (earlier versions required installing it separately; for current versions see the installation notes at http://www.cnblogs.com/chengxin1982/p/3862309.html).
9. HBase parameters
	By default HBase hands rows to the mapper one at a time: after each record is processed, the next one is fetched from HBase. Setting the scan cache lets each fetch return multiple rows, reducing network transfer.
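A minimal sketch of the cache setting from item 9; the value 500 is illustrative.

import org.apache.hadoop.hbase.client.Scan;

public class ScanCachingSketch {
    public static Scan withCaching(Scan scan) {
        scan.setCaching(500);        // fetch 500 rows per RPC instead of one at a time
        scan.setCacheBlocks(false);  // don't pollute the block cache during a full MR scan
        return scan;
    }
}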

Architecture Design and Project Summary

Project workflow
Module details
	How each module is computed
	Problems encountered
	How they were solved
Summary:
	Write up the offline project for your resume