项目简介:
本项目主要用于互联网电商企业中,使用Spark技术开发的大数据统计分析平台,对电商网站的各种用户行为(访问行 为、购物行为、广告点击行为等)进行复杂的分析。用统计分析出来的数据,辅助公司中的PM(产品经理)、数据分析师以及管理人员分析现有产品的情况,并根据用户行为分析结果持续改进产品的设计,以及调整公司的战略和业务。最终达到用大数据技术来帮助提升公司的业绩、营业额以及市场占有率的目标。
项目主要采用目前大数据领域较为流行的Spark技术堆栈。采用了Spark技术生态栈中最常用的三个技术框架,Spark Core、Spark SQL和Spark Streaming,进行离线计算和实时计算业务模块的开发。实现了包括用户访问session分析、页面单跳转化率统计、热门商品离线统计、广告流量实时统计4个业务模块。
主要的功能和模块:
用户访问session分析:该模块主要是对用户访问session进行统计分析,包括session的聚合指标计算、按时间比例随机抽取session、获取每天点击、下单和购买排名前N的品类、并获取topN品类的点击量排名前N的session。该模块可以让产品经理、数据分析师以及企业管理层形象地看到各种条件下的具体用户行为以及统计指标,从而对公司的产品设计以及业务发展战略做出调整。主要使用Spark Core实现。
页面单跳转化率统计:该模块主要是计算关键页面之间的单步跳转转化率,涉及到页面切片算法以及页面流匹配算法。该模块可以让产品经理、数据分析师以及企业管理层看到各个关键页面之间的转化率,从而对网页布局,进行更好的优化设计。主要使用Spark Core实现。
热门商品离线统计:该模块主要实现每天统计出各个区域的top3热门商品。然后使用Oozie进行离线统计任务的定时调度;使用Zeppelin进行数据可视化的报表展示。该模块可以让企业管理层看到公司售卖的商品的整体情况,从而对公司的商品相关的战略进行调整。主要使用Spark SQL实现。
广告流量实时统计:该模块负责实时统计公司的广告流量,包括广告展现流量和广告点击流量。实现动态黑名单机制,以及黑名单过滤;实现滑动窗口内的各城市的广告展现流量和广告点击流量的统计;实现每个区域每个广告的点击流量实时统计;实现每个区域top3点击量的广告的统计。主要使用Spark Streaming实现。
架构图如下:
项目用到的算法
该项目用到的是:按照时间比例抽取session算法按照时间比例抽取session算法
正是因为各个时间段内的用户访问量的不同,那我们使用对应的数据做一个聚合统计分析,则显的不是很准确,举个例子,我们不能一个处于业务高峰期的数据流量去衡量处于业务低谷期的业务需求。
又因为我们要从全量session中抽取一部分的session,比如1000条的session,那么我不就不应该从业务量峰值中获取,也不能冲业务低谷值中获取,要想用抽取的这一部分数据去准去的衡量全量的数据,那么我们就应该每一个阶段,每一个部分都应该抽取相应比例的数据。
样本个数/容量:就是要抽取的事物的个数
样本:要抽取的事物组成的集体
总体:所有事物的整体
要从学校2000人中,抽取200个人,参加活动,要求每个方向都要有对人的学生,怎么才能做到想对公平的人员分配呢?
最简单的就是按照每个学科方向学员个数占总体的比重来进行抽取。
x就是应该抽取的人数(session数)
Session块的DAO层(数据访问层)
ISessionAggrStatDao接口:
import com.aura.bigdata.analysis.domain.session.SessionAggrStat;
import java.util.List;
public interface ISessionAggrStatDao {
void insert(SessionAggrStat sas);
void insertBatch(List<SessionAggrStat> list);
}
用来聚合统计用户在某一个页面上的停留时间,利用QueryRunner来实现查询
其中insert用于单条插入/更新,insertBatch用于List集合对象的插入更新。
public class SessionAggrStatDaoImpl implements ISessionAggrStatDao {
private QueryRunner qr = new QueryRunner(DBCPUtils.getDataSource());
private String sql = "INSERT INTO session_aggr_stat VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; //向表 session_aggr_stat 中插入数据
@Override
public void insert(SessionAggrStat sas) {
try { //同时抛出异常
qr.update(sql, sas.getTask_id(), sas.getSession_count(),
sas.getVisit_len_1s_3s_ratio(), sas.getVisit_len_4s_6s_ratio(),
sas.getVisit_len_7s_9s_ratio(), sas.getVisit_len_10s_30s_ratio(),
sas.getVisit_len_30s_60s_ratio(), sas.getVisit_len_1m_3m_ratio(),
sas.getVisit_len_3m_10m_ratio(), sas.getVisit_len_10m_30m_ratio(),
sas.getVisit_len_30m_ratio(), sas.getStep_len_1_3_ratio(),
sas.getStep_len_4_6_ratio(), sas.getStep_len_7_9_ratio(),
sas.getStep_len_10_30_ratio(), sas.getStep_len_30_60_ratio(),
sas.getStep_len_60_ratio());
} catch (SQLException e) {
e.printStackTrace();
}
}
@Override
public void insertBatch(List<SessionAggrStat> list) { //对象的个数保存在list里面,然后对list的对象进行操作
Object[][] params = new Object[list.size()][];
for (int i = 0; i < list.size(); i++) {
SessionAggrStat sas = list.get(i);
Object[] obj = {
sas.getTask_id(),sas.getSession_count(),
sas.getVisit_len_1s_3s_ratio(), sas.getVisit_len_4s_6s_ratio(),
sas.getVisit_len_7s_9s_ratio(), sas.getVisit_len_10s_30s_ratio(),
sas.getVisit_len_30s_60s_ratio(), sas.getVisit_len_1m_3m_ratio(),
sas.getVisit_len_3m_10m_ratio(), sas.getVisit_len_10m_30m_ratio(),
sas.getVisit_len_30m_ratio(), sas.getStep_len_1_3_ratio(),
sas.getStep_len_4_6_ratio(), sas.getStep_len_7_9_ratio(),
sas.getStep_len_10_30_ratio(), sas.getStep_len_30_60_ratio(),
sas.getStep_len_60_ratio()
};
params[i] = obj;
}
try {
qr.batch(sql, params);
} catch (SQLException e) {
e.printStackTrace();
}
}
}
其结果储存到表session_aggr_stat当中
CREATE TABLE IF NOT EXISTS `session_aggr_stat` (
`task_id` int(11) NOT NULL,
`session_count` int(11) DEFAULT NULL,
`visit_len_1s_3s_ratio` double DEFAULT NULL,
`visit_len_4s_6s_ratio` double DEFAULT NULL,
`visit_len_7s_9s_ratio` double DEFAULT NULL,
`visit_len_10s_30s_ratio` double DEFAULT NULL,
`visit_len_30s_60s_ratio` double DEFAULT NULL,
`visit_len_1m_3m_ratio` double DEFAULT NULL,
`visit_len_3m_10m_ratio` double DEFAULT NULL,
`visit_len_10m_30m_ratio` double DEFAULT NULL,
`visit_len_30m_ratio` double DEFAULT NULL,
`step_len_1_3_ratio` double DEFAULT NULL,
`step_len_4_6_ratio` double DEFAULT NULL,
`step_len_7_9_ratio` double DEFAULT NULL,
`step_len_10_30_ratio` double DEFAULT NULL,
`step_len_30_60_ratio` double DEFAULT NULL,
`step_len_60_ratio` double DEFAULT NULL,
PRIMARY KEY (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
类似的,还有
SessionDetailDaoImpl,用于更新和插入抽取session中的各项明细。
import com.aura.bigdata.analysis.dao.session.ISessionDetailDao;
import com.aura.bigdata.analysis.domain.session.SessionDetail;
import com.aura.bigdata.analysis.util.DBCPUtils;
import org.apache.commons.dbutils.QueryRunner;
import java.sql.SQLException;
import java.util.List;
public class SessionDetailDaoImpl implements ISessionDetailDao {
private QueryRunner qr = new QueryRunner(DBCPUtils.getDataSource());
String sql = "INSERT INTO session_detail VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
@Override
public void insert(SessionDetail entity) {
try {
qr.update(sql, entity.getTaskId(), entity.getUserId(), entity.getSessionId(),
entity.getPageId(), entity.getActionTime(), entity.getSearchKeyword(),
entity.getClickCategoryId(), entity.getClickProductId(), entity.getOrderCategoryIds(),
entity.getOrderProductIds(), entity.getPayCategoryIds(), entity.getPayProductIds());
} catch (SQLException e) {
e.printStackTrace();
}
}
@Override
public void insertBatch(List<SessionDetail> list) {
Object[][] params = new Object[list.size()][];
for (int i = 0; i < list.size(); i++) {
SessionDetail entity = list.get(i);
Object[] obj = {
entity.getTaskId(), entity.getUserId(), entity.getSessionId(),
entity.getPageId(), entity.getActionTime(), entity.getSearchKeyword(),
entity.getClickCategoryId(), entity.getClickProductId(), entity.getOrderCategoryIds(),
entity.getOrderProductIds(), entity.getPayCategoryIds(), entity.getPayProductIds()
};
params[i] = obj;
}
try {
qr.batch(sql, params);
} catch (SQLException e) {
e.printStackTrace();
}
}
}
结果储存表session_detail中
CREATE TABLE `session_detail` (
`task_id` int(11) NOT NULL,
`user_id` int(11) DEFAULT NULL,
`session_id` varchar(255) DEFAULT NULL,
`page_id` int(11) DEFAULT NULL,
`action_time` varchar(255) DEFAULT NULL,
`search_keyword` varchar(255) DEFAULT NULL,
`click_category_id` int(11) DEFAULT NULL,
`click_product_id` int(11) DEFAULT NULL,
`order_category_ids` varchar(255) DEFAULT NULL,
`order_product_ids` varchar(255) DEFAULT NULL,
`pay_category_ids` varchar(255) DEFAULT NULL,
`pay_product_ids` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SessionRandomExtractDaoImpl用于抽取我们按照时间比例随机获得的session
public class SessionRandomExtractDaoImpl implements ISessionRandomExtractDao {
private QueryRunner qr = new QueryRunner(DBCPUtils.getDataSource());
String sql = "INSERT INTO session_random_extract VALUES(?, ?, ?, ?, ?)";
@Override
public void insert(SessionRandomExtract entity) {
try {
qr.update(sql, entity.getTaskId(), entity.getSessionId(),
entity.getStartTime(), entity.getSearchKeywords(),
entity.getClickCategoryIds());
} catch (SQLException e) {
e.printStackTrace();
}
}
@Override
public void insertBatch(List<SessionRandomExtract> list) {
Object[][] params = new Object[list.size()][];
for (int i = 0; i < list.size(); i++) {
SessionRandomExtract entity = list.get(i);
Object[] obj = {
entity.getTaskId(), entity.getSessionId(),
entity.getStartTime(), entity.getSearchKeywords(),
entity.getClickCategoryIds()
};
params[i] = obj;
}
try {
qr.batch(sql, params);
} catch (SQLException e) {
e.printStackTrace();
}
}
}
session_random_extract表,存储我们的按时间比例随机抽取功能抽取出来的1000个session
CREATE TABLE `session_random_extract` (
`task_id` int(11) NOT NULL,
`session_id` varchar(255) DEFAULT NULL,
`start_time` varchar(50) DEFAULT NULL,
`search_keywords` varchar(255) DEFAULT NULL,
`click_category_ids` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
接下来的是scala编写的spark作业部分:
第一个业务模块,用户session聚合统计分析,主要基于SparkCore来完成
-
产品、运营、数据分析师、管理层人员,在页面根据需要,添加相关的查询参数,来启动对应的Spark作业,
- 以便获取对应的用户数据。
-
参数:
-
搜索关键字:比如用户搜索SPA
-
品类:
-
年龄
-
地域
-
性别
-
职业
-
.......
- 按条件筛选session
统计出符合条件的session中,访问时长在1s ~ 3s、4s ~ 6s、7s ~ 9s、10s ~ 30s、30s ~ 60s、1m ~ 3m、3m ~ 10m、10m ~ 30m、30m以上各个范围内的session占比;访问步长在1 ~ 3、4 ~ 6、7 ~ 9、10 ~ 30、30 ~ 60、60以上各个范围内的session占比
在符合条件的session中,按照时间比例随机抽取1000个session
在符合条件的session中,获取点击、下单和支付数量排名前10的品类
对于排名前10的品类,分别获取其点击次数排名前10的session
参数及数据加载部分:
object UserSessionAggrStatAnalysisApp {
def main(args: Array[String]): Unit = {
//在生成和测试中通过外置调用类传递task id
//判断传入的参数是否合法并提供报错
if(args == null || args.length < 1) {
System.err.println(
"""Parameter Errors:Usage: <taskId>
| taskId: task's id
""".stripMargin)
System.exit(-1)
// }
Logger.getLogger("org.apache.spark").setLevel(Level.WARN) //获得运行过程中生成的日志,并设置日志的级别
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
//1、spark 信息
val conf = new SparkConf()
.setMaster("local")
.setAppName(s"${UserSessionAggrStatAnalysisApp.getClass.getSimpleName}")
SparkUtils.setMaster(conf, ConfigurationManager.dMode)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[CategorySort]))
//设置其他相关的参数 比如kryo
val sc = new SparkContext(conf)
val sqlContext = SparkUtils.getSQLContext(sc, ConfigurationManager.dMode, isHive = false)
//2、task id读取 task作业
val taskId = ConfigurationManager.getIntProperty(Constants.SPARK_JOB_SESSION_TASK_ID)
val taskDao:ITaskDao = new TaskDaoImpl // 父类引用指向子类
val task = taskDao.getTaskById(taskId)
if(task == null) {
System.err.println(s"taskId:${taskId}找不到,请确认参数是否正确") //如果找不到taskID则报错
System.exit(-1)
}
val params = JSON.parseObject(task.getTask_param) //利用jason封装task对象
//3、加载数据
MockData.mock(sc, sqlContext) //这里是模拟数据
//4、按条件筛选session,并以rdd形式储存
val actionsRDD:RDD[Row] = getRangeSession(params, sqlContext)
/*
5、因为首先要获取满足条件的session信息(时间、用户、网页信息),获取session聚合信息
也就说要获取每一个session聚合信息,比如一个session操作多少次网页,访问时长
groupByKey
sessionid -> 多次操作
*/
val sessionId2ActionRDD:RDD[(String, Row)] = actionsRDD.map{case (row) => { //利用CASE的模式匹配来
val sessionId = row.getAs[String]("session_id")//获取列名session_id对应的数据
/*new Tuple2[(String, Row)]*/(sessionId, row)
}}
val sAggrAccu = sc.accumulator("")(new SessionAggrInfoAccumulator()) //累加器
val sessionId2AggrInfoRDD:RDD[(String, String)] = aggregateInfos(sessionId2ActionRDD, sqlContext, params, sAggrAccu)
sessionId2AggrInfoRDD.persist(StorageLevel.MEMORY_ONLY) //持久化
用户session数据的聚合操作
第一步:sessionid -> 多次操作
计算的聚合结果,也就是说是一个sessionId对应一条聚合信息,那么是一个one-to-one的操作,请使用map/mapPartitions(需要注意数据量的问题,OOM)
要统计的数据:
步长(DV深度):就是在一个session中有多少次action操作
时长:就是在一个session中的最大的时间-最小的时间
搜索关键字:一个用户统计相同关键字只需要一个即可,多个关键字之间使用",",下同
下单:
支付
点击
聚合之后的结果使用:聚合字段key1=聚合字段value1|聚合字段key2=聚合字段value2...
比如:steLen=10|visitLen=15|...
第二步:session数据和用户数据的关联聚合 user_visit_action a join user_info i on a.user_id = i.user_id
尽量少用groupByKey算子,没有本地预聚合,使用aggregateByKey和combineByKey或者reduceByKey代替
def aggregateInfos(sessionId2ActionRDD:RDD[(String, Row)], sqlContext:SQLContext, params:JSONObject,
sAggrAccu:Accumulator[String]):RDD[(String, String)] = {
val sessionId2ActionsRDD:RDD[(String, Iterable[Row])] = sessionId2ActionRDD.groupByKey()
val userId2PartAggrInfoRDD:RDD[(Long, String)] = sessionId2ActionsRDD.map{case (sessionId, rows) => {
var partAggrInfo = ""
var startTime:Date = null
var endTime:Date = null
var userId:Long = -1
var searchKeywords:String = ""
var clickCategoryIds:String = ""
var orderCategoryIds:String = ""
var payCategoryIds:String = ""
for(row <- rows) {
if(userId == -1) {
userId = row.getAs[Long]("user_id")
}
//时间
val actionTime = DateUtils.parseTime(row.getAs[String]("action_time"))
if(startTime == null) {
startTime = actionTime
}
if(endTime == null) {
endTime = actionTime
}
if(startTime.after(actionTime)) {//actionTime < startTime
startTime = actionTime
}
if(endTime.before(actionTime)) {
endTime = actionTime
}
//searchKeywords
val searchWord = row.getAs[String]("search_keyword")
if(searchWord != null) {
if(!searchKeywords.split(",").contains(searchWord)) {
searchKeywords += searchWord + "," //没有就要补全
}
}
//clickCategoryIds
val clickCategoryId = row.getAs[Long]("click_category_id")
if(clickCategoryId != null) {
if(!clickCategoryIds.split(",").contains(clickCategoryId + "")) {
clickCategoryIds += clickCategoryId + ","
}
}
//orderCategoryIds
val ocids = row.getAs[String]("order_category_ids")
if(ocids != null) {
for(ocid <- ocids.split(",")) {
if(!orderCategoryIds.split(",").contains(ocid + "")) {
orderCategoryIds += ocid + ","
}
}
}
//payCategoryIds
val pcids = row.getAs[String]("pay_category_ids")
if(pcids != null) {
for(pcid <- pcids.split(",")) {
if(!payCategoryIds.split(",").contains(pcid + "")) {
payCategoryIds += pcid + ","
}
}
}
}
//步长
val stepLen = rows.size
//时长
val visitLen = (endTime.getTime - startTime.getTime) / 1000
searchKeywords = StringUtils.trimComma(searchKeywords)
clickCategoryIds = StringUtils.trimComma(clickCategoryIds)
orderCategoryIds = StringUtils.trimComma(orderCategoryIds)
payCategoryIds = StringUtils.trimComma(payCategoryIds)
partAggrInfo += Constants.FIELD_STEP_LEN + "=" + stepLen + "|" +
Constants.FIELD_VISIT_LEN + "=" + visitLen + "|" +
Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|" +
Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|" +
Constants.FIELD_ORDER_CATEGORY_IDS + "=" + orderCategoryIds + "|" +
Constants.FIELD_PAY_CATEGORY_IDS + "=" + payCategoryIds + "|" +
Constants.FIELD_SESSION_ID + "=" + sessionId + "|" +
Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)
(userId, partAggrInfo)
}}
第二步:session聚合数据和用户聚合信息进行关联
step 1、过滤满足条件的用户
用户数据在内存表user_info中
年龄段
职业
地区
性别
step 2、两张表进行join
//step 1
val filteredUserId2AggrInfoRDD:RDD[(Long, String)] = filterUser(sqlContext, params) //过滤用户条件
// step 2
val userId2AggrInfoRDD:RDD[(Long, (String, String))] = userId2PartAggrInfoRDD.join(filteredUserId2AggrInfoRDD)
val sessionId2AggrInfoRDD:RDD[(String, String)] = userId2AggrInfoRDD.map{case (userId, (sessionAggrInfo, userAggrInfo)) => {
val sessionId = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID)
val fullAggrInfo = sessionAggrInfo + "|" + userAggrInfo
//这里正是满足条件的数据
val stepLen = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_STEP_LEN).toInt
val visitLen = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_VISIT_LEN).toInt
sAggrAccu.add(Constants.FIELD_SESSION_COUNT)//总的session个数+1
calcStepLen(sAggrAccu, stepLen)
calcVisitLen(sAggrAccu, visitLen)
(sessionId, fullAggrInfo)
}}
println("sessionId2AggrInfoRDD: " + sessionId2AggrInfoRDD.count())
// println(sessionId2AggrInfoRDD.take(1))//只读取了其中的一条记录,进而操作获取累加器之有的问题
// println("acculumator: " + sAggrAccu.value)
sessionId2AggrInfoRDD
}
利用filter过滤器来过滤不合法以及不需要的数据
def filterUser(sqlContext:SQLContext, params:JSONObject):RDD[(Long, String)] = {
val userRDD:RDD[Row] = sqlContext.sql("SELECT * FROM user_info").rdd
val startAge = ParamUtils.getParam(params, Constants.PARAM_START_AGE)
val endAge = ParamUtils.getParam(params, Constants.PARAM_END_AGE)
val professions = ParamUtils.getParam(params, Constants.PARAM_PROFESSIONALS)
val cities = ParamUtils.getParam(params, Constants.PARAM_CITIES)
val gender = ParamUtils.getParam(params, Constants.PARAM_SEX)
//用户自身过滤之后的聚合数据
userRDD.filter(row => {
val age = row.getAs[Int]("age")
val professional = row.getAs[String]("professional")
val sex = row.getAs[String]("sex")
val city = row.getString(5)
var flag = true
if(startAge != null && endAge != null) {//有漏洞的代码
if (age < startAge.toInt || age > endAge.toInt) {
flag = false
}
}
//professions
if(professions != null) {
if(!professions.split(",").contains(professional)) {
flag = false
}
}
if(cities != null) {
if(!cities.split(",").contains(city)) {
flag = false
}
}
if(gender != null) {
if(!gender.split(",").contains(sex)) {
flag = false
}
}
flag
}).map(row => {
val age = row.getAs[Int]("age")
val professional = row.getAs[String]("professional")
val sex = row.getAs[String]("sex")
val city = row.getString(5)
val partInfo = Constants.FIELD_AGE + "=" + age + "|" +
Constants.FIELD_PROFESSIONAL + "=" + professional + "|" +
Constants.FIELD_SEX + "=" + sex + "|" +
Constants.FIELD_CITY + "=" + city
(row.getAs[Long]("user_id"), partInfo)
})
}
编写累加器用于统计不同时长和步长session的个数:
import com.aura.bigdata.analysis.constants.Constants
import com.aura.bigdata.analysis.util.StringUtils
import org.apache.spark.AccumulatorParam
/**
* 自定义累加器:
* 统计出符合条件的session中,访问时长在1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m以上各个范围内的session占比;
* 访问步长在1~3、4~6、7~9、10~30、30~60、60以上各个范围内的session占比
* 1s~3s=10
* 4s~6s=9
* 1~3=15
*
* 1s~3s=10|4s~6s=9|1~3=15 , 这个就是要拼接起来
*/
class SessionAggrInfoAccumulator extends AccumulatorParam[String] {
/**
* 累加器的初始化值
* @param initialValue
*/
override def zero(initialValue: String) = {
Constants.FIELD_SESSION_COUNT + "=0|" +
Constants.TIME_PERIOD_1s_3s + "=0|" +
Constants.TIME_PERIOD_4s_6s + "=0|" +
Constants.TIME_PERIOD_7s_9s + "=0|" +
Constants.TIME_PERIOD_10s_30s + "=0|" +
Constants.TIME_PERIOD_30s_60s + "=0|" +
Constants.TIME_PERIOD_1m_3m + "=0|" +
Constants.TIME_PERIOD_3m_10m + "=0|" +
Constants.TIME_PERIOD_10m_30m + "=0|" +
Constants.TIME_PERIOD_30m + "=0|" +
Constants.STEP_PERIOD_1_3 + "=0|" +
Constants.STEP_PERIOD_4_6 + "=0|" +
Constants.STEP_PERIOD_7_9 + "=0|" +
Constants.STEP_PERIOD_10_30 + "=0|" +
Constants.STEP_PERIOD_30_60 + "=0|" +
Constants.STEP_PERIOD_60 + "=0|"
}
/**
* 调用累加器的add方法,最终就是调用该addInPlace方法
* @param old 累加器中已经有的值
* @param newFiled 需要累加的字段
*/
override def addInPlace(old: String, newFiled: String) = {
if(old == null || old.equals("")) {
newFiled
} else {
if(newFiled == null) {
old
} else {
val oldValue = StringUtils.getFieldFromConcatString(old, "\\|", newFiled)
var newValue = 0
if(oldValue == null) {
newValue = 1
} else {
newValue = oldValue.toInt + 1
}
StringUtils.setFieldInConcatString(old, "\\|", newFiled, newValue + "")
}
}
}
}
通过之前的累加器计算不同时长和步长在session中的占比
/*
访问步长在1~3、4~6、7~9、10~30、30~60、60以上各个范围内的session占比
*/
def calcStepLen(sAggrAccu:Accumulator[String], stepLen:Int): Unit = {
if(stepLen >= 1 && stepLen <= 3) {
sAggrAccu.add(Constants.STEP_PERIOD_1_3)
} else if(stepLen >= 4 && stepLen <= 6) {
sAggrAccu.add(Constants.STEP_PERIOD_4_6)
} else if(stepLen >= 7 && stepLen <= 9) {
sAggrAccu.add(Constants.STEP_PERIOD_7_9)
} else if(stepLen >= 10 && stepLen <= 30) {
sAggrAccu.add(Constants.STEP_PERIOD_10_30)
} else if(stepLen > 30 && stepLen <= 60) {
sAggrAccu.add(Constants.STEP_PERIOD_30_60)
} else if(stepLen > 60) {
sAggrAccu.add(Constants.STEP_PERIOD_60)
}
}
/*
访问时长在1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m以上各个范围内的session占比;
*/
def calcVisitLen(sAggrAccu:Accumulator[String], visitLen:Int): Unit = {
if(visitLen >= 1 && visitLen <= 3) {
sAggrAccu.add(Constants.TIME_PERIOD_1s_3s)
} else if(visitLen >= 4 && visitLen <= 6) {
sAggrAccu.add(Constants.TIME_PERIOD_4s_6s)
} else if(visitLen >= 7 && visitLen <= 9) {
sAggrAccu.add(Constants.TIME_PERIOD_7s_9s)
} else if(visitLen >= 10 && visitLen <= 30) {
sAggrAccu.add(Constants.TIME_PERIOD_10s_30s)
} else if(visitLen > 30 && visitLen <= 60) {
sAggrAccu.add(Constants.TIME_PERIOD_30s_60s)
} else if(visitLen > 60 && visitLen <= 180) {
sAggrAccu.add(Constants.TIME_PERIOD_1m_3m)
} else if(visitLen > 180 && visitLen <= 600) {
sAggrAccu.add(Constants.TIME_PERIOD_3m_10m)
} else if(visitLen > 600 && visitLen <= 1800) {
sAggrAccu.add(Constants.TIME_PERIOD_10m_30m)
} else if(visitLen > 1800) {
sAggrAccu.add(Constants.TIME_PERIOD_30m)
}
}
统计聚合的结果用于前端的展示
/**
* 统计聚合结果--》存储到mysql,供前端展示
* @param sAggrAccu
*/
def calcAggrInfo(taskId:Int, sAggrAccu:Accumulator[String]): Unit = {
val aggrAccu = sAggrAccu.value
//sessionCount
val sessionCount = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.FIELD_SESSION_COUNT).toDouble
//visit
val vLen_1s_3s = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.TIME_PERIOD_1s_3s).toDouble
val vLen_4s_6s = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.TIME_PERIOD_4s_6s).toDouble
val vLen_7s_9s = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.TIME_PERIOD_7s_9s).toDouble
val vLen_10s_30s = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.TIME_PERIOD_10s_30s).toDouble
val vLen_30s_60s = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.TIME_PERIOD_30s_60s).toDouble
val vLen_1m_3m = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.TIME_PERIOD_1m_3m).toDouble
val vLen_3m_10m = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.TIME_PERIOD_3m_10m).toDouble
val vLen_10m_30m = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.TIME_PERIOD_10m_30m).toDouble
val vLen_30m = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.TIME_PERIOD_30m).toDouble
//step
val sLen_1_3 = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.STEP_PERIOD_1_3).toDouble
val sLen_4_6 = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.STEP_PERIOD_4_6).toDouble
val sLen_7_9 = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.STEP_PERIOD_7_9).toDouble
val sLen_10_30 = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.STEP_PERIOD_10_30).toDouble
val sLen_30_60 = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.STEP_PERIOD_30_60).toDouble
val sLen_60 = StringUtils.getFieldFromConcatString(aggrAccu, "\\|", Constants.STEP_PERIOD_60).toDouble
//visit rate
val vRate_1s_3s = NumberUtils.formatDouble(vLen_1s_3s / sessionCount, 2)
val vRate_4s_6s = NumberUtils.formatDouble(vLen_4s_6s / sessionCount, 2)
val vRate_7s_9s = NumberUtils.formatDouble(vLen_7s_9s / sessionCount, 2)
val vRate_10s_30s = NumberUtils.formatDouble(vLen_10s_30s / sessionCount, 2)
val vRate_30s_60s = NumberUtils.formatDouble(vLen_30s_60s / sessionCount, 2)
val vRate_1m_3m = NumberUtils.formatDouble(vLen_1m_3m / sessionCount, 2)
val vRate_3m_10m = NumberUtils.formatDouble(vLen_3m_10m / sessionCount, 2)
val vRate_10m_30m = NumberUtils.formatDouble(vLen_10m_30m / sessionCount, 2)
val vRate_30m = NumberUtils.formatDouble(vLen_30m / sessionCount, 2)
//step rate
val sRate_1_3 = NumberUtils.formatDouble(sLen_1_3 / sessionCount, 2)
val sRate_4_6 = NumberUtils.formatDouble(sLen_4_6 / sessionCount, 2)
val sRate_7_9 = NumberUtils.formatDouble(sLen_7_9 / sessionCount, 2)
val sRate_10_30 = NumberUtils.formatDouble(sLen_10_30 / sessionCount, 2)
val sRate_30_60 = NumberUtils.formatDouble(sLen_30_60 / sessionCount, 2)
val sRate_60 = NumberUtils.formatDouble(sLen_60 / sessionCount, 2)
val sasDao = new SessionAggrStatDaoImpl
val sessionAggrStat = new SessionAggrStat
sessionAggrStat.setTask_id(taskId)
sessionAggrStat.setSession_count(sessionCount.toInt)
sessionAggrStat.setVisit_len_1s_3s_ratio(vRate_1s_3s)
sessionAggrStat.setVisit_len_4s_6s_ratio(vRate_4s_6s)
sessionAggrStat.setVisit_len_7s_9s_ratio(vRate_7s_9s)
sessionAggrStat.setVisit_len_10s_30s_ratio(vRate_10s_30s)
sessionAggrStat.setVisit_len_30s_60s_ratio(vRate_30s_60s)
sessionAggrStat.setVisit_len_1m_3m_ratio(vRate_1m_3m)
sessionAggrStat.setVisit_len_3m_10m_ratio(vRate_3m_10m)
sessionAggrStat.setVisit_len_10m_30m_ratio(vRate_10m_30m)
sessionAggrStat.setVisit_len_30m_ratio(vRate_30m)
sessionAggrStat.setStep_len_1_3_ratio(sRate_1_3)
sessionAggrStat.setStep_len_4_6_ratio(sRate_4_6)
sessionAggrStat.setStep_len_7_9_ratio(sRate_7_9)
sessionAggrStat.setStep_len_10_30_ratio(sRate_10_30)
sessionAggrStat.setStep_len_30_60_ratio(sRate_30_60)
sessionAggrStat.setStep_len_60_ratio(sRate_60)
sasDao.insert(sessionAggrStat)
统计完基础的session之后,要开始对session进行抽取
按照时间比例抽取随机的session
思路:
总体,有700个,要抽取100个样本,按照时间(小时为单位)比例来抽取,
1、首先要知道每个小时有多少个session,以及session总量,我们就可以通过公式:每个小时要抽取的session个数=(总共要抽取的session个数 * 每个小时的session个数) / session总数
2、session数据在rdd中,而rdd是一个集合,所以我们需要从一个分布式的集合中抽取一部分数据,
需要从每一个小时的session数据中来进行抽取,所以需要将每一个小时的session数据拉取到一起(groupByKey)[hour, Iterable[session]]
3、将要抽取的个数,转化为索引列表,进行数据抽取
4、因为要用3中计算出来的索引列表,拷贝到[hour, Iterable[session]]rdd中通过map算子计算得到要抽取的数据,所以我们应该讲该索引列表通过
广播变量进行传送到集群中,而不是直接使用。
gc的种类:
minor gc:回收新生代(eden区和两个survivor区 8:1:1)
stop the world
major gc: 回收老年代和永久待(常量池)
full gc:minor + major
gc回收的频率和对象个数成正比。
* @param sessionId2AggrInfoRDD
*/
def randomExtractSession(taskId:Int, sessionId2AggrInfoRDD:RDD[(String, String)], sc:SparkContext): Unit = {
/*
1、获取每个小时的session的个数据,以小时为key,进行类似于wordcount的操作
*/
val dateHour2AggrInfo:RDD[(String, String)] = sessionId2AggrInfoRDD.map{case (sessionId, fullAggrInfo) => {
val sessionTime = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_START_TIME)
val dateHour = DateUtils.getDateHour(sessionTime)
(dateHour, fullAggrInfo)
}}
val dateHour2AggrInfos:RDD[(String, Iterable[String])] = dateHour2AggrInfo.groupByKey()
val countMap = dateHour2AggrInfo.countByKey()
countMap.foreach(kv => {
println(kv._1 + "=" + kv._2)
})
val sessionCount = countMap.values.sum
// println("sessionCount: " + sessionCount)
/*
2、计算每个小时应该要抽取的session个数
*/
val extractMap = mutable.HashMap[String, Int]()
countMap.foreach(kv => {
val dateHour = kv._1
val dateHourCount = kv._2
val extractCount = ((dateHourCount * 100) / sessionCount).toInt //计算对应的每个时间段中需要抽取的session个数
extractMap.put(dateHour, extractCount)
})
// extractMap.foreach(kv => {
// println(kv._1 + "=" + kv._2)
// })
/*
3、每个小时的要抽取的随机索引列表
加入12点有30个session,要抽取5个session
val hourSessionCount = countMap(dateHour)-->每个小时对应的session个数
val index = random.nextInt(hourSessionCount)
1
5
3
18
5
所以在抽取过程中需要进行索引存在的判断
*/
val extractIndexListMap = mutable.HashMap[String, ArrayBuffer[Int]]()
val random = new Random()
for ((dateHour, extractCount) <- extractMap) {
val ab = ArrayBuffer[Int]()
val hourSessionCount = countMap(dateHour).toInt//每个小时的session个数
for (i <- 0 until extractCount) {
var index = random.nextInt(hourSessionCount)
while(ab.contains(index)) { //如果包含相同的index则需要重新抽取
index = random.nextInt(hourSessionCount)
}
//该index在ab中不包含
ab.append(index)
}
extractIndexListMap.put(dateHour, ab)
}
//广播变量
val extractIndexListMapBC:Broadcast[mutable.HashMap[String, ArrayBuffer[Int]]] = sc.broadcast(extractIndexListMap)
/*
4、基于索引列表抽取对应的session
*/
val extractSessionInfoRDD:RDD[Iterable[String]] = dateHour2AggrInfos.map{case (dateHour, aggrInfos) => {
val indexList = extractIndexListMapBC.value(dateHour).sortWith(_ < _)//要抽取的索引列表
val extractAggrInfo = ArrayBuffer[String]()
// var index = 0
val infos = Random.shuffle(aggrInfos).toList
for(index <- indexList) {
extractAggrInfo.append(infos(index))
}
extractAggrInfo
}}
/*
将结果写到数据库
*/
extractSessionInfoRDD.foreachPartition(aggrInfosPartitions => {
if(!aggrInfosPartitions.isEmpty) {
val sreDao = new SessionRandomExtractDaoImpl
for(aggrInfos <- aggrInfosPartitions) {//遍历每一个分区中的每一个小时对应的session数据
if(!aggrInfos.isEmpty) {//每一个小时对应的session数据
val list = new util.ArrayList[SessionRandomExtract]()
for(aggrInfo <- aggrInfos) {
val sre = new SessionRandomExtract()
val sessionId = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SESSION_ID)
val startTime = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_START_TIME)
val searchKeyWords = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS)
val clickCategoryIds = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS)
sre.setTaskId(taskId)
sre.setSessionId(sessionId)
sre.setStartTime(startTime)
sre.setSearchKeywords(searchKeyWords)
sre.setClickCategoryIds(clickCategoryIds)
list.add(sre)
}
sreDao.insertBatch(list)
}
}
}
})
}
利用之前抽取的session来进行求top10的操作:
* 在符合条件的session中,获取点击、下单和支付数量排名前10的品类
* 典型的二次排序
* 如果点击量相同,按照下单排序,如果下单量也相同,按照支付数量排序
* Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|" +
Constants.FIELD_ORDER_CATEGORY_IDS + "=" + orderCategoryIds + "|" +
Constants.FIELD_PAY_CATEGORY_IDS + "=" + payCategoryIds + "|" +
Ordering extended java.util.Comparator with Serializable 集合具备比较器
Ordered extended java.lang.Comparable with Serializable 对象具备比较性
因为在一条session中有若干个click、order、pay所对应的categoryId,而我们最终统计的是每一个category被点击、下单、支付量的排名
所以我们需要一个categoryid为key来统计对应的数量,所以一条session中就会产生多条记录,多个Category对象
one-to-many flatMap
为了统计全量数据,需要进行wordcount
def getCategoryTop(taskId:Int, sessionId2AggrInfoRDD:RDD[(String, String)]): Unit = {
val categoryId2CountRDD:RDD[(Long, CategorySort)] = sessionId2AggrInfoRDD.flatMap{case (sessionId, fullAggrInfo) => {
val categoryMap = mutable.HashMap[Long, CategorySort]() //可变集合的转化
val clickCategoryIds = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS) //从拼接字符串中提取字段
val orderCategoryIds = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_ORDER_CATEGORY_IDS)
.
val payCategoryIds = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_PAY_CATEGORY_IDS)
if(StringUtils.isNotEmpty(clickCategoryIds)) {
for(ccid <- clickCategoryIds.split(",")) {
val option = categoryMap.get(ccid.toLong)
if(option.isDefined) {//当前option对象为some ,判断当前对象是否为空
option.get.setClickCount(1)
} else {//当前option为none
val cs = new CategorySort
cs.setId(ccid.toLong)
cs.setClickCount(1)
categoryMap.put(ccid.toLong, cs)
}
}
}
//order=8,9,99
if(StringUtils.isNotEmpty(orderCategoryIds)) {
for(ocid <- orderCategoryIds.split(",")) {
val option = categoryMap.get(ocid.toLong)
if(option.isDefined) {//当前option对象为some
option.get.setOrderCount(1)
} else {//当前option为none
val cs = new CategorySort
cs.setId(ocid.toLong)
cs.setOrderCount(1)
categoryMap.put(ocid.toLong, cs)
}
}
}
//pay=3,0,17
if(StringUtils.isNotEmpty(payCategoryIds)) {
for(pcid <- payCategoryIds.split(",")) {
val option = categoryMap.get(pcid.toLong)
if(option.isDefined) {//当前option对象为some
option.get.setPayCount(1)
} else {//当前option为none
val cs = new CategorySort
cs.setId(pcid.toLong)
cs.setPayCount(1)
categoryMap.put(pcid.toLong, cs)
}
}
}
categoryMap.toList
}}
/**
* int sum = 0
*
* for(int i = 0; i < 10; i++) {
* sum = sum + i;
* }
*
* 这里的sum就是cs1
* i就是cs2
*
*
*/
//wordcount
val categoryId2CountsRDD:RDD[(Long, CategorySort)] = categoryId2CountRDD.reduceByKey{case (cs1, cs2) => {
cs1.setClickCount(cs2.getClickCount)
cs1.setOrderCount(cs2.getOrderCount)
cs1.setPayCount(cs2.getPayCount)
cs1
}}
//排序,求top10
val categoryId2CountTop:Array[(Long, CategorySort)] = categoryId2CountsRDD.sortBy(cc => cc._2,
true,
1)(new Ordering[CategorySort](){ //在一个分区里面升序排序,也就是全局排序
override def compare(x: CategorySort, y: CategorySort) = {
var ret = y.getClickCount.compareTo(x.getClickCount)
if(ret == 0) {
ret = y.getOrderCount.compareTo(x.getOrderCount)
if(ret == 0) {
ret = y.getPayCount.compareTo(x.getPayCount)
}
}
ret
}
},ClassTag.Object.asInstanceOf[ClassTag[CategorySort]]).take(10) //通过伴生对象取值
val top10CategoryDao = new Top10CategoryDaoImpl // top10入库
val list = new util.ArrayList[Top10Category]()
for (tuple <- categoryId2CountTop) {
val tc = new Top10Category()
tc.setTaskId(taskId)
tc.setCategoryId(tuple._1)
tc.setClickCount(tuple._2.getClickCount)
tc.setOrderCount(tuple._2.getOrderCount)
tc.setPayCount(tuple._2.getPayCount)
list.add(tc)
}
top10CategoryDao.insertBatch(list)
}