Event Time
- Before reading this section, read Timely Stream Processing first.
- To use event time, first set the correct time characteristic:
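StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the stream time characteristic; required to use event time in Flink 1.11
// (since Flink 1.12 event time is the default and this method is deprecated)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);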
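- Note: setting the time characteristic is not enough. You also need either a source that generates watermarks directly from event time, or a Timestamp Assigner & Watermark Generator injected after the source. Watermarks are covered below.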
Generating Watermarks
Introduction to Watermark Strategies
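- To work with event time, every element needs a timestamp, usually extracted from the element by a TimestampAssigner.
- Timestamp assignment goes hand in hand with watermark generation; watermarks are produced by a WatermarkGenerator.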
- The WatermarkStrategy of the Flink API bundles a TimestampAssigner and a WatermarkGenerator. WatermarkStrategy offers static factory methods for common strategies, and you can also build a custom strategy by implementing these two methods:
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {

    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
     * strategy.
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
- Building a WatermarkStrategy with a lambda timestamp assigner:
WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);
- Note: all timestamps and watermarks are given in milliseconds since the epoch, 1970-01-01T00:00:00Z.
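To sanity-check an epoch-millisecond value, java.time.Instant can render it as a UTC date-time. The sketch below converts the first event time used in the demo later in this section; the class EpochMillisDemo is purely illustrative.

```java
import java.time.Instant;

// Illustrative helper (hypothetical name): Flink timestamps and watermarks are plain
// longs counting milliseconds since 1970-01-01T00:00:00Z.
class EpochMillisDemo {
    static String toIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        System.out.println(toIso(1607654003161L)); // 2020-12-11T02:33:23.161Z
    }
}
```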
Using Watermark Strategies
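- A WatermarkStrategy can be used in two places in a program:
  - Directly on the source. This is the best option, because it allows sources to exploit knowledge about shards/partitions/splits in the watermarking logic.
  - After a non-source operation. Use this only when the first option is not possible: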
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
Computing the Start Time of an Event Time Window
In TimeWindow:
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}
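The formula can be checked in plain Java without Flink. The sketch below copies the arithmetic into a hypothetical WindowStart class and applies it to the first event time of the demo below (15 s window, offset 0). It assumes non-negative timestamps.

```java
// Hypothetical helper class: same arithmetic as TimeWindow.getWindowStartWithOffset above.
class WindowStart {
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 15_000L;          // 15 s tumbling window, as in the demo
        long ts = 1607654003161L;     // event time of the demo's first element
        long start = windowStart(ts, 0L, size);
        // Windows are half-open: [start, start + size)
        System.out.println("[" + start + ", " + (start + size) + ")"); // [1607653995000, 1607654010000)
    }
}
```

A non-zero offset shifts every window boundary by that amount, e.g. with a 5 s window and a 2 s offset the windows start at 2000, 7000, 12000, and so on.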
package com.bbx.flink.demo.allow_latenss;

import com.bbx.flink.demo.entity.Temperature;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * What this demo shows:
 * 1. Setting an event-time watermark.
 * 2. Using a side output to collect late data.
 *
 * Notes:
 * 1. Which window an element goes into is determined by its event time.
 * 2. A watermark only signals that the elements up to the marked time have arrived.
 * 3. Window start formula: timestamp - (timestamp - offset + windowSize) % windowSize
 *    windowSize: the window size
 *    timestamp:  the event time
 *    offset:     a time offset (mainly used for time zone adjustment)
 *
 * Example: in the program below windowSize is 15 s, offset is 0 and the watermark delay is 2 s.
 * Input elements (id,eventTime,temperature), followed by seconds since the first element:
 *    1,1607654003161,11    0
 *    1,1607654004161,22    1
 *    1,1607654008161,19    5
 *    1,1607654009161,16    6
 *    1,1607654010161,17    7
 *    1,1607654011161,17    8
 *    1,1607654012161,27    9
 *    1,1607654013161,17    10
 *
 * The first element's event time is 1607654003161 ms and offset is 0, so its window starts at
 *    1607654003161 - (1607654003161 - 0 + 15000) % 15000 = 1607653995000
 * Hence the first window is [1607653995000, 1607654010000),
 * the second window is [1607654010000, 1607654025000), and so on.
 *
 * ------------------------------------------------------------------------------
 *                  element                  watermark (max event time - 2000 - 1)
 * first window     1,1607654003161,11       1607654001160
 *                  1,1607654004161,22       1607654002160
 *                  1,1607654008161,19       1607654006160
 *                  1,1607654009161,16       1607654007160
 * second window    1,1607654010161,17       1607654008160
 *                  1,1607654011161,17       1607654009160
 *                  1,1607654012161,27       1607654010160
 *                  1,1607654013161,17       1607654011160
 *
 * Because the watermark delay is 2 s, the first window fires once an element with event time
 * >= 1607654012000 arrives (the watermark then reaches the window's last millisecond,
 * 1607654009999). So the first window fires when element 1,1607654012161,27 arrives.
 * With allowedLateness(5 s) the window's state is kept until the watermark reaches
 * 1607654014999; data for it arriving after that goes to the side output.
 */
@Slf4j
public class SideOutDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the stream time characteristic; required to use event time (Flink 1.11)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // Interval for periodic watermark emission; has a default value
        // env.getConfig().setAutoWatermarkInterval(1L);

        // Side output that collects data arriving after the allowed lateness
        OutputTag<Temperature> sideWindow = new OutputTag<>("sideWindow", PojoTypeInfo.of(Temperature.class));

        SingleOutputStreamOperator<Temperature> reduce = env.socketTextStream("114.116.104.74", 10003)
                // Input lines look like "id,eventTime,temperature"
                .map((MapFunction<String, Temperature>) element -> {
                    String[] fields = element.split(",");
                    return new Temperature(fields[0], Long.parseLong(fields[2]), Long.parseLong(fields[1]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((i, timestamp) -> i.getTime()))
                .keyBy(i -> i.getId())
                .timeWindow(Time.seconds(15))
                // How long to keep waiting for late data
                .allowedLateness(Time.seconds(5L))
                // Send data that is later than that to the side output
                .sideOutputLateData(sideWindow)
                .max("tem");

        reduce.print("~~~~~~~~~");
        reduce.getSideOutput(sideWindow).print("######");
        env.execute();
    }
}
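The firing and lateness rules behind this demo can be replayed in plain Java. The sketch below is a simplified model, not Flink's window operator: it assumes a window fires when the watermark reaches the window's last millisecond (end - 1), and that data for the window goes to the side output once the watermark reaches end - 1 + allowed lateness. DemoTimeline and its constants are hypothetical and mirror the demo's settings.

```java
// Hypothetical, simplified model of the demo: 15 s tumbling windows (offset 0),
// bounded out-of-orderness of 2 s, allowed lateness of 5 s.
class DemoTimeline {
    static final long SIZE = 15_000L;
    static final long BOUND = 2_000L;
    static final long ALLOWED_LATENESS = 5_000L;

    /** Window start for offset 0 (same formula as above). */
    static long windowStart(long ts) {
        return ts - (ts + SIZE) % SIZE;
    }

    /** Watermark after seeing the given maximum event time: max - bound - 1. */
    static long watermark(long maxTs) {
        return maxTs - BOUND - 1;
    }

    /** Assumed rule: the window fires once the watermark reaches end - 1. */
    static boolean fired(long windowStart, long currentWatermark) {
        return currentWatermark >= windowStart + SIZE - 1;
    }

    /** Assumed rule: data for the window is side-output once watermark >= end - 1 + lateness. */
    static boolean tooLate(long windowStart, long currentWatermark) {
        return currentWatermark >= windowStart + SIZE - 1 + ALLOWED_LATENESS;
    }
}
```

With the demo's input, element 1,1607654011161,17 leaves the first window unfired, element 1,1607654012161,27 fires it, and data for that window only counts as too late once an element with event time >= 1607654017000 has pushed the watermark far enough.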
Dealing With Idle Sources
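- Sometimes data is so sparse that for a while no records, and therefore no watermarks, are produced, and downstream operations that depend on watermarks stall. For example, when an operator has several upstream inputs, its watermark is the minimum of the inputs' watermarks. If one input produces no watermark because it has no data, event time cannot advance (event-time skew) and downstream computations are never triggered.
- Flink therefore provides WatermarkStrategy.withIdleness(), which marks a stream as idle when no records arrive within the configured timeout. Downstream operators then no longer wait for watermarks from it. Once the stream produces and emits watermarks again, it becomes active again.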
WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));
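To see why a silent input blocks downstream operators and what marking it idle changes, here is a simplified model of an operator that takes the minimum watermark over all inputs not marked idle. MinWatermarkCombiner is a hypothetical class; Flink's actual implementation handles more details (for example how a re-activated input is realigned).

```java
import java.util.Arrays;

// Hypothetical, simplified model: combined watermark = min over non-idle inputs,
// and it never moves backwards.
class MinWatermarkCombiner {
    private final long[] watermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE;

    MinWatermarkCombiner(int inputs) {
        watermarks = new long[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no watermark seen yet
        idle = new boolean[inputs];
    }

    /** An input reports a watermark; this also marks it active again. */
    long onWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        idle[input] = false;
        return recompute();
    }

    /** Stands in for an input being declared idle, e.g. by withIdleness(...). */
    long markIdle(int input) {
        idle[input] = true;
        return recompute();
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // With no active input, or a smaller minimum, the combined watermark stays put.
        if (anyActive && min > combined) {
            combined = min;
        }
        return combined;
    }
}
```

With two inputs where only input 0 produces data, the combined watermark stays at Long.MIN_VALUE until input 1 is marked idle; from then on it follows input 0.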
Writing WatermarkGenerators
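- A TimestampAssigner simply extracts the event time from an element, so it needs no detailed discussion. A WatermarkGenerator is more involved to write; the following sections show two ways to do it.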
/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the
     * event timestamps, or to emit a watermark based on the event itself.
     * Typically used to generate punctuated (event-triggered) watermarks.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     * Typically used to generate periodic (time-triggered) watermarks.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
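- There are two styles of watermark generation: periodic and punctuated.
- A periodic generator observes incoming events in onEvent() and emits watermarks when the framework calls onPeriodicEmit(). That method is called at a fixed interval, and a new watermark is forwarded only if it is larger than the previous one. The interval, in milliseconds, is set with ExecutionConfig.setAutoWatermarkInterval(...):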
// Call onPeriodicEmit() every 1 ms (the default is 200 ms; very small intervals add overhead)
env.getConfig().setAutoWatermarkInterval(1L);
- Two simple periodic generators:
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound, minus 1.
        // A watermark W declares that no more elements with timestamp <= W are expected, so the -1
        // keeps an element with timestamp exactly currentMaxTimestamp - maxOutOfOrderness from being late.
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}
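The arithmetic of BoundedOutOfOrdernessGenerator, together with the rule that only larger watermarks are forwarded, can be tried without Flink. In the sketch below a List<Long> replaces WatermarkOutput, and a hypothetical loop calls onPeriodicEmit() after every event; real Flink calls it on a timer (setAutoWatermarkInterval) instead. The initial maximum is chosen so that max - bound - 1 cannot underflow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java stand-in for BoundedOutOfOrdernessGenerator plus a tiny "framework" loop.
class PeriodicWatermarkSimulation {
    static final class BoundedOutOfOrderness {
        private final long maxOutOfOrderness;
        private long currentMaxTimestamp;

        BoundedOutOfOrderness(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            // Lowest value for which "max - bound - 1" still fits in a long.
            this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
        }

        void onEvent(long eventTimestamp) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        }

        long onPeriodicEmit() {
            return currentMaxTimestamp - maxOutOfOrderness - 1;
        }
    }

    /** Feeds events in order, "ticks" after each one, forwards only increasing watermarks. */
    static List<Long> run(long[] eventTimestamps, long bound) {
        BoundedOutOfOrderness generator = new BoundedOutOfOrderness(bound);
        List<Long> forwarded = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            generator.onEvent(ts);
            long wm = generator.onPeriodicEmit();
            if (wm > last) {
                forwarded.add(wm);
                last = wm;
            }
        }
        return forwarded;
    }
}
```

For the input 1000, 3000, 2500, 4000 with a 1000 ms bound, the forwarded watermarks are -1, 1999 and 2999: the out-of-order 2500 does not raise the maximum, so no new watermark is forwarded for it.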
Writing a Punctuated WatermarkGenerator
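- A punctuated generator reacts to the elements themselves: onEvent() may emit a watermark as soon as it sees an element that carries watermark information. Emitting a watermark for every element is possible, but because each watermark triggers downstream computation, too many watermarks degrade performance; this style suits streams with relatively few elements.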
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}
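A plain-Java model of PunctuatedAssigner, runnable without Flink: the hypothetical Event class stands in for MyEvent, and watermarks come only from the per-element path, never from a timer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of PunctuatedAssigner; Event replaces MyEvent for illustration.
class PunctuatedSimulation {
    static final class Event {
        final long timestamp;
        final boolean hasWatermarkMarker;  // stands in for MyEvent.hasWatermarkMarker()
        final long watermarkTimestamp;     // stands in for MyEvent.getWatermarkTimestamp()

        Event(long timestamp, boolean hasWatermarkMarker, long watermarkTimestamp) {
            this.timestamp = timestamp;
            this.hasWatermarkMarker = hasWatermarkMarker;
            this.watermarkTimestamp = watermarkTimestamp;
        }
    }

    /** Mirrors onEvent(): emit a watermark only for elements that carry a marker. */
    static List<Long> run(List<Event> events) {
        List<Long> emitted = new ArrayList<>();
        for (Event e : events) {
            if (e.hasWatermarkMarker) {
                emitted.add(e.watermarkTimestamp);
            }
            // No periodic path: onPeriodicEmit() does nothing in this style.
        }
        return emitted;
    }
}
```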
- Watermark Strategies and the Kafka Connector
- How Operators Process Watermarks
- The Deprecated AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks
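- Flink ships with some predefined timestamp assigners and watermark generators. You can also implement your own, using Flink's predefined implementations as a reference.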
Monotonously Increasing Timestamps
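- Monotonously increasing timestamps: when timestamps arrive in ascending order (no out-of-order data), the simplest periodic watermark is just the current (highest) timestamp from the source, because no earlier timestamps will arrive.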
WatermarkStrategy.forMonotonousTimestamps();
Fixed Amount of Lateness
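- Fixed amount of lateness: use this when elements may arrive out of order (late) but the maximum lateness is known in advance; the watermark lags behind the highest timestamp seen by that fixed amount.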
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
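To compare the two strategies on the same out-of-order input, the sketch below counts elements whose timestamp is not greater than the watermark in effect when they arrive. It assumes both strategies compute max timestamp seen - bound - 1, with forMonotonousTimestamps() behaving like a bound of 0, and it updates the watermark after every element rather than periodically. LatenessComparison is a hypothetical name.

```java
// Hypothetical comparison; assumes watermark = max seen - bound - 1, updated after every element.
class LatenessComparison {
    static int countLate(long[] timestamps, long bound) {
        long maxSeen = Long.MIN_VALUE + bound + 1; // avoids underflow in maxSeen - bound - 1
        int late = 0;
        for (long ts : timestamps) {
            long watermark = maxSeen - bound - 1;  // watermark in effect when ts arrives
            if (ts <= watermark) {
                late++;                            // event time already "passed"
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }
}
```

With input 1000, 3000, 2500, 2900, 4000, a bound of 0 (the monotonous case) treats 2500 and 2900 as late, while a 1000 ms bound accepts all five elements, which is why forBoundedOutOfOrderness suits out-of-order data.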
- Debugging Windows & Event Time