Groundwork at the DAO layer:
ProductAreaTop3DaoImpl inserts the per-area top-3 product records into MySQL.
package com.aura.bigdata.analysis.dao.impl.product;
import com.aura.bigdata.analysis.dao.product.IProductAreaTop3Dao;
import com.aura.bigdata.analysis.domain.product.ProductAreaTop3;
import com.aura.bigdata.analysis.util.DBCPUtils;
import org.apache.commons.dbutils.QueryRunner;
import java.sql.SQLException;
import java.util.List;
public class ProductAreaTop3DaoImpl implements IProductAreaTop3Dao {

    private QueryRunner qr = new QueryRunner(DBCPUtils.getDataSource());

    // Positional INSERT: the placeholder order must match the column order of
    // product_area_top3 (task_id, area, area_level, product_id, product_name,
    // click_count, product_status)
    private String sql = "INSERT INTO product_area_top3 VALUES(?, ?, ?, ?, ?, ?, ?)";

    @Override
    public void insert(ProductAreaTop3 entity) {
        try {
            qr.update(sql, entity.getTaskId(), entity.getArea(),
                    entity.getAreaLevel(), entity.getProductId(),
                    entity.getProductName(), entity.getClickCount(),
                    entity.getProductStatus());
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void insertBatch(List<ProductAreaTop3> list) {
        // One JDBC batch for the whole list instead of row-by-row updates
        Object[][] params = new Object[list.size()][];
        for (int i = 0; i < list.size(); i++) {
            ProductAreaTop3 entity = list.get(i);
            params[i] = new Object[]{
                    entity.getTaskId(), entity.getArea(),
                    entity.getAreaLevel(), entity.getProductId(),
                    entity.getProductName(), entity.getClickCount(),
                    entity.getProductStatus()
            };
        }
        try {
            qr.batch(sql, params);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
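Both the DAO above and the Spark job further down rely on DBCPUtils, which is not listed in this post. Below is a minimal sketch of what it is assumed to provide: a shared commons-dbcp DataSource for the DAOs plus the raw url/username/password used later by sqlContext.read.jdbc. The property-file name and keys are hypothetical; the project's real class may differ.
import java.util.Properties
import javax.sql.DataSource
import org.apache.commons.dbcp.BasicDataSourceFactory

// Hypothetical sketch of DBCPUtils: loads JDBC settings from a properties file
// on the classpath and exposes a pooled DataSource plus the raw settings.
object DBCPUtils {
  private val props = new Properties()
  props.load(getClass.getClassLoader.getResourceAsStream("jdbc.properties"))

  val url: String = props.getProperty("url")
  val username: String = props.getProperty("username")
  val password: String = props.getProperty("password")

  // BasicDataSourceFactory reads keys such as driverClassName/url/username/password
  private val dataSource: DataSource = BasicDataSourceFactory.createDataSource(props)

  def getDataSource: DataSource = dataSource
}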
The interface it implements:
package com.aura.bigdata.analysis.dao.product;
import com.aura.bigdata.analysis.dao.IBaseDao;
import com.aura.bigdata.analysis.domain.product.ProductAreaTop3;
public interface IProductAreaTop3Dao extends IBaseDao<ProductAreaTop3> {
}
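IBaseDao itself is not listed either; from the two overrides in the implementation it presumably declares insert and insertBatch for an arbitrary entity type. A sketch of that assumed contract (the project's real interface is Java; this Scala trait only makes the shape explicit):
import java.util.List

// Assumed shape of IBaseDao, inferred from ProductAreaTop3DaoImpl's overrides;
// the real interface may declare more operations.
trait IBaseDao[T] {
  def insert(entity: T): Unit
  def insertBatch(list: List[T]): Unit
}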
The results are stored in the following table:
CREATE TABLE IF NOT EXISTS `product_area_top3` (
    task_id        int,
    area           varchar(20),
    area_level     varchar(20),
    product_id     bigint,
    product_name   varchar(50),
    click_count    bigint,
    product_status varchar(50)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Now we can write the Spark job itself.
The first step is to load the area information.
package com.aura.bigdata.analysis.jobs.product
import java.util
import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import com.aura.bigdata.analysis.conf.ConfigurationManager
import com.aura.bigdata.analysis.constants.Constants
import com.aura.bigdata.analysis.dao.impl.TaskDaoImpl
import com.aura.bigdata.analysis.dao.impl.product.ProductAreaTop3DaoImpl
import com.aura.bigdata.analysis.domain.product.ProductAreaTop3
import com.aura.bigdata.analysis.mock.MockData
import com.aura.bigdata.analysis.udf.UDFUtils
import com.aura.bigdata.analysis.util._
import com.aura.bigdata.analysis.utils.SparkUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
/**
 * Requirement: for a user-specified date range, compute the top-3 most clicked products in each area.
 * The Spark job receives a taskId, looks up the corresponding task in MySQL to obtain the
 * user-supplied filter parameters, computes the per-area top-3 products within the given
 * date range, and finally writes the result back to a MySQL table.
 */
object ProductAreaTopJob {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    // 1. Spark setup. The master is chosen from the deploy mode (local[4] in DEV,
    // left to spark-submit otherwise); Kryo and other tuning could be added here.
    val conf = new SparkConf()
      .setAppName(s"${ProductAreaTopJob.getClass.getSimpleName}")
    SparkUtils.setMaster(conf, ConfigurationManager.dMode)
    val sc = new SparkContext(conf)
    val sqlContext = SparkUtils.getSQLContext(sc, ConfigurationManager.dMode, isHive = true)
    // Register the UDF used later to pull product_status out of the JSON extend_info
    sqlContext.udf.register[String, String]("getJsonInfo", str => UDFUtils.getJsonInfo(str))
    // 2. Read the task id and fetch the task record from MySQL
    val taskId = ConfigurationManager.getIntProperty(Constants.SPARK_JOB_SESSION_TASK_ID)
    val taskDao = new TaskDaoImpl
    val task = taskDao.getTaskById(taskId)
    if (task == null) {
      System.err.println(s"taskId:${taskId} not found, please check the parameter")
      System.exit(-1)
    }
    val params = JSON.parseObject(task.getTask_param)
    // println("params: " + params)
    // 3. Load the (mock) data
    MockData.mock(sc, sqlContext)
    // 4. Filter the click actions down to the task's date range
    getRangeSession(params, sqlContext)
    // 5. Load the area info from the MySQL `cities` table
    loadAreaInfo(sqlContext)
    // 6. Join the related tables: city_info, product_click_city, product_info
    joinProductInfo(sqlContext)
    // 7. Aggregate: count the product clicks within each area
    calcAreaProduct(sqlContext)
    // 8. Take the top 3 per area from the grouped result
    calcAreaProductTop3(sqlContext, taskId)
    // 9. Persist the result
    export2MySQL(taskId, sqlContext)
  }
  def export2MySQL(taskId: Int, sqlContext: SQLContext): Unit = {
    // calcAreaProductTop3 has already registered the top-3 result as
    // area_product_top3_temp, so we only read it back here. (Note that taskId
    // is not a column of that table; it is attached from the method parameter.)
    val sql = "SELECT " +
      "area, " +
      "area_level, " +
      "product_id, " +
      "product_name, " +
      "product_status, " +
      "count " +
      "FROM area_product_top3_temp"
    val df = sqlContext.sql(sql)
    df.rdd.foreachPartition(partition => {
      if (!partition.isEmpty) {
        // One DAO and one JDBC batch per partition keeps round trips low
        val patDao = new ProductAreaTop3DaoImpl
        val list = new util.ArrayList[ProductAreaTop3]()
        for (row <- partition) {
          val pat = new ProductAreaTop3()
          pat.setTaskId(taskId)
          pat.setArea(row.getAs[String]("area"))
          pat.setAreaLevel(row.getAs[String]("area_level"))
          pat.setClickCount(row.getAs[Long]("count"))
          pat.setProductId(row.getAs[Long]("product_id"))
          pat.setProductName(row.getAs[String]("product_name"))
          pat.setProductStatus(row.getAs[String]("product_status"))
          list.add(pat)
        }
        patDao.insertBatch(list)
      }
    })
  }
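  // Side note (a sketch, not the project's code): the hand-rolled DAO could be
  // replaced by Spark's own JDBC writer, e.g.
  //   df.write.mode(SaveMode.Append).jdbc(DBCPUtils.url, "product_area_top3", props)
  // foreachPartition + insertBatch is kept here because it reuses the existing
  // DBCP pool and gives explicit control over batching.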
  /**
   * 8. Take the top 3 per area with the ROW_NUMBER() window function.
   * The windowed result is wrapped in a subquery and filtered with WHERE:
   * HAVING without a GROUP BY is not valid HiveQL.
   */
  def calcAreaProductTop3(sqlContext: SQLContext, taskId: Int): Unit = {
    val sql = "SELECT " +
      "area, area_level, product_id, product_name, product_status, count, rank " +
      "FROM (" +
      "SELECT " +
      "area, " +
      "area_level, " +
      "product_id, " +
      "product_name, " +
      "product_status, " +
      "count, " +
      "ROW_NUMBER() OVER(PARTITION BY area ORDER BY count DESC) as rank " +
      "FROM area_product_click_count" +
      ") t " +
      "WHERE t.rank < 4"
    val df = sqlContext.sql(sql)
    df.registerTempTable("area_product_top3_temp")
    df.show()
  }
  /**
   * 7. Aggregate: one row per (area, product), counting the clicks.
   */
  def calcAreaProduct(sqlContext: SQLContext): Unit = {
    val sql = "SELECT " +
      "area, " +
      "area_level, " +
      "product_id, " +
      "product_name, " +
      "product_status, " +
      "count(product_id) as count " +
      "FROM area_product_click_tmp " +
      "GROUP BY area, product_id, area_level, product_name, product_status"
    val df = sqlContext.sql(sql)
    // df.show()
    df.registerTempTable("area_product_click_count")
  }
  /**
   * 6. Join the three tables
   * city_info, product_click_city and product_info
   * into one row per click:
   * area, area_level, product_id, product_name, product_status.
   * The areas are graded into these levels:
   * AAAAA ---> 华东|华北|华南
   * AAAA  ---> 华中
   * AAA   ---> 西南
   * AA    ---> 东北
   * A     ---> 西北
   * @param sqlContext
   */
  def joinProductInfo(sqlContext: SQLContext): Unit = {
    // '自营' = self-operated, '第三方' = third-party
    val sql = "SELECT " +
      "ci.area, " +
      "CASE " +
      "WHEN ci.area = '华东' OR ci.area = '华北' OR ci.area = '华南' THEN 'AAAAA' " +
      "WHEN ci.area = '华中' THEN 'AAAA' " +
      "WHEN ci.area = '西南' THEN 'AAA' " +
      "WHEN ci.area = '东北' THEN 'AA' " +
      "ELSE 'A' " +
      "END area_level, " +
      "pi.product_id, " +
      "pi.product_name, " +
      "if(getJsonInfo(pi.extend_info) = '0', '自营', '第三方') product_status " +
      "FROM product_click_city pcc " +
      "LEFT JOIN city_info ci on pcc.city_id = ci.city_id " +
      "LEFT JOIN product_info pi on pcc.click_product_id = pi.product_id "
    val df = sqlContext.sql(sql)
    df.registerTempTable("area_product_click_tmp")
    df.show()
  }
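  // Note: because of the LEFT JOINs, a click whose city_id has no match in
  // city_info ends up with a NULL area, which the CASE expression silently
  // buckets into level 'A'; an INNER JOIN would drop such rows instead.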
  /**
   * 5. Load the area info from the MySQL `cities` table via JDBC.
   */
  def loadAreaInfo(sqlContext: SQLContext): Unit = {
    val url = DBCPUtils.url
    val username = DBCPUtils.username
    val password = DBCPUtils.password
    val properties = new Properties()
    properties.put("user", username)
    properties.put("password", password)
    val df = sqlContext.read.jdbc(url, "cities", properties)
    df.show()
    // Register the DataFrame as a temp table so SQL can join against it
    df.registerTempTable("city_info")
  }
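  // Note: read.jdbc as used above pulls the whole cities table into a single
  // partition. For a bigger table, the predicates overload can split the read
  // and push filters down, e.g. (hypothetical predicate values):
  //   sqlContext.read.jdbc(url, "cities", Array("city_id < 5", "city_id >= 5"), properties)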
  /**
   * 4. Filter the click actions by the parameters submitted with the task
   * (by operations/product staff etc.).
   * @param params e.g. {"startAge":[20], "endAge":[50], "startDate":["2018-08-13"], "endDate":["2018-08-13"]}
   * @param sqlContext
   */
  def getRangeSession(params: JSONObject, sqlContext: SQLContext): Unit = {
    val startDate = ParamUtils.getParam(params, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(params, Constants.PARAM_END_DATE)
    val sql = "SELECT " +
      "click_product_id, " +
      "city_id " +
      "FROM user_visit_action " +
      "WHERE `date` >= '" + startDate + "' " +
      "AND `date` <= '" + endDate + "' " +
      "AND click_product_id is not null"
    val df = sqlContext.sql(sql)
    df.registerTempTable("product_click_city")
    df.show()
  }
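  // Note: startDate/endDate are spliced straight into the SQL string. That is
  // tolerable while they come from our own MySQL task table, but they should be
  // validated or parameterized if they can ever originate from user input.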
}
In this Spark job, the plain SQLContext of Spark 1.x does not support the ROW_NUMBER window function, so the SQL above cannot run on it; we have to switch to HiveQL, i.e. a HiveContext.
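With a HiveContext in place, the same per-area top 3 can also be expressed through the DataFrame window API instead of an SQL string. A minimal sketch against the area_product_click_count temp table registered above, assuming the job's sqlContext is in scope (Spark 1.6 naming, where row_number() replaced the older rowNumber()):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank products by click count within each area and keep the first three;
// on Spark 1.x this still requires the HiveContext created by SparkUtils.
val w = Window.partitionBy("area").orderBy(col("count").desc)
val top3 = sqlContext.table("area_product_click_count")
  .withColumn("rank", row_number().over(w))
  .where(col("rank") <= 3)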
That switch is handled by the following SparkUtils class:
package com.aura.bigdata.analysis.utils

import com.aura.bigdata.analysis.constants.DeployMode
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkUtils {

  /** In DEV mode run locally on 4 threads; otherwise the master is left to spark-submit. */
  def setMaster(conf: SparkConf, dMode: DeployMode) {
    if (dMode.equals(DeployMode.DEV)) {
      conf.setMaster("local[4]")
    }
  }

  /**
   * Window functions need Hive's planner on Spark 1.x, so the job passes
   * isHive = true to get a HiveContext even in DEV mode.
   */
  def getSQLContext(sc: SparkContext, dMode: DeployMode, isHive: Boolean): SQLContext = {
    if (dMode.equals(DeployMode.DEV)) {
      if (isHive) new HiveContext(sc) else new SQLContext(sc)
    } else {
      new HiveContext(sc)
    }
  }
}
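DeployMode is referenced here but not shown; presumably it is a simple two-valued type that ConfigurationManager.dMode resolves from configuration (the real project may well define it as a Java enum). A minimal Scala sketch under that assumption:
// Hypothetical stand-in for com.aura.bigdata.analysis.constants.DeployMode;
// only the two values SparkUtils actually distinguishes are modeled.
sealed trait DeployMode
object DeployMode {
  case object DEV extends DeployMode
  case object PROD extends DeployMode
}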