文章目录
0x00 文章内容- 实现输出代码的重构
- 校验结果
1. 抽离输出代码
a. 因为SessionCutETL
里的main
方法写了比较多的代码,此时我们可以将第6步骤的输出代码进行抽离,全选,选中Refactor
=>Extract
=>Method
b. 我们这里选择第一个
c. 填写方法名,点击OK
即可发现已经抽离出了方法。
2. 重构输出路径
a. 下面的代码中路径都是写死的,而且出现了共同的路径,我们可以进行统一
val trackerLogOutputPath = "data/output/trackerLog"
val trackerSessionOutputPath = "data/output/trackerSession"
修改如下:
writeOutputData(sc,"data/output", parsedLogRDD, cookieLabeledSessionRDD)
private def writeOutputData(sc: SparkContext, baseOutputPath: String, parsedLogRDD: RDD[TrackerLog], cookieLabeledSessionRDD: RDD[TrackerSession]) = {
val trackerLogOutputPath = s"${baseOutputPath}/trackerLog"
val trackerSessionOutputPath = s"${baseOutputPath}/trackerSession"
我们暂且这样重构先,此时需要重新执行一下代码,看一下改后是否能执行。
d. 执行验证,执行会报错,表示路径的文件已经存在,此时可以手动删除再执行,这里下一步是在代码中实现删除。
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory data/output/trackerLog already exists
e. 添加判断路径代码
val fileSystem = FileSystem.get(sc.hadoopConfiguration)
val path = new Path(trackerLogOutputPath)
if (fileSystem.exists(path)) {
fileSystem.delete(path, true)
}
写完之后再抽离出去,取名deletePathIfExists
方法。
f. 此时再重新执行代码,发现代码没有报错,也能得到想要的结果。
3. 重构输出文件类型
目前的情况是以Parquet的形式保存着,如果此时如果需求发生变化了,或者有其他格式的需求,如保存成TextFile格式。就要重新改代码了,如果需求又变了,或者写成其他组件,又要重新改,重新然后打包,重新然后上传,这样非常麻烦。试想,如果此时将需要修改的代码抽象成一个接口,就会大大方便了。
a. 本优化内容比较多,涉及两个类,代码量及过程比较多:
b. OutputComponent
完整代码如下:
package com.shaonaiyi.session
import com.shaonaiyi.spark.session.{TrackerLog, TrackerSession}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
/**
* @Auther: shaonaiyi@163.com
* @Date: 2019/12/30 21:09
* @Description: 抽象输出文件组件
*/
trait OutputComponent {
def writeOutputData(sc: SparkContext, baseOutputPath: String,
parsedLogRDD: RDD[TrackerLog],
cookieLabeledSessionRDD: RDD[TrackerSession]) = {
deletePathIfExists(sc, baseOutputPath)
}
private def deletePathIfExists(sc: SparkContext, trackerLogOutputPath: String) = {
val fileSystem = FileSystem.get(sc.hadoopConfiguration)
val path = new Path(trackerLogOutputPath)
if (fileSystem.exists(path)) {
fileSystem.delete(path, true)
}
}
}
object OutputComponent {
def fromOutputFileType(fileType: String) = {
if (fileType.equals("parquet")) {
new ParquetFileOutput
} else {
new TextFileOutput
}
}
}
/**
* 写Parquet格式文件
*/
class ParquetFileOutput extends OutputComponent {
override def writeOutputData(sc: SparkContext, baseOutputPath: String,
parsedLogRDD: RDD[TrackerLog],
cookieLabeledSessionRDD: RDD[TrackerSession]): Unit = {
super.writeOutputData(sc, baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)
//6、保存数据
//6.1、保存TrackerLog,对应的是parsedLogRDD
val trackerLogOutputPath = s"${baseOutputPath}/trackerLog"
AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerLog.SCHEMA$)
parsedLogRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerLogOutputPath,
classOf[Void], classOf[TrackerLog], classOf[AvroParquetOutputFormat[TrackerLog]]
)
//6.2、保存TrackerSession,对应的是cookieLabeledSessionRDD
val trackerSessionOutputPath = s"${baseOutputPath}/trackerSession"
AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerSession.SCHEMA$)
cookieLabeledSessionRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerSessionOutputPath,
classOf[Void], classOf[TrackerSession], classOf[AvroParquetOutputFormat[TrackerSession]]
)
}
}
/**
* 写TextFile格式文件
*/
class TextFileOutput extends OutputComponent {
override def writeOutputData(sc: SparkContext, baseOutputPath: String,
parsedLogRDD: RDD[TrackerLog],
cookieLabeledSessionRDD: RDD[TrackerSession]): Unit = {
super.writeOutputData(sc, baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)
//6、保存数据
//6.1、保存TrackerLog,对应的是parsedLogRDD
val trackerLogOutputPath = s"${baseOutputPath}/trackerLog"
parsedLogRDD.saveAsTextFile(trackerLogOutputPath)
//6.2、保存TrackerSession,对应的是cookieLabeledSessionRDD
val trackerSessionOutputPath = s"${baseOutputPath}/trackerSession"
cookieLabeledSessionRDD.saveAsTextFile(trackerSessionOutputPath)
}
}
编写思路:先写OutputComponent
接口,然后写ParquetFileOutput
类继承OutputComponent
,最后写一个伴生类OutputComponent
以方便调用,最后进行代码优化,将deletePathIfExists
抽离出来。
代码讲解:
- 此
OutputComponent
是一个Trait类型,属于接口抽象类,目的将输出格式接口化,原本是只有一个Parquet格式的,现在再添加了一个TextFile格式,实现的功能其实就是与输出Parquet格式的代码相类似。ParquetFileOutput
与TextFileOutput
均继承此类,所以需要实现writeOutputData
方法,写好之后,需要再写一个伴生类来调用,并且判断输入的是哪种类型;除此之外,还简化了deletePathIfExists
方法,统一用super
进行了了调用。 -
saveAsNewAPIHadoopFile
需要的参数是key-value类型,所以需要转成key-value先。
c. SessionCutETL
完整代码如下:
package com.shaonaiyi.session
import com.shaonaiyi.spark.session.{TrackerLog, TrackerSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Auther: shaonaiyi@163.com
* @Date: 2019/9/12 10:09
* @Description: 会话切割的程序主入口
*/
object SessionCutETL {
private val logTypeSet = Set("pageview", "click")
def main(args: Array[String]): Unit = {
var conf = new SparkConf()
conf.setAppName("SessionCutETL")
conf.setMaster("local")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
var sc = new SparkContext(conf)
//网站域名标签数据,此处只是演示,其实可以存放在数据库里
val domainLabelMap = Map(
"www.baidu.com" -> "level1",
"www.taobao.com" -> "level2",
"jd.com" -> "level3",
"youku.com" -> "level4"
)
//广播
val domainLabelMapB = sc.broadcast(domainLabelMap)
// sc.setLogLevel("ERROR")
// 1、加载日志源数据
val rawRDD: RDD[String] = sc.textFile("data/rawdata/visit_log.txt")
// rawRDD.collect().foreach(println)
//2、解析rawRDD中每一行日志源数据
// val parsedLogRDD: RDD[Option[TrackerLog]] = rawRDD.map( line => RawLogParserUtil.parse(line))
// val parsedLogRDD1: RDD[TrackerLog] = rawRDD.flatMap( line => RawLogParserUtil.parse(line))
val parsedLogRDD: RDD[TrackerLog] = rawRDD.flatMap( line => RawLogParserUtil.parse(line))
.filter(trackerLog => logTypeSet.contains(trackerLog.getLogType.toString))
// parsedLogRDD.collect().foreach(println)
// parsedLogRDD1.collect().foreach(println)
//3、按照cookie进行分组
val cookieGroupRDD: RDD[(String, Iterable[TrackerLog])] = parsedLogRDD.groupBy(trackerLog => trackerLog.getCookie.toString)
// cookieGroupRDD.collect().foreach(println)
//4、按user进行分组
val sessionRDD: RDD[(String, TrackerSession)] = cookieGroupRDD.flatMapValues { case iter =>
//处理每个user的日志
val processor = new OneUserTrackerLogsProcessor(iter.toArray)
processor.buildSessions(domainLabelMapB.value)
}
//5、给会话的cookie打标签
val cookieLabelRDD: RDD[(String, String)] = sc.textFile("data/cookie_label.txt").map { case line =>
val temp = line.split("\\|")
(temp(0), temp(1)) // (cookie, cookie_label)
}
val joinRDD: RDD[(String,(TrackerSession, Option[String]))] = sessionRDD.leftOuterJoin(cookieLabelRDD)
val cookieLabeledSessionRDD: RDD[TrackerSession] = joinRDD.map {
case (cookie, (session, cookieLabelOpt)) =>
if (cookieLabelOpt.nonEmpty) {
session.setCookieLabel(cookieLabelOpt.get)
} else {
session.setCookieLabel("-")
}
session
}
//text & parquet
OutputComponent.fromOutputFileType("text").writeOutputData(sc,"data/output", parsedLogRDD, cookieLabeledSessionRDD)
sc.stop()
}
}
代码讲解:删除原先的输出格式的代码,用OutputComponent
调用来实现。
1. 生成不同的输出文件类型
a. 执行SessionCutETL
类,查看文件的输出类型,此时生成了TextFile格式文件。
b. 修改此行代码的"text"为parquet
,重新执行,查看结果,此时生成了Parquet格式文件。
//text & parquet
OutputComponent.fromOutputFileType("parquet").writeOutputData(sc,"data/output", parsedLogRDD, cookieLabeledSessionRDD)
0xFF 总结
- 代码重构是一项非常重要的技能,增强代码的可读性。
- 下一篇文章还会继续优化,请留意本博客,关注、评论、加点赞。
- 网站用户行为分析项目系列:
网站用户行为分析项目之会话切割(一)
网站用户行为分析项目之会话切割(二)
网站用户行为分析项目之会话切割(三)
网站用户行为分析项目之会话切割(四)待补充
网站用户行为分析项目之会话切割(五)待补充
作者简介:邵奈一
全栈工程师、市场洞察者、专栏编辑
| 公众号 | 微信 | 微博 | CSDN | 简书 |
福利:
邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。