窗口概述
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
在Flink中, 窗口(window)是处理*流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算
Keyed vs Non-Keyed Windows
其实, 在用window前首先需要确认应该是在keyBy后的流上用, 还是在没有keyBy的流上使用。在keyed streams上使用窗口, 窗口计算被并行的运用在多个task上, 可以认为每个task都有自己单独窗口.
在非non-keyed stream上使用窗口, 流的并行度只能是1, 所有的窗口逻辑只能在一个单独的task上执行.代码如下:
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
需要注意的是: 非key分区的流上使用window, 如果把并行度强行设置为>1, 则会抛出异常
窗口的生命周期
1.当一个属于window的元素到达之后这个window就创建了
2.当当前时间(事件或者处理时间)为window的创建时间+窗口大小+用户指定的延迟时间时,窗口将被彻底清除
窗口的分类
窗口分为2类:
1.基于时间的窗口(时间驱动)
2.基于元素个数的(数据驱动)
基于时间窗口
1.时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸
2.在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp())
说明:
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定.
2.我们传递给window函数的对象叫窗口分配器.
滚动窗口(Tumbling Windows)
1.滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
2.滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
示例代码:
env
.socketTextStream("linux01", 9999)
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
Arrays.stream(value.split("\\W+")).forEach(word -> out.collect(Tuple2.of(word, 1L)));
}
})
.keyBy(t -> t.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(8))) // 添加滚动窗口
.sum(1)
.print();
滑动窗口(Sliding Windows)
1.与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
2.所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
3.例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据
示例代码:
env
.socketTextStream("hadoop102", 9999)
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
Arrays.stream(value.split("\\W+")).forEach(word -> out.collect(Tuple2.of(word, 1L)));
}
})
.keyBy(t -> t.f0)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 添加滚动窗口
.sum(1)
.print();
env.execute();
会话窗口(Session Windows)
1.会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
2.如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
3.我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口
创建原理:
因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction
示例代码:
1.静态gap
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
2.动态gap
.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
@Override
public long extract(Tuple2<String, Long> element) { // 返回 gap值, 单位毫秒
return element.f0.length() * 1000;
}
}))
全局窗口(Global Windows)
全局窗口分配器会分配相同key的所有元素进入同一个 Global window. 这种窗口机制只有指定自定义的触发器时才有用. 否则, 不会做任何计算, 因为这种窗口没有能够处理聚集在一起元素的结束点.
示例代码
.window(GlobalWindows.create());
基于元素个数的窗口
按照指定的数据条数生成一个Window,与时间无关
滚动窗口
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行
示例代码
.countWindow(3)
滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
示例代码
.countWindow(3, 2)
窗口函数
1.前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
2.window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
3.ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 .
4.ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息5.ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素
ReduceFunction(增量聚合函数)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
System.out.println(value1 + " ----- " + value2);
// value1是上次聚合的结果. 所以遇到每个窗口的第一个元素时, 这个函数不会进来
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})
AggregateFunction(增量聚合函数)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<Tuple2<String, Long>, Long, Long>() {
// 创建累加器: 初始化中间值
@Override
public Long createAccumulator() {
System.out.println("createAccumulator");
return 0L;
}
// 累加器操作
@Override
public Long add(Tuple2<String, Long> value, Long accumulator) {
System.out.println("add");
return accumulator + value.f1;
}
// 获取结果
@Override
public Long getResult(Long accumulator) {
System.out.println("getResult");
return accumulator;
}
// 累加器的合并: 只有会话窗口才会调用
@Override
public Long merge(Long a, Long b) {
System.out.println("merge");
return a + b;
}
})
ProcessWindowFunction(全量聚合函数)
.process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
// 参数1: key 参数2: 上下文对象 参数3: 这个窗口内所有的元素 参数4: 收集器, 用于向下游传递数据
@Override
public void process(String key,
Context context,
Iterable<Tuple2<String, Long>> elements,
Collector<Tuple2<String, Long>> out) throws Exception {
System.out.println(context.window().getStart());
long sum = 0L;
for (Tuple2<String, Long> t : elements) {
sum += t.f1;
}
out.collect(Tuple2.of(key, sum));
}
})