数据:
1001,2020-09-10 10:21:21,home.html
1001,2020-09-10 10:28:10,good_list.html
1002,2020-09-10 09:40:00,home.html
1001,2020-09-10 10:35:05,good_detail.html
1002,2020-09-10 09:42:00,favor.html
1001,2020-09-10 10:42:55,cart.html
1001,2020-09-10 10:43:55,11.html
1001,2020-09-10 10:44:55,22.html
1001,2020-09-10 10:45:55,33.html
1001,2020-09-10 10:46:55,44.html
1001,2020-09-10 10:47:55,55.html
1001,2020-09-10 10:48:55,66.html
1001,2020-09-10 10:49:55,77.html
1002,2020-09-10 09:41:00,mine.html
1001,2020-09-10 11:35:21,home.html
1001,2020-09-10 11:36:10,cart.html
1003,2020-09-10 13:10:00,home.html
1001,2020-09-10 11:38:12,trade.html
1001,2020-09-10 11:39:12,aa.html
1001,2020-09-10 11:40:12,bb.html
1001,2020-09-10 11:41:12,cc.html
1001,2020-09-10 11:42:12,dd.html
1001,2020-09-10 11:43:12,ee.html
1001,2020-09-10 11:44:12,ff.html
1001,2020-09-10 11:45:12,gg.html
1001,2020-09-10 11:46:12,hh.html
1001,2020-09-10 11:47:12,ll.html
1001,2020-09-10 11:38:55,payment.html
1003,2020-09-10 13:15:00,search.html
需求:求得用户每次会话的行为轨迹--解决数据倾斜
import java.text.SimpleDateFormat
import java.util
import java.util.UUID
import org.apache.spark.{Partition, RangePartitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
object SessionTest2 {

  // Two consecutive hits of the same user closer together than this gap (ms)
  // belong to the same session.
  private val SessionGapMs: Long = 30 * 60 * 1000

  /**
   * Requirement: reconstruct each user's per-session page-view trail while avoiding
   * data skew. Strategy: range-partition on "userId_timestamp" keys (spreads a hot
   * user over several partitions), build sessions inside each partition, then stitch
   * sessions that cross partition borders using a collection accumulator (boundary
   * rows -> driver) and a broadcast variable (repaired boundary rows -> executors).
   */
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
    import spark.implicits._

    // 1. Read the raw click log: user_id, page_time, page.
    val ds = spark.read.csv("datas/session2.txt").toDF("user_id", "page_time", "page").as[(String, String, String)]

    // Accumulator that ships each partition's first and last record to the driver.
    val acc = spark.sparkContext.collectionAccumulator[(String, UserAnalysis)]("acc")

    // 2. Map each row to a case class (mutable session/step so they can be patched
    //    in place), keyed by "userId_timestamp" so sorting the key orders records
    //    by user and then by time.
    val ds2: Dataset[(String, UserAnalysis)] = ds.map {
      case (userid, timestr, page) =>
        // SimpleDateFormat is not thread-safe, so create one per record.
        val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val time = formatter.parse(timestr).getTime
        val user = UserAnalysis(userid, time, timestr, page)
        (s"${userid}_${time}", user)
    }

    // 3. Drop to the RDD API to control partitioning.
    val rdd = ds2.rdd

    // 4. RangePartitioner + sort-within-partitions: records end up ordered inside
    //    every partition AND across partitions. RangePartitioner samples the keys
    //    to compute (numPartitions - 1) boundaries; each key is routed to the range
    //    it falls into, which spreads a skewed user over several partitions.
    val rdd2: RDD[(String, UserAnalysis)] = rdd.repartitionAndSortWithinPartitions(new RangePartitioner(5, rdd))

    // 5. Inside each partition, compare neighbouring records: same user within the
    //    session gap => same session, step + 1.
    val rdd3 = rdd2.mapPartitionsWithIndex((index, it) => {
      val list = it.toList
      list.sliding(2).foreach {
        case List((_, before), (_, next)) =>
          if (next.userid == before.userid && next.time - before.time <= SessionGapMs) {
            next.session = before.session
            next.step = before.step + 1
          }
        case _ =>
          // Partition with fewer than 2 records: nothing to link. (The original
          // code used head/last on the window, which "linked" a single record to
          // itself and wrongly bumped its step from 1 to 2.)
      }
      // Record this partition's boundary rows so the driver can stitch sessions
      // that cross partition borders. Empty partitions contribute nothing
      // (the original crashed on list.head / list.last here).
      if (list.nonEmpty) {
        acc.add((s"${index}#head", list.head._2))
        acc.add((s"${index}#last", list.last._2))
      }
      list.iterator
    })

    // rdd3 feeds several jobs below; cache so the random session UUIDs stay stable
    // across re-evaluations.
    rdd3.cache()
    rdd3.collect() // action that populates the accumulator

    // The accumulator yields a java.util.List; convert to a Scala Map.
    import scala.collection.JavaConverters._
    val userMap = acc.value.asScala.toMap

    // 6. Stitch sessions across partitions on the driver. Partition 0 needs no fix.
    //    Iterating in ascending partition order lets a session spanning 3+ partitions
    //    propagate, because partition i reads the already-repaired last row of i-1.
    //    Use getNumPartitions (userMap.size / 2 undercounts when a partition is
    //    empty) and Option lookups (missing keys must be skipped, not NPE).
    for (i <- 1 until rdd3.getNumPartitions) {
      for {
        beforeLast  <- userMap.get(s"${i - 1}#last")
        currentHead <- userMap.get(s"${i}#head")
        currentLast <- userMap.get(s"${i}#last")
      } {
        // Same user within the gap across the border => same session.
        if (currentHead.userid == beforeLast.userid && currentHead.time - beforeLast.time <= SessionGapMs) {
          // If this whole partition is one session (last still shares head's
          // session), the last row must also adopt the previous partition's
          // session. BUGFIX: the original assigned currentHead.session here,
          // which — evaluated before currentHead was repaired and under a guard
          // requiring the two to be equal — was a no-op, so stale session ids
          // leaked into every subsequent partition.
          if (currentLast.session == currentHead.session) {
            currentLast.session = beforeLast.session
            currentLast.step = beforeLast.step + currentLast.step
          }
          currentHead.session = beforeLast.session
          currentHead.step = beforeLast.step + 1
        }
      }
    }

    // 7. Broadcast the repaired boundary rows; in each partition, patch every row
    //    that still carries the head's ORIGINAL session id. (The cached rdd3 holds
    //    executor-side copies — driver-side mutation of userMap does not reach
    //    them, hence the broadcast round-trip.)
    val bc = spark.sparkContext.broadcast(userMap)
    val rdd4 = rdd3.mapPartitionsWithIndex((index, it) => {
      val list = it.toList
      for {
        (_, localHead) <- list.headOption
        repairedHead   <- bc.value.get(s"${index}#head")
      } {
        val oldSession = localHead.session
        // A differing session id means the driver repaired this partition's head.
        if (repairedHead.session != oldSession) {
          list.filter(_._2.session == oldSession).foreach { case (_, u) =>
            u.session = repairedHead.session
            // Repaired head step = previousLast.step + 1; shift every row of the
            // session by the same offset (its local step started at 1).
            u.step = repairedHead.step + u.step - 1
          }
        }
      }
      list.foreach(x => println(s"index = ${index} ${x._2}"))
      list.iterator
    })
    rdd4.collect()
  }
}
/**
 * One page view of one user.
 *
 * `session` and `step` are deliberately `var`: the session-stitching algorithm
 * patches them in place, both inside partitions and from the driver via the
 * accumulator/broadcast round-trip.
 *
 * @param userid  user identifier
 * @param time    epoch-millis timestamp parsed from `timestr`
 * @param timestr original "yyyy-MM-dd HH:mm:ss" string from the log
 * @param page    visited page
 * @param session session id; a fresh UUID per record, later overwritten so that
 *                every record of one session carries the first record's UUID
 * @param step    1-based position of this page view within its session
 */
case class UserAnalysis(userid: String, time: Long, timestr: String, page: String, var session: String = UUID.randomUUID().toString, var step: Int = 1)