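Spark Data Analysis and Processing
Files required for the project: project files
Extraction code: 3yuo
Using Spark to meet the requirements of a log analysis project
- Log data cleaning
- User retention analysis
- Active user analysis
- Geographic analysis of active users
- User browsing depth analysis
Data cleaning
Read the log file and convert it to an RDD[Row]
- Split each line on tabs
- Drop lines that have fewer than 8 fields
Clean the data
- Deduplicate on the first and second columns (event_time and url)
- Drop rows whose status code is not 200
- Drop rows whose event_time is empty
- Split the query string of the url (the part after "?") on "&" and "="
Save the data
- Write the data to MySQL tables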
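The cleaning rules above can be tried out without a Spark cluster. The following is a minimal sketch on plain Scala collections, not the Spark job itself; the object name `LogCleaningSketch` and its helpers are assumptions made for illustration. Unlike `dropDuplicates` in Spark, which keeps an arbitrary row per key, this sketch keeps the first row it sees.

```scala
import scala.collection.mutable

// Plain-Scala illustration of the cleaning rules (hypothetical helper names).
object LogCleaningSketch {
  // Split a raw line on tabs and trim each field; keep it only if it has exactly 8 fields.
  def splitLine(line: String): Option[Array[String]] = {
    val fields = line.split("\t").map(_.trim)
    if (fields.length == 8) Some(fields) else None
  }

  // Turn the query string of a url (the part after "?") into key/value pairs.
  def parseQuery(url: String): Map[String, String] = {
    val parts = url.split("\\?")
    if (parts.length != 2) Map.empty[String, String]
    else parts(1).split("&")
      .map(_.split("="))
      .collect { case Array(key, value) => key -> value } // skips pairs without a value
      .toMap
  }

  // Deduplicate on (event_time, url), then keep status 200 and non-empty event_time.
  def clean(lines: Seq[String]): Seq[Array[String]] = {
    val seen = mutable.HashSet.empty[(String, String)]
    lines
      .flatMap(line => splitLine(line))
      .filter(f => seen.add((f(0), f(1)))) // true only the first time a key is seen
      .filter(f => f(3) == "200")
      .filter(f => f(0).nonEmpty)
  }
}
```

Calling `LogCleaningSketch.parseQuery` on a url without a "?" returns an empty map, so later lookups can fall back to a default value.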
Fields produced by splitting a log line:
event_time
url
method
status
sip
user_uip
action_prepend
action_client
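To make the field order explicit, the eight columns can be modelled as a typed record. This is only a sketch in plain Scala; the case class `RawLog` and the function `fromLine` are hypothetical names that do not appear in the project code.

```scala
// Typed view of one log line; field order follows the list above.
object RawLogSketch {
  final case class RawLog(
    eventTime: String,
    url: String,
    method: String,
    status: String,
    sip: String,
    userUip: String,
    actionPrepend: String,
    actionClient: String
  )

  // Returns None when the line does not split into exactly 8 tab-separated fields.
  def fromLine(line: String): Option[RawLog] =
    line.split("\t").map(_.trim) match {
      case Array(t, u, m, st, sip, uip, pre, cli) => Some(RawLog(t, u, m, st, sip, uip, pre, cli))
      case _ => None
    }
}
```

Pattern matching on `Array(...)` with eight names checks the field count and binds the values in one step.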
Code
package nj.zb.kb09.project
import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object DataClean2 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("DataClean2").getOrCreate()
import spark.implicits._
val sc: SparkContext = spark.sparkContext
val linesRdd: RDD[String] = sc.textFile("in/test.log")
println(linesRdd.count())
// Output
19163
// Split on tabs and keep only the lines that have exactly 8 fields
val line1: RDD[Array[String]] = linesRdd.map(x=>x.split("\t"))
val rdd: RDD[Row] = line1.filter(x=>x.length==8).map(x=>Row(x(0).trim,x(1).trim,x(2).trim,x(3).trim,x(4).trim,x(5).trim,x(6).trim,x(7).trim))
val schema1 = StructType(Array(
StructField("event_time", StringType, false),
StructField("url", StringType, false),
StructField("method", StringType, false),
StructField("status", StringType, false),
StructField("sip", StringType, false),
StructField("user_uip", StringType, false),
StructField("action_prepend", StringType, false),
StructField("action_client", StringType, false)
))
// Create the DataFrame
val df1: DataFrame = spark.createDataFrame(rdd,schema1)
df1.printSchema()
df1.show(10,true)
println(df1.count())
// Output
root
|-- event_time: string (nullable = false)
|-- url: string (nullable = false)
|-- method: string (nullable = false)
|-- status: string (nullable = false)
|-- sip: string (nullable = false)
|-- user_uip: string (nullable = false)
|-- action_prepend: string (nullable = false)
|-- action_client: string (nullable = false)
+--------------------+--------------------+------+------+---------------+---------------+--------------------+--------------------+
| event_time| url|method|status| sip| user_uip| action_prepend| action_client|
+--------------------+--------------------+------+------+---------------+---------------+--------------------+--------------------+
|2018-09-04T20:27:...|http://datacenter...| GET| 200| 192.168.168.64| -| -|Apache-HttpClient...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200| 192.168.168.63| -| -|Apache-HttpClient...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200|192.168.168.124| -| -|Apache-HttpClient...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200| 112.25.60.17| 223.88.187.31| -|Mozilla/5.0 (Linu...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200| 192.168.168.79| -| -|Apache-HttpClient...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200|192.168.168.148| -| -|Apache-HttpClient...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200| 192.168.168.78| -| -|Apache-HttpClient...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200| 183.136.133.44|118.144.132.209|http://www.kgc.cn...|Mozilla/5.0 (Wind...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200| 112.25.60.17| 223.88.187.31| -|Mozilla/5.0 (Linu...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200| 192.168.168.64| -| -|Apache-HttpClient...|
+--------------------+--------------------+------+------+---------------+---------------+--------------------+--------------------+
only showing top 10 rows
19163
// Deduplicate on the first and second columns, drop rows whose status code is not 200, drop rows whose event_time is empty
val ds1: Dataset[Row] = df1.dropDuplicates("event_time","url").filter(x=>x(3)=="200").filter(x=>x(0).equals("")==false)
ds1.printSchema()
ds1.show(10,true)
println(ds1.count())
// Output
root
|-- event_time: string (nullable = false)
|-- url: string (nullable = false)
|-- method: string (nullable = true)
|-- status: string (nullable = true)
|-- sip: string (nullable = true)
|-- user_uip: string (nullable = true)
|-- action_prepend: string (nullable = true)
|-- action_client: string (nullable = true)
+--------------------+--------------------+------+------+--------------+--------------+--------------------+--------------------+
| event_time| url|method|status| sip| user_uip| action_prepend| action_client|
+--------------------+--------------------+------+------+--------------+--------------+--------------------+--------------------+
|2018-09-04T20:27:...|http://datacenter...| GET| 200| 61.240.144.25|114.252.24.227|http://www.kgc.cn/my|Mozilla/5.0 (Wind...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200|192.168.168.63| -| -|Apache-HttpClient...|
|2018-09-04T20:27:...|http://datacenter...| GET| 200|183.136.133.44| 1.196.181.58| -|Mozilla/5.0 (Linu...|
|2018-09-04T20:28:...|http://datacenter...| GET| 200|192.168.168.64| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|http://datacenter...| GET| 200| 122.70.138.16| 223.72.51.33|http://www.kgc.cn...|Mozilla/5.0 (Wind...|
|2018-09-04T20:28:...|http://datacenter...| GET| 200|182.140.227.19|222.210.139.83|http://www.kgc.cn...|Mozilla/5.0 (Wind...|
|2018-09-04T20:28:...|http://datacenter...| GET| 200|192.168.168.71| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|http://datacenter...| GET| 200|192.168.168.79| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|http://datacenter...| GET| 200|192.168.168.64| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|http://datacenter...| GET| 200|192.168.168.26| -| -| -|
+--------------------+--------------------+------+------+--------------+--------------+--------------------+--------------------+
only showing top 10 rows
18198
// Split the url on "?", then split the second part on "&" and "="
val dfDetail: DataFrame = ds1.map(row => {
val urlArray: Array[String] = row.getAs[String]("url").split("\\?")
var map: Map[String, String] = Map("params" -> "null")
if (urlArray.length == 2) {
map = urlArray(1).split("&").map(x => x.split("=")).filter(_.length == 2).map(x => (x(0), x(1))).toMap
}
(row.getAs[String]("event_time"),
map.getOrElse("actionBegin", ""),
map.getOrElse("actionClient", ""),
map.getOrElse("actionEnd", ""),
map.getOrElse("actionName", ""),
map.getOrElse("actionTest", ""),
map.getOrElse("actionType", ""),
map.getOrElse("actionValue", ""),
map.getOrElse("clientType", ""),
map.getOrElse("examType", ""),
//map.getOrElse("ifEqupment", ""),
//map.getOrElse("isFromContinue", ""),
map.getOrElse("skillIdCount", ""),
map.getOrElse("skillLevel", ""),
map.getOrElse("testType", ""),
map.getOrElse("userSID", ""),
map.getOrElse("userUID", ""),
map.getOrElse("userUIP", ""),
row.getAs[String]("method"),
row.getAs[String]("status"),
row.getAs[String]("sip"),
row.getAs[String]("user_uip"),
row.getAs[String]("action_prepend"),
row.getAs[String]("action_client")
)
}).toDF()
dfDetail.printSchema()
dfDetail.show(10,true)
// Output
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: string (nullable = true)
|-- _4: string (nullable = true)
|-- _5: string (nullable = true)
|-- _6: string (nullable = true)
|-- _7: string (nullable = true)
|-- _8: string (nullable = true)
|-- _9: string (nullable = true)
|-- _10: string (nullable = true)
|-- _11: string (nullable = true)
|-- _12: string (nullable = true)
|-- _13: string (nullable = true)
|-- _14: string (nullable = true)
|-- _15: string (nullable = true)
|-- _16: string (nullable = true)
|-- _17: string (nullable = true)
|-- _18: string (nullable = true)
|-- _19: string (nullable = true)
|-- _20: string (nullable = true)
|-- _21: string (nullable = true)
|-- _22: string (nullable = true)
+--------------------+-------------+--------------------+-------------+-------------------+---+---+------+--------+---+---+---+------+--------------------+--------+--------------+---+---+--------------+--------------+--------------------+--------------------+
| _1| _2| _3| _4| _5| _6| _7| _8| _9|_10|_11|_12| _13| _14| _15| _16|_17|_18| _19| _20| _21| _22|
+--------------------+-------------+--------------------+-------------+-------------------+---+---+------+--------+---+---+---+------+--------------------+--------+--------------+---+---+--------------+--------------+--------------------+--------------------+
|2018-09-04T20:27:...| 1536150463|Mozilla%2F5.0+%28...| 1536150463| PlayOnlineTime| 1| 3| | | | | | |ar2bfjun52keidu11...|17988434|114.252.24.227|GET|200| 61.240.144.25|114.252.24.227|http://www.kgc.cn/my|Mozilla/5.0 (Wind...|
|2018-09-04T20:27:...|1536150471528|CourseWorksBirds%...|1536150471919| RegisteredExam| 0| 11| 83320| 008|001| | | |973ECBB62CBFE9750...| 83320| 223.104.63.96|GET|200|192.168.168.63| -| -|Apache-HttpClient...|
|2018-09-04T20:27:...| | Android| | MinimizeAPP| 1| 3| | | | | | | | 0| |GET|200|183.136.133.44| 1.196.181.58| -|Mozilla/5.0 (Linu...|
|2018-09-04T20:28:...|1536150484708|Mozilla%2F5.0+%28...|1536150484734| startExam| 0| 3|144595|001_bdqn|001| 0| 0|jineng|7B7A3D3FA311BEAA2...| 144595|106.121.76.156|GET|200|192.168.168.64| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...| 1536145703|Mozilla%2F5.0+%28...| 1536145703|userChangeCdnRecord| 1| 3| | | | | | |av64dp3nidnjvs7bl...|18038246| 223.72.51.33|GET|200| 122.70.138.16| 223.72.51.33|http://www.kgc.cn...|Mozilla/5.0 (Wind...|
|2018-09-04T20:28:...| 1536150495|Mozilla%2F5.0+%28...| 1536150495| PlayOnlineTime| 1| 3| | | | | | |qup024h84nvllvbkt...|18163838|222.210.139.83|GET|200|182.140.227.19|222.210.139.83|http://www.kgc.cn...|Mozilla/5.0 (Wind...|
|2018-09-04T20:28:...|1536150497261|Mozilla%2F5.0+%28...|1536150497286| viewPaperDetail| 0| 3|259987|001_bdqn|001| 0| | zice|592125B153D2CE55F...| 259987|110.52.245.229|GET|200|192.168.168.71| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|1536150509459|Mozilla%2F5.0+%28...|1536150509490| viewReport| 0| 3|240255|001_bdqn|001| 0| | zice|59DA847CA601F31F2...| 240255|118.250.181.43|GET|200|192.168.168.79| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|1536150529933|CourseWorksBirds%...|1536150530088| answerQuestion| 0| 3|267336| 008|001| 0| | |36D80743EB20A394F...| 267336| 124.64.16.245|GET|200|192.168.168.64| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...| 1536150533|Mozilla%2F5.0+%28...| 1536150533| DragLearn| 0| 3| Pass| | | | | |45fsqun8n6bs8naa5...| 8224620| 112.25.60.16|GET|200|192.168.168.26| -| -| -|
+--------------------+-------------+--------------------+-------------+-------------------+---+---+------+--------+---+---+---+------+--------------------+--------+--------------+---+---+--------------+--------------+--------------------+--------------------+
only showing top 10 rows
val detailRDD: RDD[Row] = dfDetail.rdd
val schema2 = StructType(Array(
StructField("event_time", StringType, false),
StructField("actionBegin", StringType, false),
StructField("actionClient", StringType, false),
StructField("actionEnd", StringType, false),
StructField("actionName", StringType, false),
StructField("actionTest", StringType, false),
StructField("actionType", StringType, false),
StructField("actionValue", StringType, false),
StructField("clientType", StringType, false),
StructField("examType", StringType, false),
StructField("skillIdCount", StringType, false),
StructField("skillLevel", StringType, false),
StructField("testType", StringType, false),
StructField("userSID", StringType, false),
StructField("userUID", StringType, false),
StructField("userUIP", StringType, false),
StructField("method", StringType, false),
StructField("status", StringType, false),
StructField("sip", StringType, false),
StructField("user_uip", StringType, false),
StructField("action_prepend", StringType, false),
StructField("action_client", StringType, false)
))
val detailDF: DataFrame = spark.createDataFrame(detailRDD,schema2)
detailDF.printSchema()
detailDF.show(10,true)
// Output
root
|-- event_time: string (nullable = false)
|-- actionBegin: string (nullable = false)
|-- actionClient: string (nullable = false)
|-- actionEnd: string (nullable = false)
|-- actionName: string (nullable = false)
|-- actionTest: string (nullable = false)
|-- actionType: string (nullable = false)
|-- actionValue: string (nullable = false)
|-- clientType: string (nullable = false)
|-- examType: string (nullable = false)
|-- skillIdCount: string (nullable = false)
|-- skillLevel: string (nullable = false)
|-- testType: string (nullable = false)
|-- userSID: string (nullable = false)
|-- userUID: string (nullable = false)
|-- userUIP: string (nullable = false)
|-- method: string (nullable = false)
|-- status: string (nullable = false)
|-- sip: string (nullable = false)
|-- user_uip: string (nullable = false)
|-- action_prepend: string (nullable = false)
|-- action_client: string (nullable = false)
+--------------------+-------------+--------------------+-------------+-------------------+----------+----------+-----------+----------+--------+------------+----------+--------+--------------------+--------+--------------+------+------+--------------+--------------+--------------------+--------------------+
| event_time| actionBegin| actionClient| actionEnd| actionName|actionTest|actionType|actionValue|clientType|examType|skillIdCount|skillLevel|testType| userSID| userUID| userUIP|method|status| sip| user_uip| action_prepend| action_client|
+--------------------+-------------+--------------------+-------------+-------------------+----------+----------+-----------+----------+--------+------------+----------+--------+--------------------+--------+--------------+------+------+--------------+--------------+--------------------+--------------------+
|2018-09-04T20:27:...| 1536150463|Mozilla%2F5.0+%28...| 1536150463| PlayOnlineTime| 1| 3| | | | | | |ar2bfjun52keidu11...|17988434|114.252.24.227| GET| 200| 61.240.144.25|114.252.24.227|http://www.kgc.cn/my|Mozilla/5.0 (Wind...|
|2018-09-04T20:27:...|1536150471528|CourseWorksBirds%...|1536150471919| RegisteredExam| 0| 11| 83320| 008| 001| | | |973ECBB62CBFE9750...| 83320| 223.104.63.96| GET| 200|192.168.168.63| -| -|Apache-HttpClient...|
|2018-09-04T20:27:...| | Android| | MinimizeAPP| 1| 3| | | | | | | | 0| | GET| 200|183.136.133.44| 1.196.181.58| -|Mozilla/5.0 (Linu...|
|2018-09-04T20:28:...|1536150484708|Mozilla%2F5.0+%28...|1536150484734| startExam| 0| 3| 144595| 001_bdqn| 001| 0| 0| jineng|7B7A3D3FA311BEAA2...| 144595|106.121.76.156| GET| 200|192.168.168.64| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...| 1536145703|Mozilla%2F5.0+%28...| 1536145703|userChangeCdnRecord| 1| 3| | | | | | |av64dp3nidnjvs7bl...|18038246| 223.72.51.33| GET| 200| 122.70.138.16| 223.72.51.33|http://www.kgc.cn...|Mozilla/5.0 (Wind...|
|2018-09-04T20:28:...| 1536150495|Mozilla%2F5.0+%28...| 1536150495| PlayOnlineTime| 1| 3| | | | | | |qup024h84nvllvbkt...|18163838|222.210.139.83| GET| 200|182.140.227.19|222.210.139.83|http://www.kgc.cn...|Mozilla/5.0 (Wind...|
|2018-09-04T20:28:...|1536150497261|Mozilla%2F5.0+%28...|1536150497286| viewPaperDetail| 0| 3| 259987| 001_bdqn| 001| 0| | zice|592125B153D2CE55F...| 259987|110.52.245.229| GET| 200|192.168.168.71| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|1536150509459|Mozilla%2F5.0+%28...|1536150509490| viewReport| 0| 3| 240255| 001_bdqn| 001| 0| | zice|59DA847CA601F31F2...| 240255|118.250.181.43| GET| 200|192.168.168.79| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|1536150529933|CourseWorksBirds%...|1536150530088| answerQuestion| 0| 3| 267336| 008| 001| 0| | |36D80743EB20A394F...| 267336| 124.64.16.245| GET| 200|192.168.168.64| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...| 1536150533|Mozilla%2F5.0+%28...| 1536150533| DragLearn| 0| 3| Pass| | | | | |45fsqun8n6bs8naa5...| 8224620| 112.25.60.16| GET| 200|192.168.168.26| -| -| -|
+--------------------+-------------+--------------------+-------------+-------------------+----------+----------+-----------+----------+--------+------------+----------+--------+--------------------+--------+--------------+------+------+--------------+--------------+--------------------+--------------------+
only showing top 10 rows
// Connect to MySQL
val url="jdbc:mysql://hadoop100:3306/kb09test"
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","ok")
prop.setProperty("driver","com.mysql.jdbc.Driver")
println("Start writing to MySQL")
// "overwrite" replaces the table, "append" adds rows to it
detailDF.write.mode("overwrite").jdbc(url,"logdetail",prop)
dfDetail.write.mode("overwrite").jdbc(url,"logorg",prop)
println("Finished writing to MySQL")
// Output
Start writing to MySQL
Finished writing to MySQL
}
}
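User retention analysis
Compute the next-day retention rate
- Find n, the total number of users who registered on a given day
- Intersect the IDs of the users who registered that day with the IDs of the users who signed in the next day; the size of the intersection is m, the number of new users retained on the next day
- Next-day retention rate = m / n * 100%
Compute the next-week retention rate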
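The m / n rule can be checked on a small in-memory data set before running it on Spark. The sketch below uses plain Scala collections and `java.time.LocalDate`; the names `RetentionSketch`, `day` and `retention` are assumptions for illustration. It also assumes that event_time starts with a `yyyy-MM-dd` date and that "next week" means signing in exactly 7 days after registration, which the text above does not define.

```scala
import java.time.LocalDate

// Plain-Scala illustration of the retention rule (hypothetical helper names).
object RetentionSketch {
  // The first 10 characters of event_time are assumed to be the date (yyyy-MM-dd).
  def day(eventTime: String): LocalDate = LocalDate.parse(eventTime.take(10))

  // For each registration day d: n = users registered on d,
  // m = those of them who signed in on d + offsetDays; the result is m / n.
  def retention(registered: Seq[(String, LocalDate)],
                signedIn: Seq[(String, LocalDate)],
                offsetDays: Int): Map[LocalDate, Double] = {
    val signIns: Set[(String, LocalDate)] = signedIn.toSet
    registered.distinct
      .groupBy { case (_, d) => d }
      .map { case (d, users) =>
        val n = users.size
        val m = users.count { case (uid, _) => signIns.contains((uid, d.plusDays(offsetDays))) }
        d -> m.toDouble / n
      }
  }
}
```

Passing `offsetDays = 1` gives the next-day rate and `offsetDays = 7` the next-week rate under the assumption stated above; multiply by 100 to get a percentage.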
Code
- Next-day retention rate
package nj.zb.kb09.project
import java.text.SimpleDateFormat
import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SparkSession}
object UserAnalysis {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("UserAnalysis").getOrCreate()
import spark.implicits._
val url="jdbc:mysql://hadoop100:3306/kb09test"
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","ok")
prop.setProperty("driver","com.mysql.jdbc.Driver")
// Output of the detailDF.printSchema() and detailDF.show(10,true) calls that follow the MySQL read below
root
|-- event_time: string (nullable = false)
|-- actionBegin: string (nullable = false)
|-- actionClient: string (nullable = false)
|-- actionEnd: string (nullable = false)
|-- actionName: string (nullable = false)
|-- actionTest: string (nullable = false)
|-- actionType: string (nullable = false)
|-- actionValue: string (nullable = false)
|-- clientType: string (nullable = false)
|-- examType: string (nullable = false)
|-- skillIdCount: string (nullable = false)
|-- skillLevel: string (nullable = false)
|-- testType: string (nullable = false)
|-- userSID: string (nullable = false)
|-- userUID: string (nullable = false)
|-- userUIP: string (nullable = false)
|-- method: string (nullable = false)
|-- status: string (nullable = false)
|-- sip: string (nullable = false)
|-- user_uip: string (nullable = false)
|-- action_prepend: string (nullable = false)
|-- action_client: string (nullable = false)
+--------------------+-------------+--------------------+-------------+-------------------+----------+----------+-----------+----------+--------+------------+----------+--------+--------------------+--------+---------------+------+------+---------------+---------------+--------------------+--------------------+
| event_time| actionBegin| actionClient| actionEnd| actionName|actionTest|actionType|actionValue|clientType|examType|skillIdCount|skillLevel|testType| userSID| userUID| userUIP|method|status| sip| user_uip| action_prepend| action_client|
+--------------------+-------------+--------------------+-------------+-------------------+----------+----------+-----------+----------+--------+------------+----------+--------+--------------------+--------+---------------+------+------+---------------+---------------+--------------------+--------------------+
|2018-09-04T20:27:...| 1536150453|Mozilla%2F5.0+%28...| 1536150453|UseCourseNavigation| 1| 10| | | | | | |6dr3677dqeg7blcu3...| 0| 192.168.168.74| GET| 200| 192.168.168.26| -| -| -|
|2018-09-04T20:27:...|1536150463494|Mozilla%2F5.0+%28...|1536150463536| core_downXml| 0| 3|Java6SSM008| | | | | |ff68d62c8e774babb...| 812694| 192.168.168.14| GET| 200|192.168.168.124| -| -|Apache-HttpClient...|
|2018-09-04T20:27:...|1536150473398|Mozilla%2F5.0+%28...|1536150473428| answerQuestion| 0| 3| 130963| 001_bdqn| 001| 0| | |4BF4F671BD7335643...| 130963| 110.52.249.112| GET| 200| 192.168.168.79| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|1536150491097| |1536150491161| Registered| 0| 1| passport| | | | | |73d4c5581f5940798...| 786315| 59.42.74.244| GET| 200|192.168.168.124| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|1536150493692|Mozilla%2F5.0+%28...|1536150493765| answerQuestion| 0| 3| 138725| 001_bdqn| 001| 0| | |4879E2CDF4156B52B...| 138725| 117.136.89.252| GET| 200| 192.168.168.71| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...| 1536150502|Mozilla%2F5.0+%28...| 1536150502| PlayOnlineTime| 1| 3| | | | | | |k85i8hisaqqgta621...|18147434| 223.72.39.181| GET| 200| 122.70.138.16| 223.72.39.84|http://www.kgc.cn...|Mozilla/5.0 (Wind...|
|2018-09-04T20:28:...| 1536150499|Mozilla%2F5.0+%28...| 1536150499| PlayOnlineTime| 1| 3| | | | | | |js4fj8lghtnliinh6...|18060338|202.109.166.181| GET| 200| 183.136.133.30|202.109.166.183|http://www.kgc.cn...|Mozilla/5.0 (Wind...|
|2018-09-04T20:28:...|1536150503842|Mozilla%2F5.0+%28...|1536150504093| collectQuestion| 0| 3| 255983| 001_kgc| 001| 0| | |0E34A79861A209040...| 255983| 183.129.69.158| GET| 200| 192.168.168.78| -| -|Apache-HttpClient...|
|2018-09-04T20:28:...|1536150508399|Mozilla%2F5.0+%28...|1536150508793| RegisteredExam| 0| 11| 149710| 009| 001| | | |16F9C9932FBC4307C...| 149710| 222.137.2.164| GET| 200| 192.168.168.78| -| -|Apache-HttpClient...|
|2018-09-04T20:29:...| 1536150543|Mozilla%2F5.0+%28...| 1536150543| PlayNumber| 1| 3| | | | | | |n81pqln8dakjn0sa5...|18194453| 58.244.197.168| GET| 200| 221.204.14.28| 58.244.197.168|http://www.kgc.cn...|Mozilla/5.0 (Wind...|
+--------------------+-------------+--------------------+-------------+-------------------+----------+----------+-----------+----------+--------+------------+----------+--------+--------------------+--------+---------------+------+------+---------------+---------------+--------------------+--------------------+
// Read the cleaned data back from MySQL
val detailDF: DataFrame = spark.read.jdbc(url,"logdetail",prop)
detailDF.printSchema()
detailDF.show(10,true)
import spark.implicits._
val sc: SparkContext = spark.sparkContext
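// Define a UDF that converts event_time to a timestamp (milliseconds at midnight of that day).
// The date is the first 10 characters of event_time, so the substring starts at index 0.
// The sample output below was produced with substring(1, 10), which drops the first digit of
// the year and explains the negative values; consecutive days still differ by 86400000 ms.
val changTimeFun: UserDefinedFunction = spark.udf.register("changTime", (x: String) => {
val time: Long = new SimpleDateFormat("yyyy-MM-dd").parse(x.substring(0, 10)).getTime
time
})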
// All registration records as a DataFrame (userUID, register_time, registration action)
val registDF: DataFrame = detailDF.filter(detailDF("actionName")==="Registered").select("userUID","event_time","actionName").withColumnRenamed("event_time","register_time").withColumnRenamed("userUID","regUID")
registDF.printSchema()
registDF.show(10)
println(registDF.count())
// Output
root
|-- regUID: string (nullable = false)
|-- register_time: string (nullable = false)
|-- actionName: string (nullable = false)
+--------+--------------------+----------+
| regUID| register_time|actionName|
+--------+--------------------+----------+
| 786315|2018-09-04T20:28:...|Registered|
| 206224|2018-09-04T20:29:...|Registered|
| 850369|2018-09-04T20:29:...|Registered|
| 750400|2018-09-04T20:27:...|Registered|
| 161307|2018-09-04T20:27:...|Registered|
| 115204|2018-09-04T20:28:...|Registered|
| 853589|2018-09-04T20:28:...|Registered|
| 127677|2018-09-04T20:28:...|Registered|
| 127677|2018-09-04T20:28:...|Registered|
|18020156|2018-09-04T20:27:...|Registered|
+--------+--------------------+----------+
only showing top 10 rows
537
// Convert the time to a timestamp and deduplicate the registered users
val registDF2: DataFrame = registDF.select($"regUID",changTimeFun($"register_time").as("register_date"),$"actionName").distinct()
registDF2.printSchema()
registDF2.show(10)
println("Number of registered users: "+registDF2.count())
// Output
root
|-- regUID: string (nullable = false)
|-- register_date: long (nullable = true)
|-- actionName: string (nullable = false)
+------+---------------+----------+
|regUID| register_date|actionName|
+------+---------------+----------+
|220893|-61578086400000|Registered|
|782584|-61578086400000|Registered|
|822866|-61578086400000|Registered|
|844642|-61578086400000|Registered|
|851460|-61578086400000|Registered|
|266716|-61578086400000|Registered|
|267154|-61578086400000|Registered|
|843121|-61578086400000|Registered|
|822870|-61578086400000|Registered|
|808387|-61578086400000|Registered|
+------+---------------+----------+
only showing top 10 rows
Number of registered users: 381
// All sign-in records as a DataFrame (userUID, signin_time, sign-in action)
val signinDF: DataFrame = detailDF.filter(detailDF("actionName")==="Signin").select("userUID","event_time","actionName").withColumnRenamed("event_time","signin_time").withColumnRenamed("userUID","sigUID")
signinDF.printSchema()
signinDF.show(10)
println(signinDF.count())
// Output
root
|-- sigUID: string (nullable = false)
|-- signin_time: string (nullable = false)
|-- actionName: string (nullable = false)
+------+--------------------+----------+
|sigUID| signin_time|actionName|
+------+--------------------+----------+
|272754|2018-09-05T20:27:...| Signin|
|786315|2018-09-05T20:28:...| Signin|
|807623|2018-09-05T20:28:...| Signin|
|771442|2018-09-05T20:28:...| Signin|
|127677|2018-09-05T20:28:...| Signin|
| 81680|2018-09-05T20:27:...| Signin|
|258711|2018-09-05T20:30:...| Signin|
| 83320|2018-09-05T20:27:...| Signin|
|822870|2018-09-05T20:28:...| Signin|
|268372|2018-09-05T20:29:...| Signin|
+------+--------------------+----------+
only showing top 10 rows
492
// Deduplicate the signed-in users and convert the time to a timestamp
val signinDF2: DataFrame = signinDF.select($"sigUID",changTimeFun($"signin_time").as("signin_date"),$"actionName").distinct()
signinDF2.printSchema()
signinDF2.show(10)
println("Number of signed-in users: "+signinDF2.count())
// Output
root
|-- sigUID: string (nullable = false)
|-- signin_date: long (nullable = true)
|-- actionName: string (nullable = false)
+------+---------------+----------+
|sigUID| signin_date|actionName|
+------+---------------+----------+
|247409|-61578000000000| Signin|
|777367|-61578000000000| Signin|
|849970|-61578000000000| Signin|
|796559|-61578000000000| Signin|
|239222|-61578000000000| Signin|
|269302|-61578000000000| Signin|
|195972|-61578000000000| Signin|
|790387|-61578000000000| Signin|
|853613|-61578000000000| Signin|
|750400|-61578000000000| Signin|
+------+---------------+----------+
only showing top 10 rows
Number of signed-in users: 355
// Join the two tables on UID
val joinDF: DataFrame = registDF2.join(signinDF2,signinDF2("sigUID")===registDF2("regUID"))
joinDF.printSchema()
joinDF.show()
println(joinDF.count())
// Output
root
|-- regUID: string (nullable = false)
|-- register_date: long (nullable = true)
|-- actionName: string (nullable = false)
|-- sigUID: string (nullable = false)
|-- signin_date: long (nullable = true)
|-- actionName: string (nullable = false)
+--------+---------------+----------+--------+---------------+----------+
| regUID| register_date|actionName| sigUID| signin_date|actionName|
+--------+---------------+----------+--------+---------------+----------+
| 147754|-61578086400000|Registered| 147754|-61578000000000| Signin|
|18040025|-61578086400000|Registered|18040025|-61578000000000| Signin|
| 200002|-61578086400000|Registered| 200002|-61578000000000| Signin|
| 813944|-61578086400000|Registered| 813944|-61578000000000| Signin|
| 817716|-61578086400000|Registered| 817716|-61578000000000| Signin|
| 127111|-61578086400000|Registered| 127111|-61578000000000| Signin|
| 270402|-61578086400000|Registered| 270402|-61578000000000| Signin|
| 789570|-61578086400000|Registered| 789570|-61578000000000| Signin|
| 822072|-61578086400000|Registered| 822072|-61578000000000| Signin|
| 82604|-61578086400000|Registered| 82604|-61578000000000| Signin|
| 267118|-61578086400000|Registered| 267118|-61578000000000| Signin|
| 796236|-61578086400000|Registered| 796236|-61578000000000| Signin|
| 848331|-61578086400000|Registered| 848331|-61578000000000| Signin|
| 801641|-61578086400000|Registered| 801641|-61578000000000| Signin|
| 160278|-61578086400000|Registered| 160278|-61578000000000| Signin|
| 269302|-61578086400000|Registered| 269302|-61578000000000| Signin|
| 784468|-61578086400000|Registered| 784468|-61578000000000| Signin|
| 854159|-61578086400000|Registered| 854159|-61578000000000| Signin|
| 252345|-61578086400000|Registered| 252345|-61578000000000| Signin|
| 857050|-61578086400000|Registered| 857050|-61578000000000| Signin|
+--------+---------------+----------+--------+---------------+----------+
only showing top 20 rows
355
val joinDF2: DataFrame = joinDF.select(joinDF("regUID"),joinDF("signin_date"),joinDF("register_date"))
joinDF2.printSchema()
joinDF2.show(10)
// Output
root
|-- regUID: string (nullable = false)
|-- signin_date: long (nullable = true)
|-- register_date: long (nullable = true)
+--------+---------------+---------------+
| regUID| signin_date| register_date|
+--------+---------------+---------------+
| 147754|-61578000000000|-61578086400000|
|18040025|-61578000000000|-61578086400000|
| 200002|-61578000000000|-61578086400000|
| 813944|-61578000000000|-61578086400000|
| 817716|-61578000000000|-61578086400000|
| 127111|-61578000000000|-61578086400000|
| 270402|-61578000000000|-61578086400000|
| 789570|-61578000000000|-61578086400000|
| 822072|-61578000000000|-61578086400000|
| 82604|-61578000000000|-61578086400000|
+--------+---------------+---------------+
// Sign-ins: users who signed in the day after registering, counted per registration date
val frame1: DataFrame = joinDF2.filter(joinDF2("register_date")===joinDF2("signin_date")-86400000).groupBy("register_date").count().withColumnRenamed("count","sigcount")
frame1.printSchema()
frame1.show(10)
// Output
root
|-- register_date: long (nullable = true)
|-- sigcount: long (nullable = false)
+---------------+--------+
| register_date|sigcount|
+---------------+--------+
|-61578086400000| 355|
+---------------+--------+
// Registrations: new users counted per registration date
val frame2: DataFrame = registDF2.groupBy("register_date").count().withColumnRenamed("count","regcount")
frame2.printSchema()
frame2.show(10)
// Output
root
|-- register_date: long (nullable = true)
|-- regcount: long (nullable = false)
+---------------+--------+
| register_date|regcount|
+---------------+--------+
|-61578086400000| 381|
+---------------+--------+
// Join the sign-in counts and the registration counts on register_date
val frame3: DataFrame = frame1.join(frame2,"register_date")
frame3.printSchema()
frame3.show(10)
// Output
root
|-- register_date: long (nullable = true)
|-- sigcount: long (nullable = false)
|-- regcount: long (nullable = false)
+---------------+--------+--------+
| register_date|sigcount|regcount|
+---------------+--------+--------+
|-61578086400000| 355| 381|
+---------------+--------+--------+
// Compute the next-day retention rate
frame3.map(x=>(
x.getAs[Long]("register_date"),
x.getAs[Long]("sigcount"),
x.getAs[Long]("regcount"),
x.getAs[Long]("sigcount").toDouble/
x.getAs[Long]("regcount").toDouble
)).show()
// Output
+---------------+---+---+-----------------+
| _1| _2| _3| _4|
+---------------+---+---+-----------------+
|-61578086400000|355|381|0.931758530183727|
+---------------+---+---+-----------------+
}
}
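Active user analysis
Requirements
- Read the database and count the active users for each day
- Rule: only users with course-watching and course-buying actions count as active users
- Deduplicate by UID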
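The three requirements above can be sketched on plain Scala collections. The action names `StartLearn` and `BuyCourse` are hypothetical placeholders, since the real actionName values for watching and buying a course are not listed here, and the sketch assumes that either kind of action is enough to make a user active.

```scala
// Plain-Scala illustration of the daily active user count (hypothetical names).
object ActiveUsersSketch {
  // Hypothetical action names standing in for "watched a course" and "bought a course".
  val activeActions: Set[String] = Set("StartLearn", "BuyCourse")

  // events: (userUID, day, actionName). Returns day -> number of distinct active users.
  def dailyActiveUsers(events: Seq[(String, String, String)]): Map[String, Int] =
    events
      .filter { case (_, _, action) => activeActions.contains(action) }
      .map { case (uid, day, _) => (day, uid) }
      .distinct // deduplicate by UID within a day
      .groupBy { case (day, _) => day }
      .map { case (day, pairs) => day -> pairs.size }
}
```

In the Spark job the same idea would be a filter on actionName followed by a distinct count of userUID per day.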
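Geographic analysis of active users
Requirements
- Read the raw log data
- Parse the url to get the user's IP address
- Look up the province, city and district for each IP in an IP library (UDF)
- Compute the percentage of users in each region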
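A rough sketch of the lookup and percentage steps follows. The IP library here is a hypothetical in-memory map keyed by the first three octets, filled with documentation-only address ranges and made-up region names; a real IP library would be loaded from a file and would resolve province, city and district.

```scala
// Plain-Scala illustration of the region share calculation (hypothetical names and data).
object RegionShareSketch {
  // Hypothetical stand-in for an IP library: first three octets -> region.
  val ipLibrary: Map[String, String] = Map(
    "192.0.2" -> "RegionA",
    "198.51.100" -> "RegionB"
  )

  // Resolve an IP to a region, falling back to "unknown" when the prefix is not in the library.
  def region(ip: String): String =
    ipLibrary.getOrElse(ip.split("\\.").take(3).mkString("."), "unknown")

  // userIps: userUID -> IP. Returns region -> percentage of users in that region.
  def regionShare(userIps: Map[String, String]): Map[String, Double] = {
    val total = userIps.size
    if (total == 0) Map.empty[String, Double]
    else userIps.values.toSeq
      .groupBy(ip => region(ip))
      .map { case (r, ips) => r -> ips.size * 100.0 / total }
  }
}
```

In Spark, `region` would be registered as a UDF and applied to the IP column before grouping.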
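User browsing depth analysis
Requirements
- Read the log data and, with a day as the unit of measurement, use a depth value to represent how deeply a user browses
- Count the users at each depth level and the number of visitors to each url, so that pages can be optimized in a targeted way to raise the site's conversion rate and make users stick
- Rule: the number of urls is the depth value
For example: a user viewed three pages today; a url was visited by 50 people today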
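Both counts can be sketched on plain Scala collections. The sketch assumes that depth means the number of distinct urls a user visited in one day, which is one reading of the rule above; the names `BrowseDepthSketch`, `usersPerDepth` and `visitorsPerUrl` are made up for illustration.

```scala
// Plain-Scala illustration of the browsing depth statistics (hypothetical names).
object BrowseDepthSketch {
  // visits: (day, userUID, url).
  // Returns (day, depth) -> number of users who reached that depth on that day.
  def usersPerDepth(visits: Seq[(String, String, String)]): Map[(String, Int), Int] =
    visits.distinct
      .groupBy { case (day, uid, _) => (day, uid) }
      .toSeq
      .map { case ((day, _), userVisits) => (day, userVisits.size) } // one (day, depth) per user
      .groupBy(identity)
      .map { case (key, users) => key -> users.size }

  // Returns (day, url) -> number of distinct users who visited the url on that day.
  def visitorsPerUrl(visits: Seq[(String, String, String)]): Map[(String, String), Int] =
    visits.distinct
      .groupBy { case (day, _, url) => (day, url) }
      .map { case (key, urlVisits) => key -> urlVisits.size }
}
```

With this reading, a user who viewed three different pages in a day contributes one entry at depth 3, and a url visited by 50 different users in a day gets a visitor count of 50.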