Day 3: Offline Project (Part 3) - Processing New-User Data
Time dimension
Browser dimension
Platform dimension
KPI, an auxiliary (tool) dimension
The new-user metric is computed for every combination of these four dimensions.
Course outline
Module design approach
New-user metric: mapper development
New-user metric: reducer development
New-user metric: Runner development
Storing the MapReduce results in MySQL
New-user metric: run results
HBase
uuid, servertime, browser, platform, kpi
Event: launch
Time
Browser
Platform
KPI (module)
Time → reduce
Time, browser → reduce
Time, platform → reduce
Time, browser, platform → reduce
Group by time
Group by time, browser
Phones
Brand: two brands
Model: three models
Questions:
1. How many phones are in stock in total?
2. How many phones of a given brand are in stock?
3. How many phones of a given brand and given model are in stock?
phone1 brand1 type1
phone2 brand2 type2
phone3 brand2 type3
phone4 brand1 type2
phone1 brand1 type1
phone1 brand1 type1
phone1 brand2 type1
phone2 brand1 type3
phone1 brand1 type1
phone1
phone1 type1
phone1 brand1 type1
brand1 phone1
type1 phone1
brand1 type1 phone1
phone1
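This is the "data fission" idea: each input record is expanded into one record per dimension combination, and counting records per key later answers all three questions at once. A minimal, illustrative Java sketch (class name and key formats are made up for this example, not project code):

import java.util.Arrays;
import java.util.List;

public class DimensionFission {
    // For one phone record (brand, type), emit one grouping key per dimension combination.
    static List<String> combos(String brand, String type) {
        return Arrays.asList(
                "total",                                  // question 1: total stock
                "brand=" + brand,                         // question 2: stock per brand
                "brand=" + brand + ",type=" + type);      // question 3: stock per brand and model
    }

    public static void main(String[] args) {
        // phone1 brand1 type1 fissions into three keys; counting records per key answers all questions.
        combos("brand1", "type1").forEach(System.out::println);
    }
}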
Class diagram of the dimension combinations:
Module design approach
map
data fission
dimension combinations
reduce
aggregation and counting
1. Read the data from HBase
2. Build the dimension combinations
3. Aggregate in the reducer
4. Write the results to MySQL (a Runner wiring sketch follows this list), either with
a) TableMapReduceUtil.initTableReducerJob();
b) or a custom OutputFormat that writes the data to MySQL
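A hedged sketch of how such a Runner is typically wired with the HBase MapReduce utilities. The mapper, reducer, key/value and output-format class names (NewInstallUserMapper/Reducer, StatsUserDimension, TimeOutputValue, TransformerOutputFormat) follow the names used elsewhere in these notes and are assumed to exist in the project; the exact signatures may differ.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class NewInstallUserRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "new_install_user");
        job.setJarByClass(NewInstallUserRunner.class);

        Scan scan = new Scan();                              // filters and start/stop keys are added here
        TableMapReduceUtil.initTableMapperJob(
                "eventlog", scan,                            // 1. read the source data from HBase
                NewInstallUserMapper.class,                  // 2. mapper builds the dimension combinations
                StatsUserDimension.class,                    //    map output key (dimension combination)
                TimeOutputValue.class,                       //    map output value (uuid + time)
                job, false);

        job.setReducerClass(NewInstallUserReducer.class);    // 3. reducer aggregates per combination
        job.setOutputFormatClass(TransformerOutputFormat.class); // 4. custom OutputFormat writes to MySQL
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}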
New-user metric: mapper development
TransformerOutputFormat.class
is the custom class used to insert data into MySQL.
Four dimensions: time, browser, platform, module (KPI).
The mapper must filter the data down to LAUNCH_EVENT records,
then combine the four dimensions and emit key/value pairs (a sketch follows).
How many dimension combinations are there?
Multiply the number of distinct values in each dimension.
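A minimal sketch of the mapper described above, using plain Text keys instead of the project's StatsUserDimension wrapper for brevity. The column family ("log") and the qualifiers ("en", "u_ud", "s_time", "pl", "browser") are assumptions based on these notes, not necessarily the project's exact names.

import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class LaunchEventMapper extends TableMapper<Text, Text> {
    private static final byte[] FAMILY = Bytes.toBytes("log");   // assumed column family

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        String en = Bytes.toString(value.getValue(FAMILY, Bytes.toBytes("en")));
        if (!"e_l".equals(en)) {
            return;                                   // only launch events count as new users
        }
        String uuid = Bytes.toString(value.getValue(FAMILY, Bytes.toBytes("u_ud")));
        String serverTime = Bytes.toString(value.getValue(FAMILY, Bytes.toBytes("s_time")));
        String browser = Bytes.toString(value.getValue(FAMILY, Bytes.toBytes("browser")));
        String platform = Bytes.toString(value.getValue(FAMILY, Bytes.toBytes("pl")));
        if (uuid == null || serverTime == null) {
            return;                                   // skip incomplete records
        }
        // data fission: one output per dimension combination, uuid as the value
        String day = serverTime;                      // the real project converts this to a date dimension
        context.write(new Text(day), new Text(uuid));
        context.write(new Text(day + "," + browser), new Text(uuid));
        context.write(new Text(day + "," + platform), new Text(uuid));
        context.write(new Text(day + "," + browser + "," + platform), new Text(uuid));
    }
}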
New-user metric: reducer development
Since we are counting users, the log records must be de-duplicated by uuid, because the same person may have clicked several times (a sketch follows).
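For example, the reducer can collect the uuids for each dimension key into a set and emit the set size. A minimal sketch; the Text/LongWritable types are chosen for the sketch, not necessarily the project's own classes.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctUserReducer extends Reducer<Text, Text, Text, LongWritable> {
    private final Set<String> unique = new HashSet<>();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        unique.clear();                               // reuse the set across dimension keys
        for (Text uuid : values) {
            unique.add(uuid.toString());              // the same user may appear several times
        }
        context.write(key, new LongWritable(unique.size())); // distinct-user count per combination
    }
}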
New-user metric: Runner development
1. Add filters to the scan, plus startKey / stopKey
2. Specify the event condition en=e_l
3. Specify the columns to fetch with MultipleColumnPrefixFilter
4. Specify the table name: scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, EventLogContants.HBASE_NAME_EVENT_LOGS.getBytes());
(a Scan-construction sketch follows this list)
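A sketch of how the Scan in steps 1-4 can be assembled. The column family, qualifiers and table name are assumptions for illustration; the project reads them from its EventLogContants constants.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanBuilder {
    // Build the Scan used by the Runner: row-key range + event filter + column prefixes + table name.
    public static Scan build(long startKey, long stopKey) {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(String.valueOf(startKey)));   // startKey
        scan.setStopRow(Bytes.toBytes(String.valueOf(stopKey)));     // stopKey

        FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        // only launch events (en = e_l)
        filters.addFilter(new SingleColumnValueFilter(
                Bytes.toBytes("log"), Bytes.toBytes("en"),
                CompareOp.EQUAL, Bytes.toBytes("e_l")));
        // only the columns the mapper actually needs
        filters.addFilter(new MultipleColumnPrefixFilter(new byte[][]{
                Bytes.toBytes("en"), Bytes.toBytes("u_ud"),
                Bytes.toBytes("s_time"), Bytes.toBytes("pl"), Bytes.toBytes("browser")}));
        scan.setFilter(filters);

        // tell the multi-table input format which table this scan targets
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("eventlog"));
        return scan;
    }
}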
TestDataMaker is used to generate the HBase test data.
Update the MySQL connection string in transformer-env.xml, and also in com.sxt.transformer.service.impl.DimensionConverterImpl (under service.impl).
Storing the MapReduce results in MySQL
The RecordWriter inside TransformerOutputFormat writes the output to MySQL (a simplified sketch follows).
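A stripped-down sketch of what such an OutputFormat looks like, assuming Text keys, LongWritable values, a hard-coded connection string and an assumed target table; the real TransformerOutputFormat takes the connection string from transformer-env.xml and the SQL and value binding from query-mapping.xml / output-collector.xml.

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MysqlOutputFormat extends OutputFormat<Text, LongWritable> {

    @Override
    public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext context) throws IOException {
        try {
            // In the real project the URL/user/password come from transformer-env.xml;
            // the MySQL JDBC driver must be on the task classpath.
            Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://node4:3306/log_results", "root", "123456");
            return new MysqlRecordWriter(conn);
        } catch (SQLException e) {
            throw new IOException("cannot connect to MySQL", e);
        }
    }

    @Override
    public void checkOutputSpecs(JobContext context) { /* nothing to verify for a JDBC sink */ }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        return new NullOutputFormat<Text, LongWritable>().getOutputCommitter(context); // no-op committer
    }

    static class MysqlRecordWriter extends RecordWriter<Text, LongWritable> {
        private final Connection conn;
        private final PreparedStatement ps;

        MysqlRecordWriter(Connection conn) throws SQLException {
            this.conn = conn;
            // assumed target table and columns, for illustration only
            this.ps = conn.prepareStatement("INSERT INTO stats_new_user(dimension_key, new_users) VALUES (?, ?)");
        }

        @Override
        public void write(Text key, LongWritable value) throws IOException {
            try {
                ps.setString(1, key.toString());
                ps.setLong(2, value.get());
                ps.addBatch();                        // queue the row; flush in close()
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException {
            try {
                ps.executeBatch();                    // one round trip for the whole batch
                conn.close();
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }
    }
}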
New-user metric: run results
Active-user statistics
About the architecture:
The nginx log file must be rotated, and Flume should monitor a directory, so that no single nginx log file grows too large.
The ETL step is already packaged by the company.
1. Active users are counted from PV events.
2. Required dimensions:
time, platform, module, browser
3. Use StatsUserDimension as the key and TimeOutputValue as the output value.
If possible, define separate dimension classes and wrap the dimension information in a wrapper class.
4. The configuration in output-collector.xml and query-mapping.xml must be specified.
output-collector.xml lists the collector classes you implement yourself, which assign the values to the PreparedStatement and call addBatch() (a hedged sketch follows).
query-mapping.xml holds the SQL statements: how to query or insert parent-table data, how to obtain the main table id, and the SQL used to insert data into MySQL.
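A rough idea of what such a collector class can look like. The IOutputCollector interface name and its parameters are assumptions inferred from the description above, not the project's exact API; the point is only the PreparedStatement binding and the addBatch() call.

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

/** Hypothetical collector contract, modeled on the output-collector.xml description above. */
interface IOutputCollector {
    void collect(Configuration conf, Object key, Object value,
                 PreparedStatement pstmt, Map<String, Integer> columnIndex) throws SQLException;
}

/** Binds one result row to the PreparedStatement built from query-mapping.xml and queues it. */
public class ActiveUserCollector implements IOutputCollector {
    @Override
    public void collect(Configuration conf, Object key, Object value,
                        PreparedStatement pstmt, Map<String, Integer> columnIndex) throws SQLException {
        // In the real project the ids come from the dimension key and the converter service;
        // the literals below are placeholders.
        pstmt.setInt(1, 1);          // platform dimension id
        pstmt.setInt(2, 1);          // date dimension id
        pstmt.setLong(3, 0L);        // active-user count
        pstmt.addBatch();            // queued; the OutputFormat flushes the whole batch later
    }
}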
How do you add a new module to the project?
1. Develop the Runner, Mapper and Reducer
2. Add any additional single-dimension classes
3. Add the dimension-combination classes
4. Add the corresponding collector class
5. Update the configuration files
a) add the corresponding SQL statements to query-mapping.xml
b) add the corresponding reflection (collector) class to output-collector.xml
Day 5: Offline Analysis of Big-Data Website Logs
1. Integrating Hive and HBase
https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration
Notes:
Version information
As of Hive 0.9.0 the HBase integration requires at least HBase 0.92, earlier versions of Hive were working with HBase 0.89/0.90
Hive 0.9.0 is compatible with HBase 0.92.
Version information
Hive 1.x will remain compatible with HBase 0.98.x and lower versions. Hive 2.x will be compatible with HBase 1.x and higher. (See HIVE-10990 for details.) Consumers wanting to work with HBase 1.x using Hive 1.x will need to compile Hive 1.x stream code themselves.
Hive 1.x remains compatible with HBase 0.98.x and lower.
HIVE-705 added native Hive-HBase integration. Hive QL statements can access HBase tables, including SELECT and INSERT, and Hive can even join and union Hive tables with HBase tables.
Required jar (shipped with Hive):
hive-hbase-handler-x.y.z.jar
On the Hive server side (Hive remote mode: metastore server + client):
start the metastore as usual: hive --service metastore
Start the client CLI: hive
To work with an HBase table from Hive, the columns must be mapped.
CREATE external TABLE hbase_table_1(key string, value string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
TBLPROPERTIES ("hbase.table.name" = "xyz",
"hbase.mapred.output.outputtable" = "xyz");
The hbase.columns.mapping property is mandatory.
The hbase.table.name property is optional; it names the corresponding table in HBase, which lets the Hive table use a different name. In the example above the Hive table is hbase_table_1 and the HBase table is xyz; if the property is omitted, the Hive and HBase table names are the same.
The hbase.mapred.output.outputtable property is optional, but it is required when inserting data into the table; its value is passed on to hbase.mapreduce.TableOutputFormat.
About the mapping cf1:val in hbase.columns.mapping of the Hive table definition: after the table is created, HBase shows only cf1 and not val, because val lives at the row level while cf1 (the column family) is table-level metadata in HBase.
Hands-on steps:
hive:
CREATE TABLE hbase_table_1(key int, value string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
TBLPROPERTIES ("hbase.table.name" = "xyz", "hbase.mapred.output.outputtable" = "xyz");
hbase:
list
desc 'xyz'
In Hive:
insert into hbase_table_1 values(1,'zhangsan');
In HBase:
scan 'xyz'
Creating an external table requires that a corresponding table already exist in HBase.
In HBase:
create 'tb_user', 'info'
In Hive:
create external table hive_tb_user1 (
key int,
name string,
age int,
sex string,
likes array<string>
)
row format
delimited
collection items terminated by '-'
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping"=":key,info:name,info:age,info:sex,info:likes")
tblproperties("hbase.table.name"="tb_user", "hbase.mapred.output.outputtable"="tb_user");
from tb_test
insert into table hive_tb_user1
select 1,'zhangsan',25,'female',array('climbing','reading','shopping') limit 1;
In HBase:
scan 'tb_user'
put 'tb_user', 1, 'info:likes', 'like1-like2-like3-like4'
Hive and HBase
For the integration, the following must be added to hive-site.xml on the Hive server side:
<property>
<name>hbase.zookeeper.quorum</name>
<value>node2,node3,node4</value>
</property>
hive --service metastore
On the client, simply start hive: hive
1. Creating a Hive internal (managed) table requires that no corresponding table exist in HBase yet.
2. Creating a Hive external table requires that a corresponding table already exist in HBase.
3. The column mapping is declared with
a) WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:id,cf:username,cf:age")
4. STORED BY names the class that handles storage for the Hive table: it writes the data into HBase storage, and when Hive reads data it handles the correspondence between the HBase data and Hive.
a) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
5. TBLPROPERTIES specifies which HBase table the Hive table maps to; hbase.mapred.output.outputtable determines which HBase table the data is written to when Hive inserts.
TBLPROPERTIES ("hbase.table.name" = "my_table", "hbase.mapred.output.outputtable" = "my_table");
6. If the HBase table name and the Hive table name are the same, TBLPROPERTIES can be omitted.
2. Sqoop: introduction, installation and data import
Sqoop: a tool for moving data between relational databases (Oracle, MySQL, PostgreSQL, etc.) and Hadoop.
Website: http://sqoop.apache.org/
Versions (the two versions are completely incompatible; sqoop1 is the most widely used):
sqoop1: 1.4.x
sqoop2: 1.99.x
Similar products:
DataX: Alibaba's data-exchange tool
The sqoop architecture is very simple; it is the simplest framework in the Hadoop ecosystem.
sqoop1 connects to Hadoop directly from the client; each task is parsed into a corresponding MapReduce job and executed.
In that MR job, InputFormat and OutputFormat configure the input and the output.
2.1 Sqoop import
2.2 Sqoop export
2.3 Sqoop installation and testing
Unpack the tarball
Configure the environment variables
SQOOP_HOME
PATH
Add the database driver jar
Configure sqoop-env.sh (no changes needed)
Comment out lines 134-147 of bin/configure-sqoop to silence the unnecessary warnings.
Test
sqoop version
sqoop list-databases --connect jdbc:mysql://node4:3306/ --username root --password 123456
sqoop help
sqoop help command
Run directly on the command line:
sqoop list-databases --connect jdbc:mysql://node1:3306 --username hive --password hive123
Putting the sqoop command into a file:
sqoop1.txt
######################
list-databases
--connect
jdbc:mysql://node4:3306
--username
hive
--password
hive123
######################
Run on the command line:
sqoop --options-file sqoop1.txt
[root@node4 sqoop-1.4.6]# sqoop help list-databases
usage: sqoop list-databases [GENERIC-ARGS] [TOOL-ARGS]
Common arguments:
--connect <jdbc-uri> Specify JDBC connect
string
--connection-manager <class-name> Specify connection manager
class name
--connection-param-file <properties-file> Specify connection
parameters file
--driver <class-name> Manually specify JDBC
driver class to use
--hadoop-home <hdir> Override
$HADOOP_MAPRED_HOME_ARG
--hadoop-mapred-home <dir> Override
$HADOOP_MAPRED_HOME_ARG
--help Print usage instructions
-P Read password from console
--password <password> Set authentication
password
--password-alias <password-alias> Credential provider
password alias
--password-file <password-file> Set authentication
password file path
--relaxed-isolation Use read-uncommitted
isolation for imports
--skip-dist-cache Skip copying jars to
distributed cache
--username <username> Set authentication
username
--verbose Print more information
while working
Generic Hadoop command-line arguments:
(must preceed any tool-specific arguments)
Generic options supported are
-conf <configuration file> specify an application configuration file
-D <property=value> use value for given property
-fs <local|namenode:port> specify a namenode
-jt <local|resourcemanager:port> specify a ResourceManager
-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
To export from Hive to MySQL, sqoop must be installed on the Hive host (for example, wherever the Hive client runs).
$CONDITIONS: the placeholder sqoop requires in the WHERE clause of a free-form query (-e/--query); sqoop substitutes its split conditions for it at run time.
[root@server3 ~]# sqoop help import
usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]
Common arguments:
--connect <jdbc-uri> Specify JDBC connect
string
--connection-manager <class-name> Specify connection manager
class name
--connection-param-file <properties-file> Specify connection
parameters file
--driver <class-name> Manually specify JDBC
driver class to use
--hadoop-home <hdir> Override
$HADOOP_MAPRED_HOME_ARG
--hadoop-mapred-home <dir> Override
$HADOOP_MAPRED_HOME_ARG
--help Print usage instructions
-P Read password from console
--password <password> Set authentication
password
--password-alias <password-alias> Credential provider
password alias
--password-file <password-file> Set authentication
password file path
--relaxed-isolation Use read-uncommitted
isolation for imports
--skip-dist-cache Skip copying jars to
distributed cache
--username <username> Set authentication
username
--verbose Print more information
while working
Import control arguments:
--append Imports data
in append
mode
--as-avrodatafile Imports data
to Avro data
files
--as-parquetfile Imports data
to Parquet
files
--as-sequencefile Imports data
to
SequenceFile
s
--as-textfile Imports data
as plain
text
(default)
--autoreset-to-one-mapper Reset the
number of
mappers to
one mapper
if no split
key
available
--boundary-query <statement> Set boundary
query for
retrieving
max and min
value of the
primary key
--columns <col,col,col...>                               Specify which columns of the database table to import
--compression-codec <codec> Compression
codec to use
for import
--delete-target-dir Imports data in delete mode
--direct Use direct
import fast
path
--direct-split-size <n> Split the
input stream
every 'n'
bytes when
importing in
direct mode
-e,--query <statement> Import
results of
SQL
'statement'
--fetch-size <n> Set number
'n' of rows
to fetch
from the
database
when more
rows are
needed
--inline-lob-limit <n> Set the
maximum size
for an
inline LOB
-m,--num-mappers <n> Use 'n' map
tasks to
import in
parallel
--mapreduce-job-name <name> Set name for
generated
mapreduce
job
--merge-key <column> Key column
to use to
join results
--split-by <column-name> Column of
the table
used to
split work
units
--table <table-name> Table to
read
--target-dir <dir> HDFS plain
table
destination
--validate Validate the
copy using
the
configured
validator
--validation-failurehandler <validation-failurehandler> Fully
qualified
class name
for
ValidationFa
ilureHandler
--validation-threshold <validation-threshold> Fully
qualified
class name
for
ValidationTh
reshold
--validator <validator> Fully
qualified
class name
for the
Validator
--warehouse-dir <dir> HDFS parent
for table
destination
--where <where clause> WHERE clause
to use
during
import
-z,--compress Enable
compression
Incremental import arguments:
--check-column <column> Source column to check for incremental
change
--incremental <import-type> Define an incremental import of type
'append' or 'lastmodified'
--last-value <value> Last imported value in the incremental
check column
Output line formatting arguments:
--enclosed-by <char> Sets a required field enclosing
character
--escaped-by <char> Sets the escape character
--fields-terminated-by <char> Sets the field separator character
--lines-terminated-by <char> Sets the end-of-line character
--mysql-delimiters Uses MySQL's default delimiter set:
fields: , lines: \n escaped-by: \
optionally-enclosed-by: '
--optionally-enclosed-by <char> Sets a field enclosing character
Input parsing arguments:
--input-enclosed-by <char> Sets a required field encloser
--input-escaped-by <char> Sets the input escape
character
--input-fields-terminated-by <char> Sets the input field separator
--input-lines-terminated-by <char> Sets the input end-of-line
char
--input-optionally-enclosed-by <char> Sets a field enclosing
character
Code generation arguments:
--bindir <dir> Output directory for compiled
objects
--class-name <name> Sets the generated class name.
This overrides --package-name.
When combined with --jar-file,
sets the input class.
--input-null-non-string <null-str> Input null non-string
representation
--input-null-string <null-str> Input null string representation
--jar-file <file> Disable code generation; use
specified jar
--map-column-java <arg> Override mapping for specific
columns to java types
--null-non-string <null-str> Null non-string representation
--null-string <null-str> Null string representation
--outdir <dir> Output directory for generated
code
--package-name <name> Put auto-generated classes in
this package
Generic Hadoop command-line arguments:
(must preceed any tool-specific arguments)
Generic options supported are
-conf <configuration file> specify an application configuration file
-D <property=value> use value for given property
-fs <local|namenode:port> specify a namenode
-jt <local|resourcemanager:port> specify a ResourceManager
-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
At minimum, you must specify --connect and --table
Arguments to mysqldump and other subprograms may be supplied
after a '--' on the command line.
Import from the command line:
Importing data from MySQL into HDFS:
sqoop import --connect jdbc:mysql://node4/log_results --username hivehive --password hive --as-textfile --table dimension_browser --columns id,browser_name,browser_version --target-dir /sqoop/test1 --delete-target-dir -m 1
Write the same statement into a file and run it:
sqoop2.txt
import
--connect
jdbc:mysql://node4/log_results
--username
hivehive
--password
hive
--as-textfile
--table
dimension_browser
--columns
id,browser_name,browser_version
--target-dir
/sqoop/test1
--delete-target-dir
-m
1
Command line:
sqoop --options-file sqoop2.txt
You can also supply a SQL query for the import:
sqoop3.txt
import
--connect
jdbc:mysql://node4/log_results
--username
hivehive
--password
hive
--as-textfile
#--query is the same as -e
-e
select id, browser_name, browser_version from dimension_browser where $CONDITIONS
--target-dir
/sqoop/test2
--delete-target-dir
-m
1
Command line:
sqoop --options-file sqoop3.txt
Specifying the field delimiter of the exported files:
sqoop4.txt
import
--connect
jdbc:mysql://node1/log_results
--username
hive
--password
hive123
--as-textfile
-e
select id, browser_name, browser_version from dimension_browser where $CONDITIONS
--target-dir
/sqoop/test2-1
--delete-target-dir
-m
1
--fields-terminated-by
\t
Command line:
sqoop --options-file sqoop4.txt
Importing into HDFS and creating the table in Hive. The default field delimiter is the comma, so the comma does not have to be specified explicitly.
sqoop5.txt
import
--connect
jdbc:mysql://node1/log_results
--username
hivehive
--password
hive
--as-textfile
#--query is the same as -e
-e
select id, browser_name, browser_version from dimension_browser where $CONDITIONS
--hive-import
--create-hive-table
--hive-table
hive_browser_dim
--target-dir
/my/tmp
-m
1
--fields-terminated-by
,
Command line:
sqoop --options-file sqoop5.txt
Export:
hdfs://mycluster/sqoop/data/mydata.txt
1,zhangsan,hello world
2,lisi,are you ok
3,wangwu,fine thanks
4,zhaoliu,what are you doing
5,qunqi,just say hello
sqoop6.txt
export
--connect
jdbc:mysql://node1/log_results
--username
hivehive
--password
hive
--columns
id,myname,myversion
--export-dir
/user/hive/warehouse/hive_browser_dim/
-m
1
--table
mybrowserinfo
--input-fields-terminated-by
,
sqoop6-1.txt
Since the file uses the comma as its field delimiter by default, and sqoop export also splits on the comma by default, the delimiter can be omitted here:
export
--connect
jdbc:mysql://node1/log_results
--username
hivehive
--password
hive
--columns
id,myname,myversion
--export-dir
/user/hive/warehouse/hive_browser_dim/
-m
1
--table
mybrowserinfo1
Command line:
sqoop --options-file sqoop6-1.txt
Run on the command line:
sqoop --options-file sqoop6.txt
With commas, the delimiter does not need to be specified.
Hive's default delimiter, \001, has to be specified explicitly in the sqoop options file:
sqoop11.txt
export
--connect
jdbc:mysql://node1/log_results
--username
hive
--password
hive123
--columns
id,name,msg
--export-dir
/user/hive/warehouse/tb_log2
-m
1
--table
tb_loglog
--input-fields-terminated-by
\001
SQL analysis of user browse depth
Four ways to pivot rows into columns:
join
union
DECODE (Oracle)
case when
Requirement:
Group users by how many pages they visited and count how many users fall into each group.
From the per-user (visitor) point of view.
The stats_view_depth table in MySQL.
1. Create the Hive table mapped to the HBase eventlog table
CREATE EXTERNAL TABLE event_logs(
key string, pl string, en string, s_time bigint, p_url string, u_ud string, u_sd string
) ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':key,log:pl,log:en,log:s_time,log:p_url,log:u_ud,log:u_sd')
tblproperties('hbase.table.name'='eventlog');
2. Create the Hive counterpart of the MySQL table
This Hive table stores the results of the HQL analysis; the data is then exported to MySQL with sqoop.
CREATE TABLE `stats_view_depth` (
`platform_dimension_id` bigint ,
`data_dimension_id` bigint ,
`kpi_dimension_id` bigint ,
`pv1` bigint ,
`pv2` bigint ,
`pv3` bigint ,
`pv4` bigint ,
`pv5_10` bigint ,
`pv10_30` bigint ,
`pv30_60` bigint ,
`pv60_plus` bigint ,
`created` string
) row format delimited fields terminated by '\t';
3. Create a temporary table in Hive
The intermediate results of the HQL analysis are stored in this temporary table.
CREATE TABLE `stats_view_depth_tmp`(`pl` string, `date` string, `col` string, `ct` bigint);
pl: platform
date: date
col: column; its values correspond to the columns of the MySQL table: pv1, pv2, pv4, ...
ct: the count for that column value
col corresponds to the pv-prefixed columns in MySQL.
4. Write the UDFs
for the two dimensions (platform dimension & date dimension)
package com.sxt.transformer.hive;
import com.sxt.transformer.model.dim.base.PlatformDimension;
import com.sxt.transformer.service.IDimensionConverter;
import com.sxt.transformer.service.impl.DimensionConverterImpl;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
 * UDF for the platform dimension.
 *
 * @author root
 */
public class PlatformDimensionUDF extends UDF {
    private IDimensionConverter converter = new DimensionConverterImpl();

    /**
     * Returns the dimension id for the given platform name.
     *
     * @param platform
     * @return
     */
    public IntWritable evaluate(Text platform) {
        PlatformDimension dimension = new PlatformDimension(platform.toString());
        try {
            int id = this.converter.getDimensionIdByValue(dimension);
            return new IntWritable(id);
        } catch (IOException e) {
            throw new RuntimeException("failed to look up the dimension id", e);
        }
    }
}
package com.sxt.transformer.hive;
import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import com.sxt.common.DateEnum;
import com.sxt.transformer.model.dim.base.DateDimension;
import com.sxt.transformer.service.IDimensionConverter;
import com.sxt.transformer.service.impl.DimensionConverterImpl;
import com.sxt.util.TimeUtil;
/**
 * UDF for the date dimension.
 *
 * @author root
 */
public class DateDimensionUDF extends UDF {
    private IDimensionConverter converter = new DimensionConverterImpl();

    /**
     * Returns the dimension id for the given date (format: yyyy-MM-dd).
     *
     * @param day
     * @return
     */
    public IntWritable evaluate(Text day) {
        DateDimension dimension = DateDimension.buildDate(TimeUtil.parseString2Long(day.toString()), DateEnum.DAY);
        try {
            int id = this.converter.getDimensionIdByValue(dimension);
            System.out.println(day.toString());   // debug output
            System.out.println(id);               // debug output
            return new IntWritable(id);
        } catch (IOException e) {
            throw new RuntimeException("failed to look up the dimension id for " + day.toString(), e);
        }
    }
}
5. Upload
Package the code as hivejar.jar and upload it to the /sxt/transformer directory on HDFS.
6. Create the Hive functions
create function platformFunc as 'com.sxt.transformer.hive.PlatformDimensionUDF' using jar 'hdfs://mycluster/sxt/transformer/hivejar.jar';
create function dateFunc as 'com.sxt.transformer.hive.DateDimensionUDF' using jar 'hdfs://mycluster/sxt/transformer/hivejar.jar';
7. Write the HQL (browse depth from the user's point of view) <note: the time range is supplied externally>
from (
select
pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd') as day, u_ud,
(case when count(p_url) = 1 then "pv1"
when count(p_url) = 2 then "pv2"
when count(p_url) = 3 then "pv3"
when count(p_url) = 4 then "pv4"
when count(p_url) >= 5 and count(p_url) <10 then "pv5_10"
when count(p_url) >= 10 and count(p_url) <30 then "pv10_30"
when count(p_url) >=30 and count(p_url) <60 then "pv30_60"
else 'pv60_plus' end) as pv
from event_logs
where
en='e_pv'
and p_url is not null
and pl is not null
and s_time >= unix_timestamp('2017-08-23','yyyy-MM-dd')*1000
and s_time < unix_timestamp('2017-08-24','yyyy-MM-dd')*1000
group by
pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd'), u_ud
) as tmp
insert overwrite table stats_view_depth_tmp
select pl,day,pv,count(distinct u_ud) as ct where u_ud is not null group by pl,day,pv;
How do we know that a visitor falls into pv10?
Aggregation:
Query the data from the HBase table, aggregate by u_ud, count the number of pv events,
and use CASE WHEN on that count to conclude that this visitor belongs to pv10.
89155407 pv3
62439313 pv5_10
41469129 pv10_30
37005838 pv30_60
08257218 pv3
This gives, for every visitor, which pv bucket they belong to.
Then aggregate all the pv10 rows and count the u_ud values to get how many users are in pv10.
`pl` string, `date` string, `col` string, `ct` bigint
website 2019-11-18 pv10 300
website 2019-11-18 pv10 400
website 2019-11-18 pv10 500
website 2019-11-18 pv10 300
website 2019-11-18 pv5_10 20
website 2019-11-18 pv10_30 40
website 2019-11-18 pv30_60 10
website 2019-11-18 pv60_plus 120
In total: how many users are there in each pv bucket?
How many users are in pv1?
Aggregation.
Pivot rows into columns -> result
- convert the multiple rows of the temporary table into a single row
Row-to-column pivot
std | prj | score |
---|---|---|
S1 | M | 100 |
S1 | E | 98 |
S1 | Z | 80 |
S2 | M | 87 |
S2 | E | 88 |
S2 | Z | 89 |
std | M | E | Z |
---|---|---|---|
S1 | 100 | 98 | 80 |
S2 | 87 | 88 | 89 |
select std, score from my_score where prj='M';
select std, score from my_score where prj='E';
select std, score from my_score where prj='Z';
-- shorthand: t1, t2 and t3 stand for the three single-subject queries above; the full form follows
select t1.std, t1.score, t2.score, t3.score from t1 join t2 on t1.std=t2.std
join t3 on t1.std=t3.std;
SELECT t1.std, t1.score, t2.score, t3.score
from
(select std, score from my_score where prj='M') t1
join
(select std, score from my_score where prj='E') t2
on t1.std=t2.std
join (select std, score from my_score where prj='Z') t3
on t1.std=t3.std;
Using UNION ALL:
select tmp.std, sum(tmp.M), sum(tmp.E), sum(tmp.Z) from (
select std, score as 'M', 0 as 'E', 0 as 'Z' from tb_score where prj='M' UNION ALL
select std, 0 as 'M', score as 'E', 0 as 'Z' from tb_score where prj='E' UNION ALL
select std, 0 as 'M', 0 as 'E', score as 'Z' from tb_score where prj='Z'
) tmp group by tmp.std;
with tmp as
(
select pl,`date` as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select pl,`date` as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select pl,`date` as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all
select 'all' as pl,`date` as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select 'all' as pl,`date` as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus'
)
from tmp
insert overwrite table stats_view_depth
select 2,3,6,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),'2017-01-10' group by pl,date1;
Write UDFs to supply the values that replace 2, 3 and 6; the literals 2, 3, 6 above are placeholder values.
with tmp as
(
select pl,`date` as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select pl,`date` as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select pl,`date` as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all
select 'all' as pl,`date` as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select 'all' as pl,`date` as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select 'all' as pl,`date` as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus'
)
from tmp
insert overwrite table stats_view_depth
select platformFunc(pl),dateFunc(date1), 6 ,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),date1 group by pl,date1;
8. Write the sqoop script (user point of view)
sqoop --options-file sqoop_1
Contents of the file sqoop_1:
export
--connect
jdbc:mysql://node3:3306/log_results
--username
loguser
--password
log123
--table
stats_view_depth
--export-dir
/user/hive/warehouse/stats_view_depth/*
--input-fields-terminated-by
"\t"
--update-mode
allowinsert
--update-key
platform_dimension_id,data_dimension_id,kpi_dimension_id
Browse depth from the session point of view
9. Write the HQL (browse depth from the session point of view) <note: the time range is supplied externally>
from (
select pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd') as day, u_sd,
(case when count(p_url) = 1 then "pv1"
when count(p_url) = 2 then "pv2"
when count(p_url) = 3 then "pv3"
when count(p_url) = 4 then "pv4"
when count(p_url) >= 5 and count(p_url) <10 then "pv5_10"
when count(p_url) >= 10 and count(p_url) <30 then "pv10_30"
when count(p_url) >=30 and count(p_url) <60 then "pv30_60"
else 'pv60_plus' end) as pv
from event_logs
where en='e_pv' and p_url is not null and pl is not null and s_time >= unix_timestamp('2016-06-10','yyyy-MM-dd')*1000 and s_time < unix_timestamp('2016-06-11','yyyy-MM-dd')*1000
group by pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd'), u_sd
) as tmp
insert overwrite table stats_view_depth_tmp
select pl,day,pv,count(distinct u_sd) as ct where u_sd is not null group by pl,day,pv;
with tmp as
(
select pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all
select 'all' as pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select 'all' as pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select 'all' as pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus'
)
from tmp
insert overwrite table stats_view_depth select 3,4,6,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),'2016-06-10' group by pl,date;
10. Write the sqoop script (session point of view)
sqoop --options-file sqoop_2
Contents of the file sqoop_2:
export
--connect
jdbc:mysql://hh:3306/report
--username
hive
--password
hive
--table
stats_view_depth
--export-dir
/hive/bigdater.db/stats_view_depth/*
--input-fields-terminated-by
"\\01"
--update-mode
allowinsert
--update-key
platform_dimension_id,data_dimension_id,kpi_dimension_id
Writing the shell script
view_depth_run.sh
#!/bin/bash
startDate=''
endDate=''
until [ $# -eq 0 ]
do
if [ $1'x' = '-sdx' ]; then
shift
startDate=$1
elif [ $1'x' = '-edx' ]; then
shift
endDate=$1
fi
shift
done
if [ -n "$startDate" ] && [ -n "$endDate" ]; then
echo "use the arguments of the date"
else
echo "use the default date"
startDate=$(date -d last-day +%Y-%m-%d)
endDate=$(date +%Y-%m-%d)
fi
echo "run of arguments. start date is:$startDate, end date is:$endDate"
echo "start run of view depth job "
## insert overwrite
echo "start insert user data to hive tmp table"
hive -e "from (select pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd') as day, u_ud, (case when count(p_url) = 1 then 'pv1' when count(p_url) = 2 then 'pv2' when count(p_url) = 3 then 'pv3' when count(p_url) = 4 then 'pv4' when count(p_url) >= 5 and count(p_url) <10 then 'pv5_10' when count(p_url) >= 10 and count(p_url) <30 then 'pv10_30' when count(p_url) >=30 and count(p_url) <60 then 'pv30_60' else 'pv60_plus' end) as pv from event_logs where en='e_pv' and p_url is not null and pl is not null and s_time >= unix_timestamp('$startDate','yyyy-MM-dd')*1000 and s_time < unix_timestamp('$endDate','yyyy-MM-dd')*1000 group by pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd'), u_ud) as tmp insert overwrite table stats_view_depth_tmp select pl,day,pv,count(distinct u_ud) as ct where u_ud is not null group by pl,day,pv"
echo "start insert user data to hive table"
hive -e "with tmp as (select pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all select pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all select pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all select 'all' as pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all select 'all' as pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all select 'all' as pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' ) from tmp insert overwrite table stats_view_depth select platform_convert(pl),date_convert(date),5,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),date group by pl,date"
echo "start insert session data to hive tmp table"
hive -e "from (select pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd') as day, u_sd, (case when count(p_url) = 1 then 'pv1' when count(p_url) = 2 then 'pv2' when count(p_url) = 3 then 'pv3' when count(p_url) = 4 then 'pv4' when count(p_url) >= 5 and count(p_url) <10 then 'pv5_10' when count(p_url) >= 10 and count(p_url) <30 then 'pv10_30' when count(p_url) >=30 and count(p_url) <60 then 'pv30_60' else 'pv60_plus' end) as pv from event_logs where en='e_pv' and p_url is not null and pl is not null and s_time >= unix_timestamp('$startDate','yyyy-MM-dd')*1000 and s_time < unix_timestamp('$endDate','yyyy-MM-dd')*1000 group by pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd'), u_sd ) as tmp insert overwrite table stats_view_depth_tmp select pl,day,pv,count(distinct u_sd) as ct where u_sd is not null group by pl,day,pv"
## insert into
echo "start insert session data to hive table"
hive --database bigdater -e "with tmp as (select pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all select pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all select pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all select pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all select 'all' as pl,date,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all select 'all' as pl,date,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all select 'all' as pl,date,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all select 'all' as pl,date,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' ) from tmp insert into table stats_view_depth select platform_convert(pl),date_convert(date),6,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),'2015-12-13' group by pl,date"
## sqoop
echo "run the sqoop script,insert hive data to mysql table"
sqoop export --connect jdbc:mysql://hh:3306/report --username hive --password hive --table stats_view_depth --export-dir /hive/bigdater.db/stats_view_depth/* --input-fields-terminated-by "\\01" --update-mode allowinsert --update-key platform_dimension_id,data_dimension_id,kpi_dimension_id
echo "complete run the view depth job"
Project optimization
I. Purpose of tuning
Make full use of the machines' capacity so that MR jobs finish faster, and, even on a limited number of machines, support running as many MR jobs as possible.
II. Tuning overview
From the internal execution model of an MR job we know that it consists of a mapper phase and a reducer phase. The mapper phase includes reading the data, the map processing, and the write-out step (sort & merge); the reducer phase includes fetching the mapper output, merging it (sort & merge), the reduce processing, and the write-out step. Among these seven sub-phases, the ones that offer the most tuning leverage are the map output, the reducer-side merge, and the number of reducers. In other words, although performance tuning in general covers CPU, memory, disk I/O and network, for an MR job the main targets are memory, disk I/O and network, and the guiding principle is to reduce network transfer and disk I/O. The tuning discussed here therefore covers five areas: server tuning, code tuning, mapper tuning, reducer tuning and runner tuning.
III. Server tuning
Server tuning covers server parameter tuning and JVM tuning. Because this project uses HBase as the storage for the raw data being analyzed, HBase needs some tuning as well. Besides the parameter tuning, some JVM tuning is needed, just as for any other Java program.
HDFS tuning
1. dfs.datanode.failed.volumes.tolerated: the number of volumes that may fail before the datanode goes down; default 0, meaning no disk failure is tolerated. When several disks are mounted, this value can be raised.
2. dfs.replication: replication factor, default 3.
3. dfs.namenode.handler.count: number of concurrent handler threads on the namenode, default 10.
4. dfs.datanode.handler.count: number of concurrent handler threads between datanodes, default 10.
5. dfs.datanode.max.transfer.threads: number of concurrent threads the datanode uses for data streaming, default 4096.
Generally set it to 85%-90% of the Linux open-file limit. Check the limit with ulimit -a and change it with vim /etc/security/limits.conf; do not set it too large.
At the end of that file add:
* soft nofile 65535
* hard nofile 65535
Note: the file-handle limit must not be too large; any value below 1,000,000 is acceptable, and -1 is generally not used.
Troubleshooting: if the handle limit is set very high, the next login may show an "unable load session" message; in that case fix the file from single-user mode.
Single-user mode:
At boot press 'a' to enter the selection screen, press 'e' to edit, select the second line starting with 'kernel...', press 'e' again, append a space followed by single at the end, press Enter to go back, then press 'b' to boot into single-user mode. Once it boots, restore the file, save, exit and reboot.
6. io.file.buffer.size: buffer size for reading/writing data, default 4096; usually left unchanged. If it is changed, use an integer multiple of 4096 (a multiple of the physical page size).
HBase tuning
1. Set the regionserver heap size; the default is 1 GB, 4 GB is recommended.
Change HBASE_HEAPSIZE=4g in conf/hbase-env.sh.
2. hbase.regionserver.handler.count: number of client-facing handler threads, default 10. Rule of thumb: use a smaller value when puts and scans dominate and a larger value when gets and deletes dominate, to avoid memory problems caused by frequent GC.
3. Control HBase splitting and compaction yourself. By default a region splits automatically when it reaches hbase.hregion.max.filesize (10 GB); 20 to 500 regions per regionserver is considered ideal. Compaction is one of HBase's most important maintenance mechanisms, but it is very memory- and CPU-intensive, so on heavily loaded clusters the automatic trigger is usually disabled and compaction is run manually.
4. hbase.balancer.period: the load-balancer period, default 300000 (5 minutes); on heavily loaded clusters it can be increased.
5. hfile.block.cache.size: fraction of the heap used for the HFile block cache, default 0.4. Increase it somewhat on read-heavy systems and decrease it on write-heavy systems, but do not set it to 0.
6. hbase.regionserver.global.memstore.upperLimit: upper bound on the fraction of the heap used by memstores, default 0.4; when it is reached, a flush writes the contents to disk.
7. hbase.regionserver.global.memstore.lowerLimit: lower bound, default 0.38; after a flush, the memstore memory usage must not exceed this fraction.
8. hbase.hregion.memstore.flush.size: a memstore larger than this value is flushed; default 134217728 (128 MB).
9. hbase.hregion.memstore.block.multiplier: memstore blocking multiplier, default 4; when a memstore exceeds 4 * hbase.hregion.memstore.flush.size, writes are blocked, which can eventually lead to an OOM error.
MapReduce tuning
1. mapreduce.task.io.sort.factor: number of files merged at once during the merge sort, default 10.
2. mapreduce.task.io.sort.mb: memory used for the merge sort and the map output buffer, default 100 MB.
3. mapreduce.map.sort.spill.percent: threshold at which the map output buffer spills to disk, default 0.80.
4. mapreduce.reduce.shuffle.parallelcopies: number of threads the reducer uses to copy map output, default 5.
5. mapreduce.reduce.shuffle.input.buffer.percent: fraction of the reduce heap used to buffer copied map output, default 0.70; increasing it somewhat reduces disk spills of map data and can improve performance.
6. mapreduce.reduce.shuffle.merge.percent: threshold at which the reduce-side merge and spill to disk starts during shuffle, default 0.66; if resources allow, raising it reduces the number of spills and improves performance. Used together with mapreduce.reduce.shuffle.input.buffer.percent.
7. mapreduce.task.timeout: how long a task may go without reporting progress before it is considered failed, default 600000 (10 minutes); 0 disables the check.
IV. Code tuning
Code tuning mainly means hoisting objects that would otherwise be created over and over inside the mapper and reducer methods; it is the same kind of code tuning as for any ordinary Java program.
V. Mapper tuning
Mapper tuning has a single goal: reduce the amount of map output. This is done by adding a combine phase and by compressing the map output.
About the combiner:
A custom combiner extends the Reducer class. Its characteristics:
It takes the map output key/value pair as both its input and its output type; its purpose is to reduce network traffic by pre-merging part of the data on the map node.
It is best suited to numeric map output that is easy to aggregate.
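A minimal combiner sketch under those constraints (Text keys and LongWritable counts assumed), registered in the driver with job.setCombinerClass:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();                            // partial sum on the map node
        }
        context.write(key, new LongWritable(sum));     // same key/value types as the map output
    }
}

// in the driver: job.setCombinerClass(SumCombiner.class);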
Compression settings:
When submitting the job, enable compression and specify the codec.
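A small driver-side sketch of those two settings, assuming the Snappy native libraries are available on the cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class CompressionConfig {
    public static void enableMapOutputCompression(Job job) {
        Configuration conf = job.getConfiguration();
        conf.setBoolean("mapreduce.map.output.compress", true);        // turn map output compression on
        conf.setClass("mapreduce.map.output.compress.codec",           // choose the codec
                SnappyCodec.class, CompressionCodec.class);
    }
}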
VI. Reducer tuning
Reducer tuning is done through parameter tuning and by choosing the number of reducers.
Tuning the number of reducers:
Requirement: the result with one reducer must be identical to the result with several reducers; adding reducers must not change the output.
Rule of thumb: for an MR job on a Hadoop cluster, once the map phase reaches 100%, the reducers should reach roughly 33% as early as possible; check with hadoop job -status job_id or on the web UI.
Reason: the number of map processes is determined by the RecordReaders returned by the InputFormat, whereas the reducer side has three parts: reading the mapper output, merging all the output, and the reduce processing itself. The first part depends on the map phase, so when the data volume is large and a single reducer cannot meet the performance requirement, increasing the number of reducers solves the problem.
Advantage: makes full use of the cluster.
Disadvantage: some MR jobs cannot benefit from multiple reducers, for example top-N jobs.
VII. Runner tuning
Runner tuning simply means setting job parameters when the job is submitted; they can usually be set either in code or in an XML file.
Items 1-8 are illustrated in ActiveUserRunner (the before and configure methods), item 9 in TransformerBaseRunner (the initScans method).
1. mapred.child.java.opts: JVM options for the child (YARN container) processes, effective for both map and reduce; default: -Xmx200m.
2. mapreduce.map.java.opts: JVM options for the map-phase child process; empty by default, in which case mapred.child.java.opts is used.
3. mapreduce.reduce.java.opts: JVM options for the reduce-phase child process; empty by default, in which case mapred.child.java.opts is used.
4. mapreduce.job.reduces: number of reducers, default 1; can also be changed with job.setNumReduceTasks.
5. mapreduce.map.speculative: whether speculative execution is enabled for the map phase, default true; in practice false is usually better. Can be set with job.setMapSpeculativeExecution.
6. mapreduce.reduce.speculative: whether speculative execution is enabled for the reduce phase, default true; in practice false is usually better. Can be set with job.setReduceSpeculativeExecution.
7. mapreduce.map.output.compress: whether map output compression is enabled, default false; set it to true when network transfer needs to be reduced.
8. mapreduce.map.output.compress.codec: the map output compression codec, default org.apache.hadoop.io.compress.DefaultCodec; SnappyCodec is recommended (in earlier versions it had to be installed separately; see http://www.cnblogs.com/chengxin1982/p/3862309.html for the installation notes).
9. HBase parameter
By default HBase hands rows to the mapper one at a time, fetching the next row from HBase only after the previous one has been processed. Setting the scan cache lets a single RPC fetch several rows and reduces network traffic, as in the snippet below.
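For example (a sketch; the caching value is a per-RPC row count and should be tuned to row size and available memory):

import org.apache.hadoop.hbase.client.Scan;

public class ScanCacheConfig {
    public static Scan withCaching(Scan scan) {
        scan.setCaching(500);        // fetch 500 rows per RPC instead of one at a time
        scan.setCacheBlocks(false);  // MR scans are one-pass; do not pollute the block cache
        return scan;
    }
}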
Architecture design and project summary
Project workflow
Module details
How each module is computed
Problems encountered
How they were solved
Summary:
Write up the offline project for your resume