A Brief Look at the meituan Interactive System (2): product, Top Hot Products per Area

Groundwork at the DAO layer:

ProductAreaTop3DaoImpl inserts the per-area hot-product records, either one row at a time or in batch:

package com.aura.bigdata.analysis.dao.impl.product;

import com.aura.bigdata.analysis.dao.product.IProductAreaTop3Dao;
import com.aura.bigdata.analysis.domain.product.ProductAreaTop3;
import com.aura.bigdata.analysis.util.DBCPUtils;
import org.apache.commons.dbutils.QueryRunner;

import java.sql.SQLException;
import java.util.List;

public class ProductAreaTop3DaoImpl implements IProductAreaTop3Dao {
    private QueryRunner qr = new QueryRunner(DBCPUtils.getDataSource());

    // The 7 placeholders match the 7 columns of product_area_top3 (DDL below)
    private static final String sql = "INSERT INTO product_area_top3 VALUES(?, ?, ?, ?, ?, ?, ?)";

    @Override
    public void insert(ProductAreaTop3 entity) {

        try {
            qr.update(sql, entity.getTaskId(), entity.getArea(),
                           entity.getAreaLevel(), entity.getProductId(),
                           entity.getProductName(), entity.getClickCount(),
                           entity.getProductStatus());
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void insertBatch(List<ProductAreaTop3> list) {
        Object[][] params = new Object[list.size()][];
        for (int i = 0; i < list.size(); i++) {
            ProductAreaTop3 entity = list.get(i);
            Object[] obj = {
                    entity.getTaskId(), entity.getArea(),
                    entity.getAreaLevel(), entity.getProductId(),
                    entity.getProductName(), entity.getClickCount(),
                    entity.getProductStatus()
            };
            params[i] = obj;
        }
        try {
            qr.batch(sql, params);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
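The DAO depends on a DBCPUtils helper that never appears in this post. A minimal sketch of it, written in Scala to match the job code later; the jdbc.properties file name and its keys are assumptions, and only commons-dbcp's BasicDataSource is real API:

package com.aura.bigdata.analysis.util

import java.util.Properties
import javax.sql.DataSource
import org.apache.commons.dbcp.BasicDataSource

// Sketch only: the properties file and key names are assumed
object DBCPUtils {
    private val props = new Properties()
    props.load(getClass.getClassLoader.getResourceAsStream("jdbc.properties"))

    val url: String = props.getProperty("jdbc.url")
    val username: String = props.getProperty("jdbc.username")
    val password: String = props.getProperty("jdbc.password")

    // One pooled DataSource shared by every QueryRunner
    private val ds = new BasicDataSource()
    ds.setDriverClassName(props.getProperty("jdbc.driver"))
    ds.setUrl(url)
    ds.setUsername(username)
    ds.setPassword(password)

    def getDataSource: DataSource = ds
}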

And the interface it implements:


package com.aura.bigdata.analysis.dao.product;

import com.aura.bigdata.analysis.dao.IBaseDao;
import com.aura.bigdata.analysis.domain.product.ProductAreaTop3;

public interface IProductAreaTop3Dao extends IBaseDao<ProductAreaTop3> {
}
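IBaseDao is not listed either, but the two @Override methods in the impl pin down its shape. A minimal sketch (a Scala trait with only abstract methods compiles to a plain interface, so the Java impl above can implement it):

package com.aura.bigdata.analysis.dao

// Inferred from the methods that ProductAreaTop3DaoImpl overrides
trait IBaseDao[T] {
    def insert(entity: T): Unit
    def insertBatch(list: java.util.List[T]): Unit
}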



The results are stored in the following table:

CREATE TABLE IF NOT EXISTS `product_area_top3` (
  task_id int,
  area varchar(20),
  area_level varchar(20),
  product_id bigint,
  product_name varchar(50),
  click_count bigint,
  product_status varchar(50)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
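The ProductAreaTop3 domain class mirrors these columns. It is not shown in the post; judging by the getters and setters called elsewhere, a sketch using Scala's @BeanProperty (field types assumed from the DDL) would be:

package com.aura.bigdata.analysis.domain.product

import scala.beans.BeanProperty

// Field types are assumed from the product_area_top3 DDL above
class ProductAreaTop3 {
    @BeanProperty var taskId: Int = _
    @BeanProperty var area: String = _
    @BeanProperty var areaLevel: String = _
    @BeanProperty var productId: Long = _
    @BeanProperty var productName: String = _
    @BeanProperty var clickCount: Long = _
    @BeanProperty var productStatus: String = _
}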

Now let's write the Spark job.

The first thing it has to do is load the area information:
package com.aura.bigdata.analysis.jobs.product

import java.util
import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import com.aura.bigdata.analysis.conf.ConfigurationManager
import com.aura.bigdata.analysis.constants.Constants
import com.aura.bigdata.analysis.dao.impl.TaskDaoImpl
import com.aura.bigdata.analysis.dao.impl.product.ProductAreaTop3DaoImpl
import com.aura.bigdata.analysis.domain.product.ProductAreaTop3
import com.aura.bigdata.analysis.mock.MockData
import com.aura.bigdata.analysis.udf.UDFUtils
import com.aura.bigdata.analysis.util._
import com.aura.bigdata.analysis.utils.SparkUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * Requirement: for a user-specified date range, find the top-3 hottest products in each area.
  * The Spark job receives a task id, looks up the matching task in MySQL to get the user's filter
  * parameters, computes the top-3 products per area within the date range, and writes the result
  * back to a MySQL table.
  */
object ProductAreaTopJob {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)

        // 1. Spark configuration (the master is decided by SparkUtils.setMaster below)
        val conf = new SparkConf()
            .setAppName("ProductAreaTopJob")
        SparkUtils.setMaster(conf, ConfigurationManager.dMode)

        // Other settings, e.g. Kryo serialization, could be configured here
        val sc = new SparkContext(conf)
        val sqlContext = SparkUtils.getSQLContext(sc, ConfigurationManager.dMode, isHive = true)
        // Register the UDF used later to parse the product extend_info JSON
        sqlContext.udf.register[String, String]("getJsonInfo", str => UDFUtils.getJsonInfo(str))


        // 2. Read the task id and fetch the task record
        val taskId = ConfigurationManager.getIntProperty(Constants.SPARK_JOB_SESSION_TASK_ID)
        val taskDao = new TaskDaoImpl
        val task = taskDao.getTaskById(taskId)
        if(task == null) {
            System.err.println(s"taskId:${taskId} not found, please check the parameter")
            System.exit(-1)
        }
        val params = JSON.parseObject(task.getTask_param)
        // 3. Load the (mock) data
        MockData.mock(sc, sqlContext)
        // 4. Filter the sessions by the task's conditions
        getRangeSession(params, sqlContext)
        /**
          * 5. Load the area info from the cities table in MySQL
          */
        loadAreaInfo(sqlContext)

        /**
          * 6. Join the related data:
          *     city_info, product_click_city, product_info
          */
        joinProductInfo(sqlContext)

        /**
          * 7. Aggregate: count the product clicks under each area
          */
        calcAreaProduct(sqlContext)

        /**
          * 8. Take the top N per group from the aggregated result
          */
        calcAreaProductTop3(sqlContext, taskId)

        /**
          * 9. Write the result to MySQL
          */
        export2MySQL(taskId, sqlContext)
    }

    def export2MySQL(taskId: Int, sqlContext: SQLContext): Unit = {
        // Step 8 registered the top-3 rows as area_product_top3_temp; read them
        // back out here (taskId arrives as a parameter, not as a column)
        val sql = "SELECT " +
          "area, " +
          "area_level, " +
          "product_id, " +
          "product_name, " +
          "product_status, " +
          "count " +
          "FROM area_product_top3_temp"
        val df = sqlContext.sql(sql)

        df.rdd.foreachPartition(partition => {
            if(!partition.isEmpty) {
                // One DAO and one batched INSERT per partition keeps JDBC round-trips down
                val patDao = new ProductAreaTop3DaoImpl
                val list = new util.ArrayList[ProductAreaTop3]()
                for (row <- partition) {
                    val pat = new ProductAreaTop3()
                    pat.setTaskId(taskId)
                    pat.setArea(row.getAs[String]("area"))
                    pat.setAreaLevel(row.getAs[String]("area_level"))
                    pat.setClickCount(row.getAs[Long]("count"))
                    pat.setProductId(row.getAs[Long]("product_id"))
                    pat.setProductName(row.getAs[String]("product_name"))
                    pat.setProductStatus(row.getAs[String]("product_status"))
                    list.add(pat)
                }
                patDao.insertBatch(list)
            }
        })
    }

    /**
      * 8. Take the top 3 per area with the window function row_number()
      */
    def calcAreaProductTop3(sqlContext: SQLContext, taskId: Int): Unit = {
        // A row_number() alias cannot be referenced by HAVING without a GROUP BY
        // in HiveQL, so rank in a subquery and filter in the outer WHERE
        val sql = "SELECT " +
                     "area, " +
                     "area_level, " +
                     "product_id, " +
                     "product_name, " +
                     "product_status, " +
                     "count, " +
                     "rank " +
                  "FROM (" +
                     "SELECT " +
                        "area, " +
                        "area_level, " +
                        "product_id, " +
                        "product_name, " +
                        "product_status, " +
                        "count, " +
                        "ROW_NUMBER() OVER(PARTITION BY area ORDER BY count DESC) as rank " +
                     "FROM area_product_click_count" +
                  ") t " +
                  "WHERE rank < 4"
        val df = sqlContext.sql(sql)
        df.registerTempTable("area_product_top3_temp")
        df.show()
    }

    def calcAreaProduct(sqlContext:SQLContext): Unit = {
        val sql = "SELECT " +
                    "area, " +
                    "area_level, " +
                    "product_id, " +
                    "product_name, " +
                    "product_status, " +
                    "count(product_id) as count " +
                  "FROM area_product_click_tmp " +
                  "GROUP BY area, product_id, area_level, product_name, product_status"
        val df = sqlContext.sql(sql)
//        df.show()
        df.registerTempTable("area_product_click_count")
    }
    /**
      * Join the three tables together:
      *     city_info, product_click_city, product_info
      * Result columns:
      *     task_id, area, area_level, product_id, city_names, click_count, product_name, product_status
      * The area levels break down like this:
      *     AAAAA ---> 华东|华北|华南 (East/North/South China)
      *     AAAA  ---> 华中 (Central China)
      *     AAA   ---> 西南 (Southwest)
      *     AA    ---> 东北 (Northeast)
      *     A     ---> 西北 (Northwest)
      * @param sqlContext
      */
    def joinProductInfo(sqlContext:SQLContext): Unit = {
        val sql = "SELECT " +
                     "ci.area, " +
                     "CASE " +
                        "WHEN ci.area = '华东' OR ci.area = '华北' OR ci.area = '华南' THEN 'AAAAA' " +
                        "WHEN ci.area = '华中' THEN 'AAAA' " +
                        "WHEN ci.area = '西南' THEN 'AAA' " +
                        "WHEN ci.area = '东北' THEN 'AA' " +
                        "ELSE 'A' " +
                     "END area_level, " +
                     "pi.product_id, " +
                     "pi.product_name, " +
                     "if(getJsonInfo(pi.extend_info) = '0', '自营', '第三方') product_status " +
                  "FROM product_click_city pcc " +
                  "LEFT JOIN city_info ci on pcc.city_id = ci.city_id " +
                  "LEFT JOIN product_info pi on pcc.click_product_id = pi.product_id "
        val df = sqlContext.sql(sql)
        df.registerTempTable("area_product_click_tmp")
        df.show()
    }

    def loadAreaInfo(sqlContext: SQLContext): Unit = {
        val url = DBCPUtils.url
        val username = DBCPUtils.username
        val password = DBCPUtils.password
        val properties = new Properties()
        properties.put("user", username)
        properties.put("password", password)

        val df = sqlContext.read.jdbc(url, "cities", properties)
        df.show()
        df.registerTempTable("city_info")  // make the table visible to SQL in this job
    }


    /**
      * Filter the session data according to the parameters submitted by operations/product staff.
      * @param params e.g. {"startAge":[20], "endAge":[50], "startDate":["2018-08-13"], "endDate":["2018-08-13"]}
      * @param sqlContext
      */
    def getRangeSession(params: JSONObject, sqlContext: SQLContext) = {
        val startDate = ParamUtils.getParam(params, Constants.PARAM_START_DATE)
        val endDate = ParamUtils.getParam(params, Constants.PARAM_END_DATE)
        val sql = "SELECT " +
            "click_product_id, " +
            "city_id " +
            "FROM user_visit_action " +
            "WHERE `date` >= '" + startDate + "' " +
            "AND `date` <= '" + endDate + "' " +
            "AND click_product_id is not null"
        val df = sqlContext.sql(sql)
        df.registerTempTable("product_click_city")
        df.show()
    }
}
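The getJsonInfo UDF registered near the top of the job is another helper the post never shows. Given how joinProductInfo compares its result against '0', it evidently extracts a vendor flag from the product's extend_info JSON. A sketch with fastjson, assuming the field is called product_status:

package com.aura.bigdata.analysis.udf

import com.alibaba.fastjson.JSON

object UDFUtils {
    // Assumed JSON shape: {"product_status": "0"}; "0" = self-operated (自营), otherwise third-party (第三方)
    def getJsonInfo(json: String): String = {
        try {
            JSON.parseObject(json).getString("product_status")
        } catch {
            case _: Exception => ""
        }
    }
}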

In this Spark job the window function is the catch: the plain SQLContext in Spark 1.x does not support ROW_NUMBER() OVER(...), so the query has to run through a HiveContext (i.e. HiveQL) instead. The following SparkUtils class takes care of choosing the right context:

package com.aura.bigdata.analysis.utils

import com.aura.bigdata.analysis.constants.DeployMode
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkUtils {

    // In DEV mode run locally; otherwise the master is supplied by spark-submit
    def setMaster(conf: SparkConf, dMode: DeployMode) {
        if(dMode.equals(DeployMode.DEV)) {
            conf.setMaster("local[4]")
        }
    }

    // HiveContext is needed whenever the job uses HiveQL-only features such as
    // window functions; production always gets a HiveContext
    def getSQLContext(sc: SparkContext, dMode: DeployMode, isHive: Boolean): SQLContext = {
        if(dMode.equals(DeployMode.DEV)) {
            if(isHive) {
                new HiveContext(sc)
            } else {
                new SQLContext(sc)
            }
        } else {
            new HiveContext(sc)
        }
    }
}
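DeployMode and ConfigurationManager.dMode come from elsewhere in the project and are not shown. For completeness, one plausible sketch of DeployMode (the real project may just as well use a Java enum):

package com.aura.bigdata.analysis.constants

// Hypothetical DEV/PROD switch driven by configuration
sealed trait DeployMode
object DeployMode {
    case object DEV extends DeployMode
    case object PROD extends DeployMode
}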
