网站用户行为分析项目之会话切割(六)=> 参数配置化

大家好,我是邵奈一,一个不务正业的程序猿、正儿八经的斜杠青年。
1、世人称我为:被代码耽误的诗人、没天赋的书法家、五音不全的歌手、专业跑龙套演员、不合格的运动员…
2、这几年,我整理了很多IT技术相关的教程给大家,主要是大数据教程,帮助了很多小伙伴入坑大数据行业。
3、如果您觉得文章有用,请收藏,转发,评论,并关注我,谢谢!
博客导航跳转(请收藏):邵奈一的技术博客导航
| 公众号 | 微信 | 微博 | CSDN | 简书 |


教程目录

0x00 教程内容
  1. 运行模式配置化
  2. 路径配置化
  3. 输出类型配置化

注意:以下代码均在 SessionCutETL 中修改。

0x01 运行模式配置化

目前我们是在本地运行的,如果是放到集群运行,则需要修改相应的代码,如果在本地测试,又要加上之前的代码,非常不方便。于是,我们可以给运行的模式加上参数适配。

(1)给设置运行模式加上判断条件:

conf.setMaster("local")
//修改为:
if(!conf.contains("spark.master")) {
  conf.setMaster("local")
}
0x02 路径配置化

目前我们的路径都是写死在代码里的,我们应该进行参数化处理,如果有传参数则使用传参的值,如果没有,则使用默认值。

(1)添加以下四行代码,并且需要修改代码里的路径:

// 通过配置拿到我们配置的输入和输出路径
val visitLogsInputPath = conf.get("spark.sessioncut.visitLogsInputPath", "data/rawdata/visit_log.txt")
val cookieLabelInputPath = conf.get("spark.sessioncut.cookieLabelInputPath", "data/cookie_label.txt")
val baseOutputPath = conf.get("spark.sessioncut.baseOutputPath", "data/output")
0x03 输出类型配置化

目前我们是直接把输出类型在代码中写死的,需要改进。
(1)添加一行代码,并修改代码里的字符串:

val outputFileType = if (args.nonEmpty) args(0) else "text"
// 修改输出代码为:
// text & parquet
OutputComponent.fromOutputFileType(outputFileType).writeOutputData(sc,baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)

完整代码如下:
(2)SessionCutETL 的完整代码如下:

package com.shaonaiyi.session

import com.shaonaiyi.spark.session.{TrackerLog, TrackerSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @Auther: shaonaiyi@163.com
  * @Date: 2019/9/12 10:09
  * @Description: 会话切割的程序主入口
  */
object SessionCutETL {

  private val logTypeSet = Set("pageview", "click")

  def main(args: Array[String]): Unit = {

    var conf = new SparkConf()
    conf.setAppName("SessionCutETL")
    if(!conf.contains("spark.master")) {
      conf.setMaster("local")
    }
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    var sc = new SparkContext(conf)

    // 通过配置拿到我们配置的输入和输出路径
    val visitLogsInputPath = conf.get("spark.sessioncut.visitLogsInputPath", "data/rawdata/visit_log.txt")
    val cookieLabelInputPath = conf.get("spark.sessioncut.cookieLabelInputPath", "data/cookie_label.txt")
    val baseOutputPath = conf.get("spark.sessioncut.baseOutputPath", "data/output")
	
	val outputFileType = if (args.nonEmpty) args(0) else "text"
    
    //网站域名标签数据,此处只是演示,其实可以存放在数据库里
    val domainLabelMap = Map(
      "www.baidu.com" -> "level1",
      "www.taobao.com" -> "level2",
      "jd.com" -> "level3",
      "youku.com" -> "level4"
    )

    //广播
    val domainLabelMapB = sc.broadcast(domainLabelMap)

    // 1、加载日志源数据
    val rawRDD: RDD[String] = sc.textFile(visitLogsInputPath)

    //2、解析rawRDD中每一行日志源数据
    val parsedLogRDD: RDD[TrackerLog] = rawRDD.flatMap( line => RawLogParserUtil.parse(line))
    .filter(trackerLog => logTypeSet.contains(trackerLog.getLogType.toString))

    //缓存parsedLogRDD
    parsedLogRDD.persist(StorageLevel.MEMORY_AND_DISK)

    //3、按照cookie进行分组,一个cookie及一个user
   val cookieGroupRDD: RDD[(String, Iterable[TrackerLog])] = parsedLogRDD.groupBy(trackerLog => trackerLog.getCookie.toString)

    //4、生成会话RDD
    val sessionRDD: RDD[(String, TrackerSession)] = cookieGroupRDD.flatMapValues { case iter =>

      //处理每个user的日志
      val processor = new OneUserTrackerLogsProcessor(iter.toArray)
//      val processor = new OneUserTrackerLogsProcessor(iter.toArray) with PageviewSessionGenerator

        processor.buildSessions(domainLabelMapB.value)
    }

    //5、给会话的cookie打标签
    val cookieLabelRDD: RDD[(String, String)] = sc.textFile(cookieLabelInputPath).map { case line =>
      val temp = line.split("\\|")
      (temp(0), temp(1)) // (cookie, cookie_label)
    }

    val joinRDD: RDD[(String,(TrackerSession, Option[String]))] = sessionRDD.leftOuterJoin(cookieLabelRDD)

    val cookieLabeledSessionRDD: RDD[TrackerSession] = joinRDD.map {
      case (cookie, (session, cookieLabelOpt)) =>
        if (cookieLabelOpt.nonEmpty) {
          session.setCookieLabel(cookieLabelOpt.get)
        } else {
          session.setCookieLabel("-")
        }
        session
    }

    //text & parquet
    OutputComponent.fromOutputFileType(outputFileType).writeOutputData(sc,baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)

    sc.stop()
  }

}
0xFF 总结
  1. 下一篇文章我们会打包我们的代码放到集群去运行。

邵奈一 原创不易,如转载请标明出处,教育是一生的事业。


上一篇:一个流和百亿级的表的join


下一篇:大数据应用技术实验