Flink的window机制

窗口概述

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。

在Flink中, 窗口(window)是处理*流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算

Keyed vs Non-Keyed Windows

其实, 在用window前首先需要确认应该是在keyBy后的流上用, 还是在没有keyBy的流上使用。在keyed streams上使用窗口, 窗口计算被并行的运用在多个task上, 可以认为每个task都有自己单独窗口.

在非non-keyed stream上使用窗口, 流的并行度只能是1, 所有的窗口逻辑只能在一个单独的task上执行.代码如下:

.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))

需要注意的是: 非key分区的流上使用window, 如果把并行度强行设置为>1, 则会抛出异常

窗口的生命周期

1.当一个属于window的元素到达之后这个window就创建了
2.当当前时间(事件或者处理时间)为window的创建时间+窗口大小+用户指定的延迟时间时,窗口将被彻底清除

窗口的分类

窗口分为2类:
1.基于时间的窗口(时间驱动)
2.基于元素个数的(数据驱动)

基于时间窗口
1.时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸
2.在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp())

说明:
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定.
2.我们传递给window函数的对象叫窗口分配器.

滚动窗口(Tumbling Windows)

1.滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
2.滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
Flink的window机制
示例代码:

env
  .socketTextStream("linux01", 9999)
  .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
      @Override
      public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
          Arrays.stream(value.split("\\W+")).forEach(word -> out.collect(Tuple2.of(word, 1L)));
      }
  })
  .keyBy(t -> t.f0)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(8))) // 添加滚动窗口
  .sum(1)
  .print();

滑动窗口(Sliding Windows)

1.与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
2.所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
3.例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据
Flink的window机制
示例代码:

env
  .socketTextStream("hadoop102", 9999)
  .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
      @Override
      public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
          Arrays.stream(value.split("\\W+")).forEach(word -> out.collect(Tuple2.of(word, 1L)));
      }
  })
  .keyBy(t -> t.f0)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 添加滚动窗口
  .sum(1)
  .print();

env.execute();

会话窗口(Session Windows)

1.会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
2.如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
3.我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口

创建原理:

因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction

Flink的window机制
示例代码:
1.静态gap

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))

2.动态gap

.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
    @Override
    public long extract(Tuple2<String, Long> element) { // 返回 gap值, 单位毫秒
        return element.f0.length() * 1000;
    }
}))

全局窗口(Global Windows)

全局窗口分配器会分配相同key的所有元素进入同一个 Global window. 这种窗口机制只有指定自定义的触发器时才有用. 否则, 不会做任何计算, 因为这种窗口没有能够处理聚集在一起元素的结束点.
Flink的window机制
示例代码

.window(GlobalWindows.create());

基于元素个数的窗口

按照指定的数据条数生成一个Window,与时间无关

滚动窗口
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行

示例代码

.countWindow(3)

滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。

示例代码

.countWindow(3, 2)

窗口函数

1.前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
2.window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
3.ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 .
4.ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息5.ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素

ReduceFunction(增量聚合函数)

.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
    @Override
    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
        System.out.println(value1 + " ----- " + value2);
	  // value1是上次聚合的结果. 所以遇到每个窗口的第一个元素时, 这个函数不会进来
        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
    }
})

AggregateFunction(增量聚合函数)

.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<Tuple2<String, Long>, Long, Long>() {

    // 创建累加器: 初始化中间值
    @Override
    public Long createAccumulator() {
        System.out.println("createAccumulator");
        return 0L;
    }

    // 累加器操作
    @Override
    public Long add(Tuple2<String, Long> value, Long accumulator) {
        System.out.println("add");
        return accumulator + value.f1;
    }

    // 获取结果
    @Override
    public Long getResult(Long accumulator) {
        System.out.println("getResult");
        return accumulator;
    }

    // 累加器的合并: 只有会话窗口才会调用
    @Override
    public Long merge(Long a, Long b) {
        System.out.println("merge");
        return a + b;
    }
}) 

ProcessWindowFunction(全量聚合函数)

.process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
    // 参数1: key 参数2: 上下文对象 参数3: 这个窗口内所有的元素 参数4: 收集器, 用于向下游传递数据
    @Override
    public void process(String key,
                        Context context,
                        Iterable<Tuple2<String, Long>> elements,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        System.out.println(context.window().getStart());
        long sum = 0L;
        for (Tuple2<String, Long> t : elements) {
            sum += t.f1;
        }
        out.collect(Tuple2.of(key, sum));
    }
})
上一篇:FLINK基础(117): DS数据类型


下一篇:Flink键控状态AggregatingState开发实例