FlinkSql中窗口(window)的使用

FlinkSql中窗口(window)的使用

目录

时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口然后根据时间段做计算了。

在Table API和SQL中,主要有两种窗口:Group Windows和Over Windows

一、Table API中使用窗口

Group Windows

分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数
Table API中的Group Windows都是使用.window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用。

  • 滚动窗口
public class Flink08_TableApi_Window_1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> waterSensorStream = env
            .fromElements(new WaterSensor("sensor_1", 1000L, 10),
                          new WaterSensor("sensor_1", 2000L, 20),
                          new WaterSensor("sensor_2", 3000L, 30),
                          new WaterSensor("sensor_1", 4000L, 40),
                          new WaterSensor("sensor_1", 5000L, 50),
                          new WaterSensor("sensor_2", 6000L, 60))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
            );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv
            .fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

        table
            .window(Tumble.over(lit(10).second()).on($("ts")).as("w"))  // 定义滚动窗口并给窗口起一个别名
            .groupBy($("id"), $("w")) // 窗口必须出现的分组字段中
            .select($("id"), $("w").start(), $("w").end(), $("vc").sum())
            .execute()
            .print();

        env.execute();
    }
}
  • 滑动窗口
.window(Slide.over(lit(10).second()).every(lit(5).second()).on($("ts")).as("w"))
  • 会话窗口
.window(Session.withGap(lit(6).second()).on($("ts")).as("w")

Over Windows

Over window聚合是标准SQL中已有的(Over子句),可以在查询的SELECT子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。

Table API提供了Over类,来配置Over窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。

*的over window是使用常量指定的。也就是说,时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window是用间隔的大小指定的。

  • Unbounded Over Windows
public class Flink09_TableApi_OverWindow_1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> waterSensorStream = env
            .fromElements(new WaterSensor("sensor_1", 1000L, 10),
                          new WaterSensor("sensor_1", 4000L, 40),
                          new WaterSensor("sensor_1", 2000L, 20),
                          new WaterSensor("sensor_2", 3000L, 30),
                          new WaterSensor("sensor_1", 5000L, 50),
                          new WaterSensor("sensor_2", 6000L, 60))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                    .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
            );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv
            .fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

        table
            .window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w"))
            .select($("id"), $("ts"), $("vc").sum().over($("w")).as("sum_vc"))
            .execute()
            .print();

        env.execute();
    }
}


# 使用UNBOUNDED_RANGE
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w"))

说明:

FlinkSql中窗口(window)的使用
FlinkSql中窗口(window)的使用
FlinkSql中窗口(window)的使用

  • Bounded Over Windows
// 当事件时间向前算3s得到一个窗口
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(lit(3).second()).as("w"))
// 当行向前推算2行算一个窗口
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(rowInterval(2L)).as("w"))

二、SQL API中使用窗口

Group Windows

SQL 查询的分组窗口是通过 GROUP BY子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。以下是批处理表和流处理表支持的分组窗口函数:

分组窗口函数 描述
TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
HOP(time_attr, interval, interval) 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
SESSION(time_attr, interval) 定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。

FlinkSql中窗口(window)的使用

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 作为事件时间的字段必须是 timestamp 类型, 所以根据 long 类型的 ts 计算出来一个 t
tEnv.executeSql("create table sensor(" +
                    "id string," +
                    "ts bigint," +
                    "vc int, " +
                    "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                    "watermark for t as t - interval '5' second)" +
                    "with("
                    + "'connector' = 'filesystem',"
                    + "'path' = 'input/sensor.txt',"
                    + "'format' = 'csv'"
                    + ")");

tEnv
    .sqlQuery(
        "SELECT id, " +
            "  TUMBLE_START(t, INTERVAL '1' minute) as wStart,  " +
            "  TUMBLE_END(t, INTERVAL '1' minute) as wEnd,  " +
            "  SUM(vc) sum_vc " +
            "FROM sensor " +
            "GROUP BY TUMBLE(t, INTERVAL '1' minute), id"
    )
    .execute()
    .print();
tEnv
    .sqlQuery(
        "SELECT id, " +
            "  hop_start(t, INTERVAL '1' minute, INTERVAL '1' hour) as wStart,  " +
            "  hop_end(t, INTERVAL '1' minute, INTERVAL '1' hour) as wEnd,  " +
            "  SUM(vc) sum_vc " +
            "FROM sensor " +
            "GROUP BY hop(t, INTERVAL '1' minute, INTERVAL '1' hour), id"
    )
    .execute()
    .print();

Over Windows

tEnv
    .sqlQuery(
        "select " +
            "id," +
            "vc," +
            "sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row)" +
            "from sensor"
    )
    .execute()
    .print();

tEnv
    .sqlQuery(
        "select " +
            "id," +
            "vc," +
            "count(vc) over w, " +
            "sum(vc) over w " +
            "from sensor " +
            "window w as (partition by id order by t rows between 1 PRECEDING and current row)"
    )
    .execute()
    .print();
上一篇:论文·Neural Relation Extraction with Selective Attention over Instances


下一篇:大数据理论篇HDFS的基石——Google File System