sparksql和spark core 实现地域报表分析


文章目录


计算逻辑

sparksql和spark core 实现地域报表分析

sparksql和spark core 实现地域报表分析
sparksql和spark core 实现地域报表分析

spark sql实现代码

package com.dmp.report

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object AreaAnalysRpt {

  def main(args: Array[String]): Unit = {

    // 0 校验参数个数
    if (args.length != 1) {
      println(
        """
          |cn.dmp.report.AreaAnalyseRpt
          |参数:
          | logInputPath
        """.stripMargin)
      sys.exit()
    }

    // 1 接受程序参数
    val Array(logInputPath) = args

    // 2 创建sparkconf->sparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // RDD 序列化到磁盘 worker与worker之间的数据传输
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(sparkConf)
    val sQLContext = new SQLContext(sc)

    // 读取parquet文件
    val parquetData: DataFrame = sQLContext.read.parquet(logInputPath)
    // dataframe -> table
    parquetData.registerTempTable("log")
    //业务逻辑
    sQLContext.sql(
      """
        |select
        |provincename, cityname,
        |sum(case when requestmode=1 and processnode >=2 then 1 else 0 end) 有效请求,
        |sum(case when requestmode=1 and processnode =3 then 1 else 0 end) 广告请求,
        |sum(case when iseffective=1 and isbilling=1 and isbid=1 and adorderid !=0 then 1 else 0 end) 参与竞价数,
        |sum(case when iseffective=1 and isbilling=1 and iswin=1 then 1 else 0 end) 竞价成功数,
        |sum(case when requestmode=2 and iseffective=1 then 1 else 0 end) 展示数,
        |sum(case when requestmode=3 and iseffective=1 then 1 else 0 end) 点击数,
        |sum(case when iseffective=1 and isbilling=1 and iswin=1 then 1.0*adpayment/1000 else 0 end) 广告成本,
        |sum(case when iseffective=1 and isbilling=1 and iswin=1 then 1.0*winprice/1000 else 0 end) 广告消费
        |from log
        |group by provincename, cityname
      """.stripMargin
    ).show()

  }

}

spark core实现代码

处理计算指标的逻辑类

package cn.dmp.utils

object RptUtils {

    /**
      * List(原始请求,有效请求,广告请求)
      */
    def caculateReq(reqMode: Int, prcNode: Int): List[Double] = {
        if (reqMode == 1 && prcNode == 1) {
            List[Double](1, 0, 0)
        } else if (reqMode == 1 && prcNode == 2) {
            List[Double](1, 1, 0)
        } else if (reqMode == 1 && prcNode == 3) {
            List[Double](1, 1, 1)
        } else List[Double](0, 0, 0)
    }

    /**
      * List(参与竞价,竞价成功,消费,成本)
      */
    def caculateRtb(effTive: Int, bill: Int, bid: Int, orderId: Int, win: Int, winPrice: Double, adPayMent: Double): List[Double] = {

        if (effTive == 1 && bill == 1 && bid == 1 && orderId != 0) {
            List[Double](1, 0, 0, 0)
        } else if (effTive == 1 && bill == 1 && win == 1) {
            List[Double](0, 1, winPrice / 1000.0, adPayMent / 1000.0)
        } else List[Double](0, 0, 0, 0)
    }
    /**
      * List(广告展示,点击)
      */
    def caculateShowClick(reqMode: Int, effTive: Int): List[Double] = {
        if (reqMode == 2 && effTive == 1) {
            List[Double](1, 0)
        } else if (reqMode == 3 && effTive == 1) {
            List[Double](0, 1)
        } else List[Double](0, 0)
    }
}

读取的普通日志文件

package com.dmp.report

import com.dmp.beans.Log
import com.dmp.utils.RptUtils
import org.apache.spark.{SparkConf, SparkContext}

/*
  F:\牛牛学堂大数据24期\09-实训实战-9天\dmp&&移动项目\dmp\2016-10-01_06_p1_invalid.1475274123982.log.FINISH.bz2
  C:\Users\admin\Desktop\result3
  *
  */
object AreaAnalysRpt2 {

  def main(args: Array[String]): Unit = {
    // 0 校验参数个数
    if (args.length != 2) {
      println(
        """
          |cn.dmp.report.AreaAnalyseRpt
          |参数:
          | logInputPath
          | resultOutputPath
        """.stripMargin)
      sys.exit()
    }

    // 1 接受程序参数
    val Array(logInputPath, resultOutputPath) = args

    // 2 创建sparkconf->sparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // RDD 序列化到磁盘 worker与worker之间的数据传输
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(sparkConf)
    sc.textFile(logInputPath)
      .map(_.split(",", -1))
      .filter(_.length >= 85)
      .map(arr => {
        val log = Log(arr)
        val req = RptUtils.caculateReq(log.requestmode, log.processnode)
        val rtb = RptUtils.caculateRtb(log.iseffective, log.isbilling, log.isbid, log.adorderid, log.iswin, log.winprice, log.adpayment)
        val showClick = RptUtils.caculateShowClick(log.requestmode, log.iseffective)
        // (省,地市,媒体,渠道,操作系统,网络类型,...,List(9个指标数据))
        ((log.provincename, log.cityname), req ++ rtb ++ showClick)
      }).reduceByKey((list1, list2) => {
      //将两个list拉链组合在一个,相同位置的元素组成一个tuple,map操作进行累加
      list1.zip(list2).map(t => t._1 + t._2)
    })// 会打印一个(),使用map将数据进行整理
      .map(t => t._1._1 + ","+t._1._2+","+t._2.mkString)
      .saveAsTextFile(resultOutputPath)
  }
}

读取parquet文件

package com.dmp.report

import com.dmp.utils.RptUtils
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/*
 C:\Users\admin\Desktop\result1\
  C:\Users\admin\Desktop\result3
  *
  */
object AreaAnalysRpt3 {

  def main(args: Array[String]): Unit = {
    // 0 校验参数个数
    if (args.length != 2) {
      println(
        """
          |cn.dmp.report.AreaAnalyseRpt
          |参数:
          | logInputPath
          | resultOutputPath
        """.stripMargin)
      sys.exit()
    }

    // 1 接受程序参数
    val Array(logInputPath, resultOutputPath) = args

    // 2 创建sparkconf->sparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // RDD 序列化到磁盘 worker与worker之间的数据传输
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(sparkConf)
    val sQLContext = new SQLContext(sc)
    val parquetData = sQLContext.read.parquet(logInputPath)
    parquetData.map(row => {
      //从row中获取具体字段
      // 是不是原始请求,有效请求,广告请求 List(原始请求,有效请求,广告请求)
      val reqMode = row.getAs[Int]("requestmode")
      val prcNode = row.getAs[Int]("processnode")
      // 参与竞价, 竞价成功  List(参与竞价,竞价成功, 消费, 成本)
      val effTive = row.getAs[Int]("iseffective")
      val bill = row.getAs[Int]("isbilling")
      val bid = row.getAs[Int]("isbid")
      val orderId = row.getAs[Int]("adorderid")
      val win = row.getAs[Int]("iswin")
      val winPrice = row.getAs[Double]("winprice")
      val adPayMent = row.getAs[Double]("adpayment")
      //      转换为list
      val reqList = RptUtils.caculateReq(reqMode, prcNode)
      val rtbList = RptUtils.caculateRtb(effTive, bill, bid, orderId, win, winPrice, adPayMent)
      val showClickList = RptUtils.caculateShowClick(reqMode, effTive)
      //返回元组
      // 返回元组
      ((row.getAs[String]("provincename"), row.getAs[String]("cityname")), reqList ++ rtbList ++ showClickList)
    }).reduceByKey((list1, list2) => {
      list1.zip(list2).map(t => t._1 + t._2)
    }).map(t => t._1._1 + "," + t._1._2 + "," +t._2.mkString(","))
      .saveAsTextFile(resultOutputPath)
    sc.stop()
  }
}

               

上一篇:sparksql读取hive表中数据


下一篇:SparkSQL读写JDBC