翻译来源- DataStream API Event Time
翻译来源- DataStream API Generating Watermarks
翻译来源- DataStream API Builtin Watermark Generators
概览
事件时间
原文Event Time。
在本节中,您将学习有关编写可感知时间的Flink程序的信息。请看一下及时流处理以了解及时流处理背后的概念。
有关如何在Flink程序中使用时间的信息,请参阅 windowing和 ProcessFunction。
请注意,为了使用事件时间感知操作,程序需要使用直接为数据定义事件时间并自己发出水印的源,或者程序必须在源之后注入时间戳记分配器和水印生成器。这些功能描述了如何访问事件时间戳,以及事件流表现出的乱序程度。
接下来去哪儿
生成水印:显示如何编写时间戳分配器和水印生成器,这对于事件时间感知Flink应用程序是必需的。
内置水印生成器:概述内置水印生成器。
调试Windows和事件时间:展示如何在Flink事件时间应用程序中调试水印和时间戳的周边问题。
水印生成
在本节中,您将了解Flink提供的用于处理事件时间时间戳和水印的API 。有关事件时间,处理时间和摄取时间的介绍,请参阅事件时间的介绍。
水印策略简介
为了使用事件时间,Flink需要知道事件时间戳,这意味着流中的每个元素都需要分配一个事件时间戳。这通常是通过使用TimestampAssigner来访问/提取来自元素中某个字段的时间戳来完成的。
时间戳分配与生成水印一起进行,水印告诉系统事件时间的进度。您可以通过指定WatermarkGenerator配置水印生成。
Flink API期望一个同时包含 TimestampAssigner和WatermarkGeneratord的WatermarkStrategy。许多常用策略以WatermarkStrategy上静态方法的形式开箱即用,用户也可在需要时构建自己的策略。
出于阐述的完整性考虑,给出这个接口:
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
/**
* Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
* strategy.
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
如前所述,您通常不会自己实现此接口,而是将WatermarkStrategy上静态的helper方法用于常见的水印策略或将一个自定义的TimestampAssigner与WatermarkGenerator捆绑在一起。例如,要同时使用有限无序水印和一个作为时间戳分配器的lambda函数,请使用以下命令:
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.f0);
指定TimestampAssigner 是可选的,实际上在大部分情况你不想要指定一个。例如,当使用Kafka或Kinesis时,你会直接用Kafka/Kinesis记录直接获取时间戳。
在之后写WatermarkGenerators的文章,你将看到WatermarkGenerator 接口。
注意:时间戳和水印都从以从1970-01-01T00:00:00ZJava开始的毫秒数的形式指定。
使用水印策略
在Flink应用程序中,有两个地方可以用WatermarkStrategy:1)直接用在源上 2)在不是源的操作上
第一个选项更优,因为它允许在水印逻辑上利用有关shards/partitions/splits的已知知识。源通常可以在一个精细的水平上跟踪水印,并且源生成的完整水印会更精确。直接在源上指定一个水印意味着你要用到Watermark Strategies and the Kafka Connector的源说明接口/引用,可以了解WatermarkStrategy在Kafka Connector上运行方式并且可以收获更多关于每个分区水印运行方式的细节。
第二个选项(在任意操作上设置水印策略)应该仅被用在当你不能在源上直接设置策略的情况:
Java代码
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(<watermark strategy>);
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
这种使用WatermarkStrategy 的方式拿了一个流并且产生一个带有时间戳元素和水印的新流。如果原始流已经有时间戳和/或水印,时间戳分配器将重写它们。
处理空闲源
如果一段时间输入的splits/partitions/shards的其中一个没有任何事件,意味着,WatermarkGenerator也不会获取任何以水印为基础的新信息。我们称这个输入为空闲输入或者空闲源。空闲源是一个问题,因为其它分区仍旧有事件。在这种情况下,这个水印将拖后腿,因为它在所有不同的并行水印上被当作最小值来计算。
为了处理这种情况,你可以使用一个探测空闲和将输入标记空闲的WatermarkStrategy 。WatermarkStrategy 为这种情况提供了一个方便的助手方法。
Java代码:
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));
写水印生成代码
一个TimestampAssigner 是一个从事件中提取属性的简单函数,因为不需要详细介绍。另一方面,一个WatermarkGenerator重写是有点复杂的,我们在接下来两部分看一看如何做。这是WatermarkGenerator的接口:
/**
* The {@code WatermarkGenerator} generates watermarks either based on events or
* periodically (in a fixed interval).
*
* <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
* {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
*/
@Public
public interface WatermarkGenerator<T> {
/**
* Called for every event, allows the watermark generator to examine and remember the
* event timestamps, or to emit a watermark based on the event itself.
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* Called periodically, and might emit a new watermark, or not.
*
* <p>The interval in which this method is called and Watermarks are generated
* depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
*/
void onPeriodicEmit(WatermarkOutput output);
}
有两种不同的水印生成风格:周期性的和符号形式的。
通常一个周期性的生成器,通过onEvent()观察输入事件,之后调用onPeriodicEmit()发射一个水印。
一个符号形式的生成器会在onEvent()中观察事件,并且等待流中携带水印信息的特殊标记的事件或者符号。当看到这种事件的一个时,就会马上发出一个水印。通常,符号生成不会从onPeriodicEmit()发射水印。
我们接下来会看如何实现不同风格的生成器。
写周期WatermarkGenerator代码
一个周期生成器观察流事件并周期性的生成水印(可能依赖于流元素,或依完全基于处理事件)。
水印生成时间间隔(每n毫秒)由ExecutionConfig.setAutoWatermarkInterval(...)定义。生成器的onPeriodicEmit() 每个时刻调用一次,并且,如果返回的水印不空并且比原来的水印更大,这个新的水印就会被发射出去。
这里展示两个用周期水印生成水印生成器的两个简单例子。注意Flink推出了BoundedOutOfOrdernessWatermarks,它是一个跟BoundedOutOfOrdernessGenerator 相似的WatermarkGenerator 。在这里你可以了解使用。
Note that Flink ships with BoundedOutOfOrdernessWatermarks, which is a WatermarkGenerator that works similarly to the BoundedOutOfOrdernessGenerator shown below. You can read about using that here.
Java代码
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// emit the watermark as current highest timestamp minus the out-of-orderness bound
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
// don't need to do anything because we work on processing time
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
}
}
写符号形式的WatermarkGenerator代码
一个符号水印生成器将观察事件流,并且当看到一个携带水印信息的特殊元素时发射一个水印。
这是实现方式,无论何时一个携带确定标记的事件出现,符号形式生成器会发射一个事件。
Java代码
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// don't need to do anything because we emit in reaction to events above
}
}
注意:在每个事件上生成一个水印这种情况是可能发生的。然而,因为每个水印会引起下游的一些计算,过量的水印会降低性能。
水印策略和Kafka连接器
当使用Kafka作为一个数据源时,每一个Kafka分区可能会有一个简单的事件时间模式(升序时间戳或者有界的无序)。然而,当从Kafka消费流时,多个分区经常被并行消费,来自分区们的事件混杂一起,并且破坏了单个分区的模式(这是Kafka的消费客户端工作方式决定的)。
这种情况下,你可以用Flink的Kafka分区感知水印生成。用这个功能,水印在Kafka消费者内部生成,每一个Kafka分区,并且每一个分的区水印们跟水印在流混洗合并的方式一样被合并。
例如,如果每个Kafk分区的事件时间戳是严格正序的,以正序的时间戳水印生成每个分区水印会得到完美的完整水印。注意,在上面的例子中我们不提供一个TimestampAssigner ,Kafka记录的时间戳本身会被代替使用。
下面的插图显示如何用每个Kafka分区水印生成,并且这种情况展示了水印通过流数据流的传播方式。
Java代码
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(
WatermarkStrategy.
.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
DataStream<MyType> stream = env.addSource(kafkaSource);
操作符如何处理水印
作为一个通用的习惯做法,操作符需要在转发水印到下游之前完全处理它。例如,WindowOperator 将首先评估所有该被触发的窗口,在产生被水印触发的所有输出以后,水印本身将被发送到下游。换句话说,由于水印的出现所有被产生的元素将在水印之前被发出。
相同的规则应用到TwoInputStreamOperator。然而,在这种情况操作符的当前水印被定义为它的输入中两者的最小值。
这个行为的详情被OneInputStreamOperator#processWatermark, TwoInputStreamOperator#processWatermark1 and TwoInputStreamOperator#processWatermark2方法实现定义。
废弃的AssignerWithPeriodicWatermarks 和AssignerWithPunctuatedWatermarks
在介绍当前WatermarkStrategy, TimestampAssigner, and WatermarkGenerator的抽象之前,Flink使用AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks。你仍然会在API中看到它们,但是推荐使用新接口,因为它们提供一个清晰的概念分割并且也统一了周期和符号形式风格水印生成的风格。
内置的水印生成器
像在生成水印中描述的一样,Flink提供了一个抽象定义,可以让开发者分配时间戳和发出水印。特殊情况下,可以通过实现WatermarkGenerator接口来生成水印。
为了更进一步简便这种任务的开发,Flink自带了一些预先实现的时间戳分配器。这个部分提供了它们的清单。除了它们的开箱即用功能外,它们的实现可以看做自定义实现的一个例子。
单调递增时间戳(元素)
用于周期水印生成最简单的特殊场景是,当源任务发现以升序出现的时间戳。这种情况,当前时间戳可以一直当做一个水印,因为没有比当前时间更早的时间戳会到达。
注意,对于每个并行数据源任务,时间戳升序是必要的。例如,在一个特殊设置中,一个Kafka分区被一个并行数据源实例读取,因此在一个Kafka分区内内的时间戳是正序的。无论什么时候,并行流被shuffled, unioned, connected, 或者merged时,Flink的水印合并机制将生成正确的水印。
WatermarkStrategy.forMonotonousTimestamps();
固定大小的延迟
另一个周期性水印生成的场景是,水印落后于流中最大事件时间戳一个固定时间。这种情况涵盖了预先知道流中最大可能延迟的场景,例如,当创建一个供测试使用的自定义源时,源中包含遍布于固定时间周期内带有时间戳的元素。对于这些情况,Flink提供了BoundedOutOfOrdernessWatermarks生成器,它将maxOutOfOrderness作为参数,即在计算给定窗口的最终结果时,允许元素延迟的最长时间。lateness = t-t_w ,其中t是元素的(事件时间)时间戳,t_w 是上一个水印的时间戳。如果lateness>0,则该元素被认为是延迟的,并且在为其相应窗口计算作业结果时,默认忽略该元素。有关使用延迟元素的更多信息,请参阅关于允许延迟的文档。
Java代码
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));