flink之核心抽象--Window窗口及窗口操作全面详解

1. Windows

1.1. 基本概念

窗口是处理无限流的核心。窗口将流划分为固定大小的“桶”,方便程序员在上面应用各种计算。

Window操作是流式数据处理的一种非常核心的抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后就可以非常方便地定义作用于Window之上的各种计算操作。

1.2. 窗口分类

Window分为两类,窗口程序的通用结构详见下面,主要区别是针对Keyed Window,需要首先使用keyBy将stream转化为Keyed stream,然后使用window处理,但Non-Keyed Window直接使用windowAll处理

1.2.1. Keyed Window

- 将输入原始流stream转换成多个Keyed stream
- 每个Keyed stream会独立进行计算,这样多个Task可以对Windowing操作进行并行处理
- 具有相同Key的数据元素会被发到同一个Task中进行处理

方括号([…])中的命令是可选的。这表明Flink允许程序员以许多不同的方式定制窗口逻辑,以便它最适合实际的业务需求。

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

1.2.2. Non-Keyed Window

- 原始流stream不会被分割成多个逻辑流 
- 所有的Windowing操作逻辑只能在一个Task中进行处理,计算并行度为1

窗口程序的通用结构

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

1.3. 窗口的生命周期

针对基于时间的窗口,窗口的生命周期简而言之如下:

  • 创建窗口:当应该属于该窗口的第一个元素到达时,就会创建一个窗口,
  • 销毁窗口:当时间(事件或处理时间)经过其结束时间戳加上用户指定的允许延迟时,窗口将被完全删除。

Flink只保证删除基于时间的窗口,而不删除其他类型的窗口,例如全局窗口。
如使用基于事件时间的窗口策略,每5分钟创建一个非重叠(或滚动)窗口,允许延迟1分钟:
当带有时间戳的第一个元素到达这个时间间隔时,Flink将为12:00到12:05之间的时间间隔创建一个新窗口,当水印经过12:06时间戳时,它将删除该时间间隔对应的窗口。

针对窗口的逻辑操作:

  • 窗口函数 Window Functions:在窗口数据中执行的计算逻辑,可以是ReduceFunction、AggregateFunction或ProcessWindowFunction中的一个
  • 触发器 Trigger指定窗口被认为已准备好应用窗口函数的条件,如当窗口中的元素数量超过4时、当水印经过窗口的结束时间
    • 每个WindowAssigner都带有一个默认Trigger。 如果默认触发器不符合您的需求,则可以使用trigger(…)指定自定义触发器。
    • 决定在创建和删除窗口之间的任何时间清除其内容。
  • 驱逐器 Evictor: 在触发触发器之后以及在应用窗口函数之前或之后从窗口中删除元素

2. Window Assigners

Window Assigners【窗口赋值器】:定义如何将元素赋值给窗口,责将每个传入的元素分配给一个或多个窗口。

  • flink提供了四个常用的pre-defined window assigners【预定义窗口赋值器】: tumbling windows、sliding windows、session windows、global windows【注意:除global windows外,都是基于时间进行分配】
  • 可以通过扩展WindowAssigner类来实现自定义窗口赋值器

Flink中度量事件时间进度的机制是水印
水印作为数据流的一部分,并带有时间戳t。 Watermark(t)声明该流中的事件时间已经达到时间t,这意味着流中不应该再有时间戳为t’ <= t的元素(即时间戳大于或等于水印的事件)。

基于时间的窗口有一个开始时间戳和一个结束时间戳【是前闭后开区间】,它们一起描述窗口的大小

2.1. Count-based window

  • Tumbling CountWindow 代码示例: .countWindow(windowSize)
  • Sliding CountWindow 代码示例:.countWindow(windowSize, slideSize)

2.2. Time-based window

2.2.1. Tumbling Windows

滚动窗口赋值器将每个元素赋给一个固定长度大小的窗口。滚动窗口有一个固定的大小且相互不重叠
适用于BI统计(计算各个时间段的指标)。
特点:

  • 时间对齐 : 默认是aligned with epoch(整点、整分、整秒等),可以通过offset参数改变对齐方式。
  • 窗口长度固定
  • event无重叠

代码示例:

  • .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  • .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  • .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) 时间偏移8小时

offset偏移量的一个重要用例是将窗口调整为UTC-0以外的时区
flink之核心抽象--Window窗口及窗口操作全面详解

2.2.2. Sliding Windows

滑动窗口赋值器将每个元素赋值给一个或多个固定长度大小的窗口

  • window size 参数指定窗口长度大小
  • window slide 控制滑动窗口的启动频率。

使用场景:监控场景,对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。

特点:

  • 时间对齐 : 默认是aligned with epoch(整点、整分、整秒等),可以通过offset参数改变对齐方式。
  • 窗口长度固定
  • event有重叠

代码示例:

  • .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  • .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  • .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))

因此当slide 小于 size时,滑动窗口可以重叠。在这种情况下,元素被分配给多个窗口。

flink之核心抽象--Window窗口及窗口操作全面详解

2.2.3. Session Windows

会话窗口分配器按活动的会话对元素进行分组。

  • 与滚动窗口和滑动窗口相比,会话窗口不重叠且没有固定的开始和结束时间
  • 当会话窗口在一段时间内没有接收到元素时,即发生不活动间隙【session gap】时,会话窗口将关闭。
  • 会话间隙 session gap
    • 固定gap,也称静态会话间隙
    • 动态gap:实现SessionWindowTimeGapExtractor,该函数定义了不活动的时间长度
  • 当这段时间【会话间隙】到期时,当前会话关闭,并将后续元素分配给一个新的会话窗口。

适用于线上用户行为分析。

注意:

  • 在内部,会话窗口分配器为每个到达的记录创建一个新窗口,如果窗口之间的距离比定义的间隙更近,则将它们合并在一起,
  • 为了可合并,会话窗口操作符需要一个合并触发器和一个合并窗口函数,例如ReduceFunction、AggregateFunction或ProcessWindowFunction

特点:

  • 时间无对齐
  • event不重叠
  • 没有固定开始和结束时间

代码示例:

  • .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  • .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
    override def extract(element: String): Long = {
    // determine and return session gap
    }
    }))
  • .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
  • .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
    override def extract(element: String): Long = {
    // determine and return session gap
    }
    }))
    flink之核心抽象--Window窗口及窗口操作全面详解

2.2.4. Global Windows

全局窗口赋值器将具有相同键的所有元素分配给同一个全局窗口。此窗口模式仅在您指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有一个可以处理聚合元素的自然结束。

代码示例: .window(GlobalWindows.create())
flink之核心抽象--Window窗口及窗口操作全面详解
代码示例如下:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, ProcessingTimeSessionWindows, SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow}
import org.apache.flink.util.Collector
object FlinkWindowProcessGlobal   {
  def main(args: Array[String]): Unit = {
    //1.创建流计算执⾏行行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.创建DataStream - 细化
    val text = env.socketTextStream("Centos",9999)
    //3.执⾏行行DataStream的转换算⼦
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(word=>(word._1))
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of(4))
      .apply(new UserDefineGlobalWindowFunction)
      .print()
    //5.执⾏行行流计算任务
    env.execute("Tumbling Window Stream WordCount")
  }
}
class UserDefineGlobalWindowFunction extends WindowFunction[(String,Int),(String,Int),String,GlobalWindow]{
  override def apply(key: String,
                     window: GlobalWindow,
                     input: Iterable[(String, Int)],
                     out: Collector[(String, Int)]): Unit = {
    val sum = input.map(_._2).sum
    out.collect((s"${key}",sum))
  }
}

2.3. all windows 总结

flink之核心抽象--Window窗口及窗口操作全面详解

3. Window Functions

窗口函数封装了窗口数据的计算逻辑,当系统确定一个窗口已准备好进行处理时,将触发窗口函数

  • 窗口函数可以是ReduceFunction、AggregateFunction或ProcessWindowFunction中的一个。
  • 前两个可以更有效地执行,因为Flink可以在每个窗口的元素到达时增量地聚合它们。
  • ProcessWindowFunction获取一个窗口中包含的所有元素的Iterable,以及这些元素所属的窗口的附加元信息,在调用函数之前,在内部缓冲窗口的所有元素,所以执行效率差一点

3.1. ReduceFunction

ReduceFunction指定了来自输入的两个元素如何组合以产生相同类型的输出元素。Flink使用ReduceFunction来增量聚合窗口中的元素。

代码示例如下:

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

3.2. AggregateFunction

AggregateFunction是ReduceFunction的泛化版本,它有三种类型:

  • 输入类型(IN):输入流中元素的类型
  • 累加类型(ACC): 聚合过程中的中间结果类型
  • 输出类型(OUT): 聚合的最终结果类型
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)
    

3.3. ProcessWindowFunction

ProcessWindowFunction获得一个包含窗口所有元素的Iterable,以及一个可以访问时间和状态信息的Context对象,这使得它比其他窗口函数提供更大的灵活性【这是以性能和资源消耗为代价的,因为不能增量聚合元素,而是需要在内部缓冲,直到认为窗口已经准备好进行处理。】

val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

注意,将ProcessWindowFunction用于简单的聚合(如count)是非常低效的。下面的章节将展示如何将ReduceFunction或AggregateFunction与ProcessWindowFunction组合,以获得增量聚合和添加的ProcessWindowFunction信息。

3.3.1. Incremental Window Aggregation with ReduceFunction

将ReduceFunction与ProcessWindowFunction组合,以获得增量聚合和添加的ProcessWindowFunction信息

import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object FlinkWindowProcessTumblingWithProcessWindowFunctionAndReduceFunction {
  def main(args: Array[String]): Unit = {
    //1.创建流计算执⾏环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.创建DataStream - 细化
    val text = env.socketTextStream("172.25.21.22",19999)
    //3.执⾏行行DataStream的转换算⼦
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(t=>t._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce(new UserDefineReduceFunction2,new UserDefineProcessWindowFunction2)
      .print()
    //5.执⾏流计算任务
    env.execute("Aggregate Window Stream WordCount")
  }
}
class UserDefineProcessWindowFunction2 extends ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
  val sdf = new SimpleDateFormat("HH:mm:ss")
  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Int)],
                       out: Collector[(String, Int)]): Unit = {
    val window = context.window//获取窗口元数据
    val start = sdf.format(window.getStart)
    val end = sdf.format(window.getEnd)
    val sum = elements.map(_._2).sum
    println("list---->"+elements.toList)

    out.collect((key+"\t["+start+"---"+end+"]",sum))
  }

}
class UserDefineReduceFunction2 extends ReduceFunction[(String,Int)]{
  override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
    println("reduce:"+t+"\t"+t1)
    (t._1,t._2+t1._2)
  }
}

3.3.2. Incremental Window Aggregation with AggregateFunction

将AggregateFunction与ProcessWindowFunction组合,以获得增量聚合和添加的ProcessWindowFunction信息

val input: DataStream[(String, Long)] = ...

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction())

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {

  def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = {
    val average = averages.iterator.next()
    out.collect((key, average))
  }
}

3.4. Using per-window state in ProcessWindowFunction

除了访问键控状态(如任何丰富功能所允许的那样),a ProcessWindowFunction还可以使用键控状态,该键控状态的范围仅限于该函数当前正在处理的窗口。
在这种情况下,理解每个窗口状态所引用的窗口是什么是很重要的。有不同的“窗口”涉及:

  • 指定窗口操作时定义的窗口:这可能是1小时的翻滚窗口或2小时的滑动窗口滑动1小时。
  • 给定键的已定义窗口的实际实例:对于用户ID xyz,这可能是从12:00到13:00的时间窗口。这是基于窗口定义的,并且基于作业当前正在处理的键的数量以及事件属于哪个时隙,将有许多窗口。

每个窗口的状态与这两个中的后者相关。这意味着,如果我们处理1000个不同键的事件,并且当前所有事件的事件都落入[12:00,13:00)时间窗口,那么将有1000个窗口实例,每个实例具有各自的每个窗口状态。

在process()方法中,可以调用Context对象上的两个方法,它们允许访问两种类型的状态:

  • globalState(),它允许访问不在窗口范围内的键状态
  • windowState(),它允许访问也作用于窗口的键状态

如果您预期同一窗口会多次触发,则此功能很有用,例如,对于迟到的数据有较晚的触发,或者您有进行推测性较早触发的自定义触发器时,可能会发生这种情况。在这种情况下,您将存储有关先前触发或每个窗口状态中触发次数的信息。

使用窗口状态时,清除窗口时也要清除该状态,这一点很重要。这应该在clear()方法中发生。

import java.text.SimpleDateFormat
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object FlinkWindowProcessTumblingWithProcessWindowFunctionState {
  def main(args: Array[String]): Unit = {
    //1.创建流计算执⾏环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.创建DataStream - 细化
    val text = env.socketTextStream("172.25.21.22",19999)
    //3.执⾏行行DataStream的转换算⼦
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(t=>t._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new UserDefineProcessWindowFunction3)
      .print()
    //5.执⾏流计算任务
    env.execute("Aggregate Window Stream WordCount")
  }
}
class UserDefineProcessWindowFunction3 extends ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
  val sdf = new SimpleDateFormat("HH:mm:ss")
  var wvsd:ValueStateDescriptor[Int]=_
  var gvsd:ValueStateDescriptor[Int]=_

  override def open(parameters: Configuration): Unit = {
    wvsd=new ValueStateDescriptor[Int]("ws",createTypeInformation[Int])
    gvsd=new ValueStateDescriptor[Int]("gs",createTypeInformation[Int])
  }
  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Int)],
                       out: Collector[(String, Int)]): Unit = {
    val window = context.window   //获取窗口元数据
    val start = sdf.format(window.getStart)
    val end = sdf.format(window.getEnd)
    val sum = elements.map(_._2).sum

    var wvs:ValueState[Int]=context.windowState.getState(wvsd)
    var gvs:ValueState[Int]=context.globalState.getState(gvsd)
    wvs.update(wvs.value()+sum)
    gvs.update(gvs.value()+sum)
    println(key+"\tWindow Count\t"+wvs.value()+"\tGlobal Count\t"+gvs.value())
    out.collect((key+"\t["+start+"---"+end+"]",sum))
  }
}

3.5 WindowFunction (Legacy)

在一些可以使用ProcessWindowFunction的地方,你也可以使用WindowFunction。这是ProcessWindowFunction的旧版本,它提供较少的上下文信息,并且没有一些高级特性,比如每个窗口的键状态。这个接口在某个时候将被弃用。

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object FlinkWindowProcessSessionWithWindowFunction  {
  def main(args: Array[String]): Unit = {
    //1.创建流计算执⾏行行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.创建DataStream - 细化
    val text = env.socketTextStream("172.25.21.22",19999)
    //3.执⾏行行DataStream的转换算⼦
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(word=>(word._1))
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
      .apply(new UserDefineWindowFunction)
      .print()
    //5.执⾏行行流计算任务
    env.execute("Tumbling Window Stream WordCount")
  }
}
class UserDefineWindowFunction extends WindowFunction[(String,Int),(String,Int),String,TimeWindow]{
  override def apply(key: String,
                     window: TimeWindow,
                     input: Iterable[(String, Int)],
                     out: Collector[(String, Int)]): Unit = {
    val format = new SimpleDateFormat("HH:mm:ss")
    val start = format.format(window.getStart)
    val end = format.format(window.getEnd)
    val sum = input.map(_._2).sum
    out.collect((s"${key}\t${start}~${end}",sum))
  }
}

4. Triggers

4.1 基本概念及操作

Trigger定义了何时开始使用窗口计算函数计算窗口。每个窗口分配器都会有一个默认的Trigger。。如果默认触发器不符合实际的业务需求,可以使用trigger(…)指定自定义触发器。

Trigger接口有五个方法允许Trigger对不同的事件做出反应:

  • onElement():对添加到窗口的每个元素都会调用该方法。
  • onEventTime():事件时间 timer 触发的时候被调用。
  • onProcessingTime():处理时间 timer 触发的时候会被调用。
  • onMerge():与有状态触发器相关,当它们对应的窗口合并时,合并两个触发器的状态,例如使用会话窗口时。
  • clear():执行移除相应窗口时所需的任何操作。

关于上述方法,有两点需要注意:

  • 前三个函数通过返回TriggerResult来决定如何处理它们的调用事件。操作可以是以下其中之一:
    • CONTINUE:什么也不做,
    • FIRE:触发计算,
    • PURGE:清除窗口中的元素
    • FIRE_AND_PURGE:触发计算并随后清除窗口中的元素。
  • 这些方法中的任何一个都可以用来为将来的操作注册处理或事件时间计时器timer。

4.2. Default Triggers of WindowAssigners

WindowAssigner的默认触发器适用于很多案例。比如所有基于事件时间的窗口分配器都用EventTimeTrigger作为默认触发器。该触发器会在watermark达到窗口的截止时间时(the watermark passes the end of a window)直接触发计算输出。

GlobalWindow的默认触发器是永不触发的NeverTrigger。因此,在使用GlobalWindow时,您总是必须定义一个自定义触发器

源码详见: org.apache.flink.streaming.api.windowing.triggers

Flink内部有一些内置的触发器:

  • EventTimeTrigger:基于事件时间和watermark机制来对窗口进行触发计算。
  • ProcessingTimeTrigger: 基于处理时间触发。
  • CountTrigger: 窗口元素数超过预先给定的限制值的话会触发计算。
  • PurgingTrigger作为其它trigger的参数,将其转化为一个purging触发器。
// 针对每个元素触发
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        // window的最大时间戳比当前watermark小,该window需要立刻进行计算
        return TriggerResult.FIRE;
    } else {
        // 注册一个event time事件,当watermark超过window.maxTimestamp时,会调用onEventTime方法
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    // 当前时间为window的最大时间戳,触发计算
    return time == window.maxTimestamp() ?
        TriggerResult.FIRE :
        TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    // 对于processing time,不做任何处理
    return TriggerResult.CONTINUE;
}

flink之核心抽象--Window窗口及窗口操作全面详解

4.3. Fire and Purge

一旦触发器确定一个窗口已准备好处理,它将触发,即返回FIRE或FIRE_AND_PURGE

这是窗口操作符发出当前窗口结果的信号。

  • 给定一个带有ProcessWindowFunction的窗口,所有元素都被传递给ProcessWindowFunction(可能是在将它们传递给驱逐器之后)
  • 带有ReduceFunction或AggregateFunction的Windows只会发出它们增量聚合的结果。

当触发器触发时,它可以是FIRE或FIRE_AND_PURGE。虽然FIRE保留窗口的内容,但FIRE_AND_PURGE删除其内容。默认情况下,预实现的触发器只是FIRE而不清除窗口状态。
注意:清除将简单地删除窗口的内容,并保留关于窗口和任何触发器状态的任何潜在元信息。

5. Evictors

Evictors能够在触发器触发之后以及**应用窗口函数 之前 或 之后 **从窗口中删除元素。为此,Evictor 接口有两个方法:

public interface Evictor<T, W extends Window> extends Serializable {

    /**
     * Optionally evicts elements. Called before windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictBefore(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);

    /**
     * Optionally evicts elements. Called after windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictAfter(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);

Flink带有三个内置的驱逐器,默认情况下这些内置的驱逐器都在窗口函数之前应用它们的逻辑

  • CountEvictor: 在窗口中保持用户指定的元素数量,如果多于用户指定的数量,从窗口缓冲区的开头丢弃多余的元素。
  • DeltaEvictor: 接受一个DeltaFunction和一个阈值,计算窗口缓冲区中最后一个元素与每个剩余元素之间的增量,并删除增量大于或等于阈值的元素
  • TimeEvictor:接受以毫秒为单位的interval作为参数,对于给定的窗口,它会在其元素中查找最大时间戳max_ts,并删除时间戳小于max_ts - interval的所有元素。

注意:

  • 默认情况下,所有内置的驱逐器在窗口函数之前使用。指定驱逐器可以避免预聚合(pre-aggregation),因为窗口内所有元素必须在窗口计算之前传递给驱逐器。
  • Flink 不保证窗口内元素的顺序。这意味着虽然驱逐器可以从窗口开头移除元素,但这些元素不一定是先到的还是后到的。

6. Allowed Lateness

当使用事件时间窗口时,可能会发生元素延迟到达的情况,即Flink用来跟踪事件时间进展的水印已经超过了元素所属窗口的结束时间戳。

默认情况下,当水印超过窗口的结束时间戳时,将删除晚到的元素。
但是,Flink允许为窗口操作符指定最大允许延迟,允许延迟指定元素在被删除之前可以延迟多少时间,其默认值为0
如果元素在 窗口的结束时间戳 + 最大允许延迟之前到达,元素仍旧被添加到窗口。

根据所使用的触发器,延迟但未删除的元素可能会导致窗口再次触发。如EventTimeTrigger就是这样。
示例代码:
.allowedLateness(

6.1. Getting late data as a side output

使用Flink的侧输出【side output】特性,可以得到一个延迟丢弃的数据流。

可参考:Flink1.11中watermark的创建以及如何使用watermark处理乱序数据和迟到数据

public class SolveLateness {
    public static void main(String[] args) throws Exception {
        streamCompute();
    }

    private static void streamCompute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 默认是200ms
        env.getConfig().setAutoWatermarkInterval(1L);

        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        SingleOutputStreamOperator<SensorReading> stream = source.map(data -> new SensorReading(
                        data.split(",")[0].trim(),
                        Long.parseLong(data.split(",")[1].trim()),
                        Double.parseDouble(data.split(",")[2].trim())
                )
        ).returns(SensorReading.class);


        OutputTag<SensorReading> laterTag = new OutputTag<SensorReading>("laterData") {
        };
        // 间歇性生成 watermark, 设置最长容忍乱序数据时间为5
        SingleOutputStreamOperator<SensorReading> result = stream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<SensorReading>() {
                    @Override
                    public long extractTimestamp(SensorReading element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })).keyBy(SensorReading::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 允许数据的最大时间
                .allowedLateness(Time.seconds(2))
                // 采用侧输出流对迟到数据进行处理
                .sideOutputLateData(laterTag)
                .apply(
                        new WindowFunction<SensorReading, SensorReading, String, TimeWindow>() {
                            @Override
                            public void apply(String s, TimeWindow window, Iterable<SensorReading> input, Collector<SensorReading> out) throws Exception {
                                System.out.println("window : [" + window.getStart() + ", " + window.getEnd() + "]");
                                ArrayList<SensorReading> list = new ArrayList<>((Collection<? extends SensorReading>) input);
                                list.forEach(out::collect);
                            }
                        });
        result.print();
        result.getSideOutput(laterTag).print("later data");
        env.execute();
    }
}

@Data
@AllArgsConstructor
public class SensorReading {
    private String id;
    private long timestamp;
    private double temperature;
}

6.2. Late elements considerations

当指定允许的延迟时间大于0时,在水印通过窗口末端后,窗口及其内容将被保留。在这些情况下,当一个迟到但没有被删除的元素到达时,它可能会触发对窗口的另一次触发。这些触发被称为延迟触发,因为它们是由延迟事件触发的,与主触发相反,主触发是窗口的第一次触发。
在会话窗口的情况下,延迟触发可能会进一步导致窗口的合并,因为它们可能“桥接”两个已存在的未合并窗口之间的间隙(session gap)。

7. Working with window results

窗口操作的结果仍然是一个DataStream,窗口操作的信息不会保留在结果元素中,所以如果你想保留关于窗口的元信息,你必须手动在你的ProcessWindowFunction的结果元素中编码这些信息。

7.1. Interaction of watermarks and windows

当水印到达窗口操作符时,会触发两件事:

  • 当最大时间戳(end-timestamp - 1)小于新水印时,该水印触发所有Windows的计算,
  • 水印被转发给下游操作
    a watermark “flushes” out any windows that would be considered late in downstream operations once they receive that watermark.

7.2. Consecutive windowed operations

当您想要执行两个连续的窗口操作,并且希望使用不同的键,但仍然希望来自相同上游窗口的元素最终到达相同的下游窗口时,这可能很有用

val input: DataStream[Int] = ...

val resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer())

val globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction())

在本例中,第一个操作的时间窗口[0,5)的结果也将在随后的窗口操作的时间窗口[0,5)中结束。这允许计算每个键的和,然后在第二个操作中计算同一窗口内的top-k个元素。

8. Useful state size considerations

窗口可以在很长一段时间内(如天、周或月)定义,因此会积累非常大的状态。
在估算窗口计算的存储需求时,有几个规则需要牢记:

  • Flink为它所属的每个窗口创建每个元素的一个副本。
    • 滚动窗口会保留每个元素的一个副本(一个元素只属于一个窗口,除非它被延迟删除)
    • 滑动窗口创建每个元素的几个副本
  • ReduceFunction和AggregateFunction可以显著减少存储需求,因为它们会增量聚合元素,并且每个窗口只存储一个值。相反,仅仅使用ProcessWindowFunction就需要累积所有元素。
  • 使用Evictor 可以防止任何预聚合,因为在应用计算之前,窗口的所有元素都必须通过Evictor
上一篇:Flink代码1


下一篇:Flink 流处理 API