Computation Logic

All three jobs below aggregate the same regional metrics per (provincename, cityname) pair:

- 有效请求 (valid requests): requestmode = 1 and processnode >= 2
- 广告请求 (ad requests): requestmode = 1 and processnode = 3
- 参与竞价数 (bids participated): iseffective = 1, isbilling = 1, isbid = 1 and adorderid != 0
- 竞价成功数 (bids won): iseffective = 1, isbilling = 1 and iswin = 1
- 展示数 (impressions): requestmode = 2 and iseffective = 1
- 点击数 (clicks): requestmode = 3 and iseffective = 1
- 广告成本 (ad cost): adpayment / 1000, summed over winning bids
- 广告消费 (ad spend): winprice / 1000, summed over winning bids

The Spark Core versions additionally count 原始请求 (raw requests: requestmode = 1), so each record contributes a 9-element metric vector.
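For a concrete feel of these rules, here is how a single made-up record (an ad request that went on to win its auction) maps to the metric vector, using the RptUtils helpers defined in the Spark Core section below; all field values here are illustrative, not real data:

import com.dmp.utils.RptUtils

// Made-up record: requestmode=1, processnode=3, iseffective=1,
// isbilling=1, isbid=1, adorderid=123, iswin=1, winprice=320.0, adpayment=280.0
val req       = RptUtils.calculateReq(1, 3)                          // List(1.0, 1.0, 1.0)
val rtb       = RptUtils.calculateRtb(1, 1, 1, 123, 1, 320.0, 280.0) // List(1.0, 1.0, 0.32, 0.28)
val showClick = RptUtils.calculateShowClick(1, 1)                    // List(0.0, 0.0)
// The 9-element vector that gets summed per (provincename, cityname)
val metrics = req ++ rtb ++ showClick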
Spark SQL implementation
package com.dmp.report

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object AreaAnalysRpt {

  def main(args: Array[String]): Unit = {
    // 0 Validate the argument count
    if (args.length != 1) {
      println(
        """
          |com.dmp.report.AreaAnalysRpt
          |Arguments:
          |  logInputPath
        """.stripMargin)
      sys.exit()
    }

    // 1 Accept the program arguments
    val Array(logInputPath) = args

    // 2 Create SparkConf -> SparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // Kryo is used when RDDs are serialized to disk and for worker-to-worker transfers
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val sQLContext = new SQLContext(sc)

    // Read the parquet file
    val parquetData: DataFrame = sQLContext.read.parquet(logInputPath)

    // DataFrame -> temp table
    parquetData.registerTempTable("log")

    // Business logic: aggregate the metrics per (province, city)
    sQLContext.sql(
      """
        |select
        |provincename, cityname,
        |sum(case when requestmode=1 and processnode>=2 then 1 else 0 end) 有效请求,
        |sum(case when requestmode=1 and processnode=3 then 1 else 0 end) 广告请求,
        |sum(case when iseffective=1 and isbilling=1 and isbid=1 and adorderid!=0 then 1 else 0 end) 参与竞价数,
        |sum(case when iseffective=1 and isbilling=1 and iswin=1 then 1 else 0 end) 竞价成功数,
        |sum(case when requestmode=2 and iseffective=1 then 1 else 0 end) 展示数,
        |sum(case when requestmode=3 and iseffective=1 then 1 else 0 end) 点击数,
        |sum(case when iseffective=1 and isbilling=1 and iswin=1 then 1.0*adpayment/1000 else 0 end) 广告成本,
        |sum(case when iseffective=1 and isbilling=1 and iswin=1 then 1.0*winprice/1000 else 0 end) 广告消费
        |from log
        |group by provincename, cityname
      """.stripMargin
    ).show()

    sc.stop()
  }
}
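Note that SQLContext and registerTempTable are the Spark 1.x API. On Spark 2.x the same setup would look roughly as follows; this is a sketch with a hypothetical object name, not code from the original project:

import org.apache.spark.sql.SparkSession

object AreaAnalysRptV2 { // hypothetical Spark 2.x variant
  def main(args: Array[String]): Unit = {
    val Array(logInputPath) = args
    val spark = SparkSession.builder()
      .appName("AreaAnalysRpt")
      .master("local[*]")
      .getOrCreate()
    val parquetData = spark.read.parquet(logInputPath)
    // createOrReplaceTempView is the Spark 2.x replacement for registerTempTable
    parquetData.createOrReplaceTempView("log")
    // ...run the same SQL as above via spark.sql(...).show()
    spark.stop()
  }
}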
Spark Core implementation
Utility class that computes the metrics
package com.dmp.utils

object RptUtils {

  /** List(raw requests, valid requests, ad requests) */
  def calculateReq(reqMode: Int, prcNode: Int): List[Double] = {
    if (reqMode == 1 && prcNode == 1) List[Double](1, 0, 0)
    else if (reqMode == 1 && prcNode == 2) List[Double](1, 1, 0)
    else if (reqMode == 1 && prcNode == 3) List[Double](1, 1, 1)
    else List[Double](0, 0, 0)
  }

  /** List(bids participated, bids won, ad spend, ad cost) */
  def calculateRtb(effective: Int, bill: Int, bid: Int, orderId: Int,
                   win: Int, winPrice: Double, adPayment: Double): List[Double] = {
    // Count participation and wins independently (a winning record usually also
    // participated), so the results agree with the SQL version above
    val bidCnt = if (effective == 1 && bill == 1 && bid == 1 && orderId != 0) 1.0 else 0.0
    val winCnt = if (effective == 1 && bill == 1 && win == 1) 1.0 else 0.0
    List(bidCnt, winCnt, winCnt * winPrice / 1000.0, winCnt * adPayment / 1000.0)
  }

  /** List(impressions, clicks) */
  def calculateShowClick(reqMode: Int, effective: Int): List[Double] = {
    if (reqMode == 2 && effective == 1) List[Double](1, 0)
    else if (reqMode == 3 && effective == 1) List[Double](0, 1)
    else List[Double](0, 0)
  }
}
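Each helper returns a fixed-length List[Double], so the per-record vectors can later be summed pairwise inside reduceByKey by zipping two lists. A quick illustration of that zip-and-add step:

val a = List(1.0, 1.0, 0.0)
val b = List(1.0, 0.0, 1.0)
val summed = a.zip(b).map(t => t._1 + t._2) // List(2.0, 1.0, 1.0)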
Reading the plain-text log file
package com.dmp.report

import com.dmp.beans.Log
import com.dmp.utils.RptUtils
import org.apache.spark.{SparkConf, SparkContext}

/*
 * Example arguments:
 *   F:\牛牛学堂大数据24期\09-实训实战-9天\dmp&&移动项目\dmp\2016-10-01_06_p1_invalid.1475274123982.log.FINISH.bz2
 *   C:\Users\admin\Desktop\result3
 */
object AreaAnalysRpt2 {

  def main(args: Array[String]): Unit = {
    // 0 Validate the argument count
    if (args.length != 2) {
      println(
        """
          |com.dmp.report.AreaAnalysRpt2
          |Arguments:
          |  logInputPath
          |  resultOutputPath
        """.stripMargin)
      sys.exit()
    }

    // 1 Accept the program arguments
    val Array(logInputPath, resultOutputPath) = args

    // 2 Create SparkConf -> SparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // Kryo is used when RDDs are serialized to disk and for worker-to-worker transfers
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)

    sc.textFile(logInputPath)
      .map(_.split(",", -1))   // -1 keeps trailing empty fields
      .filter(_.length >= 85)  // drop malformed lines
      .map(arr => {
        val log = Log(arr)
        val req = RptUtils.calculateReq(log.requestmode, log.processnode)
        val rtb = RptUtils.calculateRtb(log.iseffective, log.isbilling, log.isbid,
          log.adorderid, log.iswin, log.winprice, log.adpayment)
        val showClick = RptUtils.calculateShowClick(log.requestmode, log.iseffective)
        // ((province, city), List of the 9 metric values)
        ((log.provincename, log.cityname), req ++ rtb ++ showClick)
      })
      .reduceByKey((list1, list2) => {
        // Zip the two lists so elements at the same position form a tuple, then add them
        list1.zip(list2).map(t => t._1 + t._2)
      })
      // Format each record as a CSV line instead of printing the raw tuple
      .map(t => t._1._1 + "," + t._1._2 + "," + t._2.mkString(","))
      .saveAsTextFile(resultOutputPath)

    sc.stop()
  }
}
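The Log bean itself is not shown in the article. A minimal sketch of what com.dmp.beans.Log might look like is below; the array indices are placeholders, since the real log has 85+ comma-separated fields whose positions depend on the project's log spec:

package com.dmp.beans

// Sketch of the Log bean used above; NOT the project's real field layout.
case class Log(requestmode: Int, processnode: Int,
               iseffective: Int, isbilling: Int, isbid: Int,
               adorderid: Int, iswin: Int,
               winprice: Double, adpayment: Double,
               provincename: String, cityname: String)

object Log {
  private def toInt(s: String): Int = if (s.isEmpty) 0 else s.toInt
  private def toDouble(s: String): Double = if (s.isEmpty) 0.0 else s.toDouble

  // All indices below are illustrative placeholders; map them to the actual spec
  def apply(arr: Array[String]): Log = Log(
    requestmode  = toInt(arr(0)),
    processnode  = toInt(arr(1)),
    iseffective  = toInt(arr(2)),
    isbilling    = toInt(arr(3)),
    isbid        = toInt(arr(4)),
    adorderid    = toInt(arr(5)),
    iswin        = toInt(arr(6)),
    winprice     = toDouble(arr(7)),
    adpayment    = toDouble(arr(8)),
    provincename = arr(9),
    cityname     = arr(10)
  )
}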
Reading the parquet file
package com.dmp.report

import com.dmp.utils.RptUtils
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/*
 * Example arguments:
 *   C:\Users\admin\Desktop\result1\
 *   C:\Users\admin\Desktop\result3
 */
object AreaAnalysRpt3 {

  def main(args: Array[String]): Unit = {
    // 0 Validate the argument count
    if (args.length != 2) {
      println(
        """
          |com.dmp.report.AreaAnalysRpt3
          |Arguments:
          |  logInputPath
          |  resultOutputPath
        """.stripMargin)
      sys.exit()
    }

    // 1 Accept the program arguments
    val Array(logInputPath, resultOutputPath) = args

    // 2 Create SparkConf -> SparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // Kryo is used when RDDs are serialized to disk and for worker-to-worker transfers
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val sQLContext = new SQLContext(sc)

    val parquetData = sQLContext.read.parquet(logInputPath)

    parquetData.map(row => {
      // Fields for List(raw requests, valid requests, ad requests)
      val reqMode = row.getAs[Int]("requestmode")
      val prcNode = row.getAs[Int]("processnode")
      // Fields for List(bids participated, bids won, spend, cost)
      val effective = row.getAs[Int]("iseffective")
      val bill = row.getAs[Int]("isbilling")
      val bid = row.getAs[Int]("isbid")
      val orderId = row.getAs[Int]("adorderid")
      val win = row.getAs[Int]("iswin")
      val winPrice = row.getAs[Double]("winprice")
      val adPayment = row.getAs[Double]("adpayment")

      // Convert each group of fields into its metric list
      val reqList = RptUtils.calculateReq(reqMode, prcNode)
      val rtbList = RptUtils.calculateRtb(effective, bill, bid, orderId, win, winPrice, adPayment)
      val showClickList = RptUtils.calculateShowClick(reqMode, effective)

      // Return a ((province, city), metrics) tuple
      ((row.getAs[String]("provincename"), row.getAs[String]("cityname")),
        reqList ++ rtbList ++ showClickList)
    }).reduceByKey((list1, list2) => {
      list1.zip(list2).map(t => t._1 + t._2)
    }).map(t => t._1._1 + "," + t._1._2 + "," + t._2.mkString(","))
      .saveAsTextFile(resultOutputPath)

    sc.stop()
  }
}
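Both RDD jobs emit one CSV line per (provincename, cityname): the province, the city, and then the nine metric values in the order the three RptUtils helpers produce them (raw/valid/ad requests; bids participated, bids won, spend, cost; impressions, clicks), so the plain-log and parquet pipelines can be diffed directly against each other.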