Flink 中的窗口

Flink 中的窗口


在流式计算中,我们所接入的数据集是无限流,或者说是没有边界的数据流。那么有没有办法将无限流转换为有限流呢?这里就需要引入 Window(窗口)的概念,通过 Window 我们可以按照固定时间或长度将无限数据流切分成不同长度的有限数据块,然后在每个窗口内针对数据块进行聚合运算。

Window 分类

Keyed Window 和 Global Window
Time Window 和 Count Window
Window API

Time Window 和 Count Window
Time Window
基于时间定义的窗口。根据不同业务场景又可以分为滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)三种。

滚动窗口(Tumbing Window):

滚动窗口是按照固定时间进行切分,而且所有窗口之间的数据不会重叠,使用时只需要指定一个窗口长度即可。
Flink 中的窗口

从上面的示例图中可以看到,滚动窗口的窗口大小(window size)是固定的,而且相邻窗口之间是连续的。现在有这样的业务场:某公司要求每 10 秒统计一次最近 10 秒内各个电商平台的订单数量并输出到大屏幕,这时候就需要用到滚动窗口了,我们只需要将窗口大小设置为 10 秒就可以。我们使用 netcat 发送 Socket 数据来模拟订单流量。

在 com.xxx.window 包下创建 TumblingWindow Scala Object,代码如下:

package com.xxx.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)

    import org.apache.flink.streaming.api.scala._

    val dataStream: DataStream[(String, Int)] = textDstream
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0) // 按照第0个字段分组
      .timeWindow(Time.seconds(10)) // 设定窗口大小
      .sum(1) // 对第1个字段求和

    dataStream.print().setParallelism(1)

    env.execute("Tumbling Window")
  }
}

在上面的代码中,我们监听了 localhost 的 9999 端口,将接收到的数据转换为(key, 1)这样的键值对,然后按 key 分组,设定窗口大小为 10 秒,最后针对每个分组里的数据进行求和并输出。

在终端中输入 nc -l -p 9999,然后运行程序,在终端中输入以下内容,然后观察控制台输出:

Flink 中的窗口

在终端输入的时候注意控制时间间隔,由于代码中设置的窗口大小是 10 秒,所以会每隔 10 秒打印一次最近 10 秒内输入的数据。最终输出的统计结果和上图中不一致属于正常情况。另外 timeWindow() 方法中的时间参数除了 Time.seconds(),还有 Time.days()、Time.hours()、Time.minutes() 和 Time.milliseconds(),在 idea 中输入的时候会有提示。

滑动窗口(Sliding Window):

滑动窗口有两个参数,分别是窗口大小和窗口滑动时间,它是允许不同窗口的元素重叠的(同一个元素可以出现在不同的窗口中)。窗口大小指定数据统计的时间跨度,而滑动时间指定的是相邻两个窗口时间的时间偏移量。当滑动时间小于窗口大小的时候,数据会发生重叠;当滑动窗口大于窗口大小的时候,窗口会出现不连续的情况(部分元素不会纳入统计);当滑动时间和窗口大小相等的时候,滑动窗口就是滚动窗口,从这个角度来看,滚动窗口是滑动窗口的一个特殊存在。

滑动窗口和滚动窗口在使用过程中的唯一区别就是多了一个滑动大小的参数。假设将上面的业务修改为“公司要求每隔 5 秒统计一次最近 10 秒内的各平台订单量”,那么这里的窗口大小(window size)就是 10 秒,而窗口滑动大小(window slide)是 5 秒,从上图中我们可以看到,当窗口滑动大小小于窗口大小的时候,元素是会重叠的。完整代码如下:

package com.xxx.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SlidlingWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)

    import org.apache.flink.streaming.api.scala._

    val dataStream: DataStream[(String, Int)] = textDstream
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0) // 按照第0个字段分组
      .timeWindow(Time.seconds(10), Time.seconds(5)) // 设定窗口大小、窗口滑动大小
      .sum(1) // 对第1个字段求和

    dataStream.print().setParallelism(1)

    env.execute("Slidling Window")
  }
}

会话窗口(Session Window):

与前滚动窗口和滑动窗口不同的是,会话窗口没有固定的滑动时间和窗口大小,而是通过一个 session gap 来指定窗口间隔。如果在 session gap 规定的时间内没有活跃数据进入的话,则认为当前窗口结束,下一个窗口开始。session gap 可以理解为相邻元素的最大时间差。
Flink 中的窗口

代码如下:

package com.xxx.window

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, ProcessingTimeSessionWindows}
import org.apache.flink.streaming.api.windowing.time.Time
object SessionWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)

    import org.apache.flink.streaming.api.scala._

    val dataStream: DataStream[(String, Int)] = textDstream
      .map((_, 1))
      .keyBy(0)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // 超时时长为5秒
      .sum(1)

    dataStream.print().setParallelism(1)

    env.execute("Session Window")
  }
}

在上面的代码中,我们使用 ProcessingTimeSessionWindows.withGap(Time.seconds(5)) 指定当前窗口为会话窗口,而且最大等待时间为 5 秒,也就是说如果相邻两个元素的抵达时间小于等于 5 秒,则不会触发当前窗口,一旦超过 5 秒未接收到新数据,当前窗口触发计算。大家可以使用 nc -l -p 9999 命令启动终端发送数据并观察控制台输出,注意把控相邻元素的时间间隔。

Count Window
基于输入数据量定义,与时间无关。Count Window 也可以细分为滚动窗口和滑动窗口,逻辑和 Time Window 中的滚动窗口和滑动窗口的逻辑类似,只是窗口大小和触发条件由时间换成了相同 Key 元素的数量。窗口大小是由相同 Key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。

滚动窗口代码如下:

package com.xxx.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object CountTumblingWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)

    import org.apache.flink.streaming.api.scala._

    val dataStream: DataStream[(String, Int)] = textDstream
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .countWindow(3)
      .sum(1)

    dataStream.print().setParallelism(1)

    env.execute("Count Tumbling Window")
  }
}

在上面的代码中我们设置的窗口触发条件为 3,也就是相同 Key 元素达到 3 之后就触发计算。注意是相同 Key 元素的个数而不是所有元素的总数。在终端输入nc -l -p 9999之后依次输入以下内容:

tianmao
jindong
taobao
tianmao
tianmao
taobao
taobao
jindong
jindong

在输入以上内容的时候注意观察控制台输出:

(tianmao,3)
(taobao,3)
(jindong,3)

滑动窗口代码如下:

package com.xxx.window

import org.apache.flink.streaming.api.scala._

object CountSlidlingWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)

    import org.apache.flink.streaming.api.scala._

    val dataStream: DataStream[(String, Int)] = textDstream
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .countWindow(3, 2)
      .sum(1)

    dataStream.print().setParallelism(1)

    env.execute("Sliding Tumbling Window")
  }
}

在上面的代码中,countWindow()方法有两个参数,分别是size和slide,其中size为 3,slide为 2。也就是说每收到两个相同 Key 的元素就触发一次计算,计算的范围是相邻的 3 个相同元素。

使用 nc -l -p 9999 命令启动终端并发送以下内容:

taobao
taobao
tianmao
taobao
tianmao
taobao
taobao
taobao

输出如下:

(taobao,2)
(tianmao,2)
(taobao,3)
(taobao,3)

Keyed Window 和 Global Window
Keyed Window
在前面学习聚合算子的时候我们有提到过 KeyedStream 类型。如果上游数据集的类型是 KeyedStream,则调用window()方法,数据会根据 Key 在不同的 Task 实例中分别计算,最后将得到针对每个 Key 的统计结果。

Global Window
如果上游数据集是非键值对类型的,则调用windowAll()方法,所有的数据都会在窗口算子中一个 Task 中计算,并得到全局的结果。

Keyed Window 和 Global Window 大家作为了解即可,由于使用较少,这里不做赘述。

总结
Window 是流处理中非常常用,也是非常重要的一种处理方式。其中 Time Window 可以说是重点中的重点,大家在学习的时候要认真理解示例图,搞清楚窗口大小和窗口滑动大小的关系。万变不离其宗,不论业务过程如何复杂,最终都会转化到本实验的编程模型中,唯一需要替换的就是聚合部分的业务逻辑。Keyed Window 和 Global Window 大家简单了解就好,有兴趣的同学可以自行实验。

上一篇:论文阅读记录[ Benchmarking Streaming Computation Engines: Storm, Flink and Spark Streaming ]


下一篇:Spark Streaming整合Kafka及示例