FlinkSql中窗口(window)的使用
目录
时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口然后根据时间段做计算了。
在Table API和SQL中,主要有两种窗口:Group Windows和Over Windows
一、Table API中使用窗口
Group Windows
分组窗口(Group Windows)
会根据时间或行计数间隔,将行聚合到有限的组(Group)
中,并对每个组的数据执行一次聚合函数
。
Table API中的Group Windows都是使用.window(w:GroupWindow)
子句定义的,并且必须由as子句指定一个别名
。为了按窗口对表进行分组,窗口的别名必须在group by
子句中,像常规的分组字段一样引用。
- 滚动窗口
public class Flink08_TableApi_Window_1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> waterSensorStream = env
.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs())
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv
.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
table
.window(Tumble.over(lit(10).second()).on($("ts")).as("w")) // 定义滚动窗口并给窗口起一个别名
.groupBy($("id"), $("w")) // 窗口必须出现的分组字段中
.select($("id"), $("w").start(), $("w").end(), $("vc").sum())
.execute()
.print();
env.execute();
}
}
- 滑动窗口
.window(Slide.over(lit(10).second()).every(lit(5).second()).on($("ts")).as("w"))
- 会话窗口
.window(Session.withGap(lit(6).second()).on($("ts")).as("w")
Over Windows
Over window聚合是标准SQL中已有的(Over子句),可以在查询的SELECT子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。
Table API提供了Over类,来配置Over窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。
*的over window是使用常量指定的。也就是说,时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window是用间隔的大小指定的。
- Unbounded Over Windows
public class Flink09_TableApi_OverWindow_1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> waterSensorStream = env
.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs())
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv
.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
table
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w"))
.select($("id"), $("ts"), $("vc").sum().over($("w")).as("sum_vc"))
.execute()
.print();
env.execute();
}
}
# 使用UNBOUNDED_RANGE
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w"))
说明:
- Bounded Over Windows
// 当事件时间向前算3s得到一个窗口
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(lit(3).second()).as("w"))
// 当行向前推算2行算一个窗口
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(rowInterval(2L)).as("w"))
二、SQL API中使用窗口
Group Windows
SQL 查询的分组窗口是通过 GROUP BY
子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果
。以下是批处理表和流处理表支持的分组窗口函数:
分组窗口函数 | 描述 |
---|---|
TUMBLE(time_attr, interval) | 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 |
HOP(time_attr, interval, interval) | 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 |
SESSION(time_attr, interval) | 定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。 |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 作为事件时间的字段必须是 timestamp 类型, 所以根据 long 类型的 ts 计算出来一个 t
tEnv.executeSql("create table sensor(" +
"id string," +
"ts bigint," +
"vc int, " +
"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
"watermark for t as t - interval '5' second)" +
"with("
+ "'connector' = 'filesystem',"
+ "'path' = 'input/sensor.txt',"
+ "'format' = 'csv'"
+ ")");
tEnv
.sqlQuery(
"SELECT id, " +
" TUMBLE_START(t, INTERVAL '1' minute) as wStart, " +
" TUMBLE_END(t, INTERVAL '1' minute) as wEnd, " +
" SUM(vc) sum_vc " +
"FROM sensor " +
"GROUP BY TUMBLE(t, INTERVAL '1' minute), id"
)
.execute()
.print();
tEnv
.sqlQuery(
"SELECT id, " +
" hop_start(t, INTERVAL '1' minute, INTERVAL '1' hour) as wStart, " +
" hop_end(t, INTERVAL '1' minute, INTERVAL '1' hour) as wEnd, " +
" SUM(vc) sum_vc " +
"FROM sensor " +
"GROUP BY hop(t, INTERVAL '1' minute, INTERVAL '1' hour), id"
)
.execute()
.print();
Over Windows
tEnv
.sqlQuery(
"select " +
"id," +
"vc," +
"sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row)" +
"from sensor"
)
.execute()
.print();
tEnv
.sqlQuery(
"select " +
"id," +
"vc," +
"count(vc) over w, " +
"sum(vc) over w " +
"from sensor " +
"window w as (partition by id order by t rows between 1 PRECEDING and current row)"
)
.execute()
.print();