大家好,我是邵奈一,一个不务正业的程序猿、正儿八经的斜杠青年。
1、世人称我为:被代码耽误的诗人、没天赋的书法家、五音不全的歌手、专业跑龙套演员、不合格的运动员…
2、这几年,我整理了很多IT技术相关的教程给大家,主要是大数据教程,帮助了很多小伙伴入坑大数据行业。
3、如果您觉得文章有用,请收藏,转发,评论,并关注我,谢谢!
博客导航跳转(请收藏):邵奈一的技术博客导航
| 公众号 | 微信 | 微博 | CSDN | 简书 |
教程目录
0x00 教程内容- 运行模式配置化
- 路径配置化
- 输出类型配置化
注意:以下代码均在 SessionCutETL
中修改。
目前我们是在本地运行的,如果是放到集群运行,则需要修改相应的代码,如果在本地测试,又要加上之前的代码,非常不方便。于是,我们可以给运行的模式加上参数适配。
(1)给设置运行模式加上判断条件:
conf.setMaster("local")
//修改为:
if(!conf.contains("spark.master")) {
conf.setMaster("local")
}
0x02 路径配置化
目前我们的路径都是写死在代码里的,我们应该进行参数化处理,如果有传参数则使用传参的值,如果没有,则使用默认值。
(1)添加以下四行代码,并且需要修改代码里的路径:
// 通过配置拿到我们配置的输入和输出路径
val visitLogsInputPath = conf.get("spark.sessioncut.visitLogsInputPath", "data/rawdata/visit_log.txt")
val cookieLabelInputPath = conf.get("spark.sessioncut.cookieLabelInputPath", "data/cookie_label.txt")
val baseOutputPath = conf.get("spark.sessioncut.baseOutputPath", "data/output")
0x03 输出类型配置化
目前我们是直接把输出类型在代码中写死的,需要改进。
(1)添加一行代码,并修改代码里的字符串:
val outputFileType = if (args.nonEmpty) args(0) else "text"
// 修改输出代码为:
// text & parquet
OutputComponent.fromOutputFileType(outputFileType).writeOutputData(sc,baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)
完整代码如下:
(2)SessionCutETL
的完整代码如下:
package com.shaonaiyi.session
import com.shaonaiyi.spark.session.{TrackerLog, TrackerSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Auther: shaonaiyi@163.com
* @Date: 2019/9/12 10:09
* @Description: 会话切割的程序主入口
*/
object SessionCutETL {
private val logTypeSet = Set("pageview", "click")
def main(args: Array[String]): Unit = {
var conf = new SparkConf()
conf.setAppName("SessionCutETL")
if(!conf.contains("spark.master")) {
conf.setMaster("local")
}
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
var sc = new SparkContext(conf)
// 通过配置拿到我们配置的输入和输出路径
val visitLogsInputPath = conf.get("spark.sessioncut.visitLogsInputPath", "data/rawdata/visit_log.txt")
val cookieLabelInputPath = conf.get("spark.sessioncut.cookieLabelInputPath", "data/cookie_label.txt")
val baseOutputPath = conf.get("spark.sessioncut.baseOutputPath", "data/output")
val outputFileType = if (args.nonEmpty) args(0) else "text"
//网站域名标签数据,此处只是演示,其实可以存放在数据库里
val domainLabelMap = Map(
"www.baidu.com" -> "level1",
"www.taobao.com" -> "level2",
"jd.com" -> "level3",
"youku.com" -> "level4"
)
//广播
val domainLabelMapB = sc.broadcast(domainLabelMap)
// 1、加载日志源数据
val rawRDD: RDD[String] = sc.textFile(visitLogsInputPath)
//2、解析rawRDD中每一行日志源数据
val parsedLogRDD: RDD[TrackerLog] = rawRDD.flatMap( line => RawLogParserUtil.parse(line))
.filter(trackerLog => logTypeSet.contains(trackerLog.getLogType.toString))
//缓存parsedLogRDD
parsedLogRDD.persist(StorageLevel.MEMORY_AND_DISK)
//3、按照cookie进行分组,一个cookie及一个user
val cookieGroupRDD: RDD[(String, Iterable[TrackerLog])] = parsedLogRDD.groupBy(trackerLog => trackerLog.getCookie.toString)
//4、生成会话RDD
val sessionRDD: RDD[(String, TrackerSession)] = cookieGroupRDD.flatMapValues { case iter =>
//处理每个user的日志
val processor = new OneUserTrackerLogsProcessor(iter.toArray)
// val processor = new OneUserTrackerLogsProcessor(iter.toArray) with PageviewSessionGenerator
processor.buildSessions(domainLabelMapB.value)
}
//5、给会话的cookie打标签
val cookieLabelRDD: RDD[(String, String)] = sc.textFile(cookieLabelInputPath).map { case line =>
val temp = line.split("\\|")
(temp(0), temp(1)) // (cookie, cookie_label)
}
val joinRDD: RDD[(String,(TrackerSession, Option[String]))] = sessionRDD.leftOuterJoin(cookieLabelRDD)
val cookieLabeledSessionRDD: RDD[TrackerSession] = joinRDD.map {
case (cookie, (session, cookieLabelOpt)) =>
if (cookieLabelOpt.nonEmpty) {
session.setCookieLabel(cookieLabelOpt.get)
} else {
session.setCookieLabel("-")
}
session
}
//text & parquet
OutputComponent.fromOutputFileType(outputFileType).writeOutputData(sc,baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)
sc.stop()
}
}
0xFF 总结
- 下一篇文章我们会打包我们的代码放到集群去运行。
邵奈一 原创不易,如转载请标明出处,教育是一生的事业。