over窗口函数的应用参见我上一篇博客:https://www.cnblogs.com/wanpi/p/14969000.html
rows between函数:
- SQL语句中的rows between unbounded preceding and unbounded following ,其中:
- unbounded preceding:表示Long.MIN_VALUE,也就是可视当前行之前的所有数据
- unbounded following:表示Long.MAX_VALUE,也就是可视当前行之后的所有数据
- current row:表示当前行,也就是0
下面是几个案例,帮助理解
需求1
A表里面有三条记录,字段是
ID start_time end_time
2018-02-03 2019-02-03
2019-02-04 2020-03-04
2018-08-04 2019-03-04
根据已知的三条记录用SQL写出结果为:
2018-02-03 2018-08-04
2018-08-04 2019-02-03
2019-02-03 2019-02-04
2019-02-04 2019-03-04
2019-03-04 2020-03-04
解决思路
1.拆解时间数据
2.升序排列日期
3.窗口函数
代码
package method
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
object OnWindowFunction3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("windowfunction").master("local[*]").getOrCreate()
val rdd = spark.sparkContext.makeRDD(List(
(1,"2018-02-03","2019-02-03"),
(2,"2019-02-04","2020-03-04"),
(3,"2018-08-04","2019-03-04")
))
import spark.implicits._
val df = rdd.flatMap(t3 => {
Array(t3._2,t3._3)
}).toDF("value")
import org.apache.spark.sql.functions._
val w1 = Window
.orderBy($"value" asc)
.rowsBetween(0,1)
df
.withColumn("end_time",max("value") over(w1))
.show()
spark.stop()
}
}
//结果
+----------+----------+
| value| end_time|
+----------+----------+
|2018-02-03|2018-08-04|
|2018-08-04|2019-02-03|
|2019-02-03|2019-02-04|
|2019-02-04|2019-03-04|
|2019-03-04|2020-03-04|
|2020-03-04|2020-03-04|
+----------+----------+
需求2
统计网站访问时长。每个用户访问总时长
数据集
findsiteduration.csv
uid,date,dur
111,2019-06-20,1
111,2019-06-21,2
111,2019-06-22,3
222,2019-06-20,4
222,2019-06-21,5
222,2019-06-22,6
333,2019-06-20,7
333,2019-06-21,8
333,2019-06-22,9
444,2019-06-23,10
代码
package sparksql
import org.apache.spark.sql.SparkSession
object FindSiteDuration {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
val df = spark.read.option("header",true).csv(".\\resources\\findsiteduration.csv")
df.createTempView("temp1")
spark
.sql(
"""
|select uid,date,dur,
|sum(dur) over(partition by uid order by date) as totaldur
|from temp1
|""".stripMargin).show()
spark.stop()
}
}
//结果
+---+----------+---+--------+
|uid| date|dur|totaldur|
+---+----------+---+--------+
|111|2019-06-20| 1| 1.0|
|111|2019-06-21| 2| 3.0|
|111|2019-06-22| 3| 6.0|
|444|2019-06-23| 10| 10.0|
|222|2019-06-20| 4| 4.0|
|222|2019-06-21| 5| 9.0|
|222|2019-06-22| 6| 15.0|
|333|2019-06-20| 7| 7.0|
|333|2019-06-21| 8| 15.0|
|333|2019-06-22| 9| 24.0|
+---+----------+---+--------+
//每个用户访问当天和前一天两天访问时长
spark
.sql(
"""
|select uid,date,dur,
|sum(dur) over(partition by uid order by date rows between 1 preceding and current row) as totaldur
|from temp1
|""".stripMargin).show()
//结果
+---+----------+---+--------+
|uid| date|dur|totaldur|
+---+----------+---+--------+
|111|2019-06-20| 1| 1.0|
|111|2019-06-21| 2| 3.0|
|111|2019-06-22| 3| 5.0|
|444|2019-06-23| 10| 10.0|
|222|2019-06-20| 4| 4.0|
|222|2019-06-21| 5| 9.0|
|222|2019-06-22| 6| 11.0|
|333|2019-06-20| 7| 7.0|
|333|2019-06-21| 8| 15.0|
|333|2019-06-22| 9| 17.0|
+---+----------+---+--------+
//每个用户当天和前一天,后一天三天的网站访问时长
spark
.sql(
"""
|select uid,date,dur,
|sum(dur) over(partition by uid order by date rows between 1 preceding and 1 following) as totaldur
|from temp1
|""".stripMargin).show()
//结果
+---+----------+---+--------+
|uid| date|dur|totaldur|
+---+----------+---+--------+
|111|2019-06-20| 1| 3.0|
|111|2019-06-21| 2| 6.0|
|111|2019-06-22| 3| 5.0|
|444|2019-06-23| 10| 10.0|
|222|2019-06-20| 4| 9.0|
|222|2019-06-21| 5| 15.0|
|222|2019-06-22| 6| 11.0|
|333|2019-06-20| 7| 15.0|
|333|2019-06-21| 8| 24.0|
|333|2019-06-22| 9| 17.0|
+---+----------+---+--------+