一、时间语义
在 Flink 的流式操作中, 会涉及不同的时间概念
1.1 处理时间
处理时间是指的执行操作的各个设备的时间。
对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟。比如, 一个长度为 1 个小时的窗口将会包含设备时钟表示的 1 个小时内所有的数据. 假设应用程序在 9:15am 分启动, 第 1 个小时窗口将会包含 9:00am 到 10:00am 所有的数据, 然后下个窗口是 10:00am-11:00am, 等等。
处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调. 他提供了最好的性能和最低的延迟. 但是, 在分布式和异步的环境下, 处理时间没有办法保证确定性, 容易受到数据传递速度的影响: 事件的延迟和乱序。
在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器。
这篇文章之前所使用的全是处理时间
1.2 事件时间
事件时间是指的这个事件发生的时间.
在 event 进入 Flink 之前, 通常被嵌入到了 event 中, 一般作为这个 event 的时间戳存在.
在事件时间体系中, 时间的进度依赖于数据本身, 和任何设备的时间无关。事件时间程序必须制定如何产生 Event Time Watermarks。在事件时间体系中, Watermarks是表示时间进度的标志(作用就相当于现实时间的时钟)。
在理想情况下,不管事件时间何时到达或者他们的到达的顺序如何, 事件时间处理将产生完全一致且确定的结果。事件时间处理会在等待无序事件(迟到事件)时产生一定的延迟。 由于只能等待有限的时间,因此这限制了确定性事件时间应用程序的可使用性。
假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,该记录落入该小时,无论它们到达的顺序或处理时间。
在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器。
在 1.12 之前默认的时间语义是处理时间, 从 1.12 开始, Flink 内部已经把默认的语义改成了事件时间
二、WaterMark
考虑这样情况,结合上一篇开窗案例,假设滚动窗口大小 5min,存在一个 9:00-9:05,现在传感器在这个时间段每分钟生成了 5 条数据即 9:00、9:01、9:02、9:03、9:04,但因为网络原因数据在 9:06 才到达 flink 参与计算,但此时 9:00-9:05 窗口已经关闭这批数据就会被丢失。
2.1 原理
事件时间可以不依赖处理时间来表示时间的进度。例如,在程序中即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间。另外一方面,使用事件时间可以在几秒内处理已经缓存在 Kafka 中多周的数据, 这些数据可以照样被正确处理,就像实时发生的一样能够进入正确的窗口。因此在 Flink 中测量事件时间的进度机制就是 WaterMark,同时 WaterMark 作为内嵌在数据流中的特殊数据,并且携带一个时间戳,在开窗中使用作为窗口关闭的衡量标准。具备单调递增的特性。如 9:00-9:05 的窗口,只要 WaterMark 携带的时间没有到 9:05 窗口就不会被关闭。一个 Watermark(t)表示在这个流里面事件时间已经到了时间 t, 意味着此时, 流中不应该存在这样的数据: 他的时间戳 t2<= t (时间比较旧或者等于时间戳)
2.1.1 有序流中的 WaterMark
在下面的这个图中, 事件是有序的(按照他们自己的时间戳来看), WaterMark 是流中一个简单的周期性的标记,通常情况下, WaterMark是一种标记, 是流中的一个点, 所有在这个时间戳(WaterMark中的时间戳)前的数据应该已经全部到达. 一旦 WaterMark 到达了算子, 则这个算子会提高他内部的时钟的值为这个 WaterMark 的值。此时可以解决因延迟迟到的数据。
W(6) 表示事件时间已经到 6s,但实际上内部的最大时间戳是需要减 1ms,类比窗口的结束时间,防止存在两条事件时间相同的数据
2.1.2 无序流中的 WaterMark
若数据不仅有延迟,而且各个数据源延迟不一样,这就导致数据延迟且无序,那么上述的 WaterMark 取法就存在问题,此时需要给予 WaterMark 一个合理的延迟,这个延迟取决去系统最大的乱序程度可以再略高一点
其相当于每个窗口往后延迟一定时间再关闭,数据还是根据自身所携带的时间进入对应的窗口,也就是同一时刻会存在若干个窗口当延迟后的 WaterMark 到达窗口的关闭时间,窗口关闭输出数据。
2.1.3 设定规则
在 Flink 中,WaterMark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解,如果 WaterMark设置的延迟太久,收到结果的速度可能就会很慢,而如果WaterMark 到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题。
2.2 生成器
如何在代码中生成 WaterMark 呢?Flink 内置两个 WaterMark 生成器,forMonotonousTimestamps 和 BoundedOutOfOrdernessWatermarks 分别对应有序和无序流,但从本质上看 forMonotonousTimestamps 是延迟为 0 的 BoundedOutOfOrdernessWatermarks
代码如下:
package day06;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Flink01_WaterMark_EventTime {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度,不设置涉及 watermark 传递
env.setParallelism(1);
// 读取数据,转换成 POJO
SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("localhost", 1111)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] els = value.split(" ");
return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
}
}).returns(Types.POJO(WaterSensor.class));
// 提取事件时间
WatermarkStrategy<WaterSensor> wm = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000;
}
});
waterSensorDS
// 注册事件时间
.assignTimestampsAndWatermarks(wm)
.keyBy(WaterSensor::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("vc")
.print();
env.execute();
}
}
结果如下:
只要输入的时间戳小于 1577844005 第一个窗口就不会关闭,此时已经和处理时间没有关系了。
WaterMark 延迟 3s 解决乱序问题,代码如下:
package day06;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
public class Flink02_WaterMark_OutOfOrder {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度,不设置涉及 watermark 传递
env.setParallelism(1);
// 读取数据,转换成 POJO
SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("localhost", 1111)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] els = value.split(" ");
return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
}
}).returns(Types.POJO(WaterSensor.class));
// 提取事件时间,设置 WaterMark 延迟 3s
WatermarkStrategy<WaterSensor> wm = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000;
}
});
waterSensorDS
// 注册事件时间
.assignTimestampsAndWatermarks(wm)
.keyBy(WaterSensor::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("vc")
.print();
env.execute();
}
}
结果如下:
可以看出即使 1577844001、1577844002 迟到也是可以被统计到的,若是这两条数据在 1577844008 之后到达即超过了延迟时间后才到,那此种方式这两条数据就被丢了。
2.3 多并行度下 WaterMark 传递
注意到上述的代码都是设置并行度为 1,若注释掉并行度或者并行度设置为 2,这时候就会发现即使数据的时间已经到了 1577844008 第一个窗口都不会被关闭,这是因为:多并行度下 WaterMark 向下游传递采用的是轮播,下游取最小的 WaterMark。假设并行度为 2,如图:
此时 WaterMark 到达 8,因为轮播到下游,且下游取最小的导致 keyBy 算子 WaterMark 依然是 1,除非再来一条让 WaterMark 到达 8 的数据,才会让下游的 WaterMark 发生改变。如图:
总结 WaterMark 规律如下:
那么上述问题如何解决呢,设置并行度为 1 肯定不显示,那么可以将生成 WaterMark 的时机提前,在 Source 后直接提取生成即可,代码如下:
package day06;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
public class Flink03_WaterMark_OutOfOrder2 {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 提取事件时间
WatermarkStrategy<String> wm = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(" ")[1]) * 1000;
}
});
// 读取数据,转换成 POJO
env.socketTextStream("localhost", 1111)
// 注册事件时间
.assignTimestampsAndWatermarks(wm)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] els = value.split(" ");
return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
}
}).returns(Types.POJO(WaterSensor.class))
.keyBy(WaterSensor::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("vc")
.print();
env.execute();
}
}
再次测试:
2.4 自定义生成器
Flink 提供 2 种风格的 WaterMark 生产方式: periodic(周期性) and punctuated(间歇性)。都需要继承 接口: WatermarkGenerator,实际生产中都是 periodic,periodic 默认 200ms 生成一次 WaterMark,punctuated 每条数据生成一次 WaterMark;
优缺点比较:
- 当数据量大时,punctuated 会产生大量的 WaterMark 占用内存空间
- 当数据量小时,periodic 会产生大量的 WaterMark,但没有什么影响(Flink 本身就是大数据计算框架,数据量小的时候生成一点 WaterMark 怎么了)
periodic
package day06;
import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class Flink05_WaterMark_Periodic {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置定时生成时间
env.getConfig().setAutoWatermarkInterval(200);
// 设置并行度,不设置涉及 watermark 传递
env.setParallelism(1);
// 读取数据,转换成 POJO
SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("localhost", 1111)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] els = value.split(" ");
return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
}
}).returns(Types.POJO(WaterSensor.class));
// 提取事件时间
WatermarkStrategy<WaterSensor> wm = new WatermarkStrategy<WaterSensor>() {
@Override
public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new MyPeriodic(2000);
}
}.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000;
}
});
SingleOutputStreamOperator<WaterSensor> result = waterSensorDS
// 注册事件时间
.assignTimestampsAndWatermarks(wm)
.keyBy(WaterSensor::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 允许数据迟到 2s
.allowedLateness(Time.seconds(2))
// 3s + 2s 后的数据放入测输出流
.sideOutputLateData(new OutputTag<WaterSensor>("Side") {
})
.sum("vc");
result.print("master");
result.getSideOutput(new OutputTag<WaterSensor>("Side") {
}).print("side");
env.execute();
}
private static class MyPeriodic implements WatermarkGenerator<WaterSensor> {
private long maxTs;
// 允许延迟的最大时间
private final long maxDelay;
MyPeriodic(long maxDelay) {
this.maxDelay = maxDelay;
// 因为周期生成,需要在赋值加 maxDelay,否则调动 onPeriodicEmit 直接变成最大值
maxTs = Long.MIN_VALUE + maxDelay - 1;
}
// 每条数据调用一次
@Override
public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
// 保证 WaterMark 递增
maxTs = Math.max(maxTs, eventTimestamp);
}
// 周期性调用(默认200ms)
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTs - maxDelay - 1));
}
}
}
不要问我为什么写的这么漂亮,因为 BoundedOutOfOrdernessWatermarks 就是这么实现的