Flink 高阶编程:时间语义

一、时间语义

在 Flink 的流式操作中, 会涉及不同的时间概念

Flink 高阶编程:时间语义

1.1 处理时间

处理时间是指的执行操作的各个设备的时间。

对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟。比如, 一个长度为 1 个小时的窗口将会包含设备时钟表示的 1 个小时内所有的数据. 假设应用程序在 9:15am 分启动, 第 1 个小时窗口将会包含 9:00am 到 10:00am 所有的数据, 然后下个窗口是 10:00am-11:00am, 等等。

处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调. 他提供了最好的性能和最低的延迟. 但是, 在分布式和异步的环境下, 处理时间没有办法保证确定性, 容易受到数据传递速度的影响: 事件的延迟和乱序。

在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器。

这篇文章之前所使用的全是处理时间

1.2 事件时间

事件时间是指的这个事件发生的时间.

在 event 进入 Flink 之前, 通常被嵌入到了 event 中, 一般作为这个 event 的时间戳存在.

在事件时间体系中, 时间的进度依赖于数据本身, 和任何设备的时间无关。事件时间程序必须制定如何产生 Event Time Watermarks。在事件时间体系中, Watermarks是表示时间进度的标志(作用就相当于现实时间的时钟)。

在理想情况下,不管事件时间何时到达或者他们的到达的顺序如何, 事件时间处理将产生完全一致且确定的结果。事件时间处理会在等待无序事件(迟到事件)时产生一定的延迟。 由于只能等待有限的时间,因此这限制了确定性事件时间应用程序的可使用性。

假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,该记录落入该小时,无论它们到达的顺序或处理时间。

在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器。

在 1.12 之前默认的时间语义是处理时间, 从 1.12 开始, Flink 内部已经把默认的语义改成了事件时间

二、WaterMark

考虑这样情况,结合上一篇开窗案例,假设滚动窗口大小 5min,存在一个 9:00-9:05,现在传感器在这个时间段每分钟生成了 5 条数据即 9:00、9:01、9:02、9:03、9:04,但因为网络原因数据在 9:06 才到达 flink 参与计算,但此时 9:00-9:05 窗口已经关闭这批数据就会被丢失。

2.1 原理

事件时间可以不依赖处理时间来表示时间的进度。例如,在程序中即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间。另外一方面,使用事件时间可以在几秒内处理已经缓存在 Kafka 中多周的数据, 这些数据可以照样被正确处理,就像实时发生的一样能够进入正确的窗口。因此在 Flink 中测量事件时间的进度机制就是 WaterMark,同时 WaterMark 作为内嵌在数据流中的特殊数据,并且携带一个时间戳,在开窗中使用作为窗口关闭的衡量标准。具备单调递增的特性。如 9:00-9:05 的窗口,只要 WaterMark 携带的时间没有到 9:05 窗口就不会被关闭。一个 Watermark(t)表示在这个流里面事件时间已经到了时间 t, 意味着此时, 流中不应该存在这样的数据: 他的时间戳 t2<= t (时间比较旧或者等于时间戳)

2.1.1 有序流中的 WaterMark

在下面的这个图中, 事件是有序的(按照他们自己的时间戳来看), WaterMark 是流中一个简单的周期性的标记,通常情况下, WaterMark是一种标记, 是流中的一个点, 所有在这个时间戳(WaterMark中的时间戳)前的数据应该已经全部到达. 一旦 WaterMark 到达了算子, 则这个算子会提高他内部的时钟的值为这个 WaterMark 的值。此时可以解决因延迟迟到的数据。

Flink 高阶编程:时间语义

W(6) 表示事件时间已经到 6s,但实际上内部的最大时间戳是需要减 1ms,类比窗口的结束时间,防止存在两条事件时间相同的数据

2.1.2 无序流中的 WaterMark

若数据不仅有延迟,而且各个数据源延迟不一样,这就导致数据延迟且无序,那么上述的 WaterMark 取法就存在问题,此时需要给予 WaterMark 一个合理的延迟,这个延迟取决去系统最大的乱序程度可以再略高一点

Flink 高阶编程:时间语义

其相当于每个窗口往后延迟一定时间再关闭,数据还是根据自身所携带的时间进入对应的窗口,也就是同一时刻会存在若干个窗口当延迟后的 WaterMark 到达窗口的关闭时间,窗口关闭输出数据。

2.1.3 设定规则

在 Flink 中,WaterMark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解,如果 WaterMark设置的延迟太久,收到结果的速度可能就会很慢,而如果WaterMark 到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题。

2.2 生成器

如何在代码中生成 WaterMark 呢?Flink 内置两个 WaterMark 生成器,forMonotonousTimestamps 和 BoundedOutOfOrdernessWatermarks 分别对应有序和无序流,但从本质上看 forMonotonousTimestamps 是延迟为 0 的 BoundedOutOfOrdernessWatermarks

代码如下:

package day06;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;


public class Flink01_WaterMark_EventTime {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度,不设置涉及 watermark 传递
        env.setParallelism(1);

        // 读取数据,转换成 POJO
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("localhost", 1111)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] els = value.split(" ");
                        return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
                    }
                }).returns(Types.POJO(WaterSensor.class));

        // 提取事件时间
        WatermarkStrategy<WaterSensor> wm = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
            @Override
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                return element.getTs() * 1000;
            }
        });

        waterSensorDS
                // 注册事件时间
                .assignTimestampsAndWatermarks(wm)
                .keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("vc")
                .print();

        env.execute();
    }
}

结果如下:

Flink 高阶编程:时间语义

只要输入的时间戳小于 1577844005 第一个窗口就不会关闭,此时已经和处理时间没有关系了。

WaterMark 延迟 3s 解决乱序问题,代码如下:

package day06;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;


public class Flink02_WaterMark_OutOfOrder {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度,不设置涉及 watermark 传递
        env.setParallelism(1);

        // 读取数据,转换成 POJO
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("localhost", 1111)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] els = value.split(" ");
                        return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
                    }
                }).returns(Types.POJO(WaterSensor.class));

        // 提取事件时间,设置 WaterMark 延迟 3s
        WatermarkStrategy<WaterSensor> wm = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000;
                    }
                });

        waterSensorDS
                // 注册事件时间
                .assignTimestampsAndWatermarks(wm)
                .keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("vc")
                .print();

        env.execute();
    }
}

结果如下:

Flink 高阶编程:时间语义

可以看出即使 1577844001、1577844002 迟到也是可以被统计到的,若是这两条数据在 1577844008 之后到达即超过了延迟时间后才到,那此种方式这两条数据就被丢了。

2.3 多并行度下 WaterMark 传递

注意到上述的代码都是设置并行度为 1,若注释掉并行度或者并行度设置为 2,这时候就会发现即使数据的时间已经到了 1577844008 第一个窗口都不会被关闭,这是因为:多并行度下 WaterMark 向下游传递采用的是轮播,下游取最小的 WaterMark。假设并行度为 2,如图:

Flink 高阶编程:时间语义

此时 WaterMark 到达 8,因为轮播到下游,且下游取最小的导致 keyBy 算子 WaterMark 依然是 1,除非再来一条让 WaterMark 到达 8 的数据,才会让下游的 WaterMark 发生改变。如图:

Flink 高阶编程:时间语义

总结 WaterMark 规律如下:

Flink 高阶编程:时间语义

那么上述问题如何解决呢,设置并行度为 1 肯定不显示,那么可以将生成 WaterMark 的时机提前,在 Source 后直接提取生成即可,代码如下:

package day06;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;


public class Flink03_WaterMark_OutOfOrder2 {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 提取事件时间
        WatermarkStrategy<String> wm = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        return Long.parseLong(element.split(" ")[1]) * 1000;
                    }
                });

        // 读取数据,转换成 POJO
        env.socketTextStream("localhost", 1111)
                // 注册事件时间
                .assignTimestampsAndWatermarks(wm)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] els = value.split(" ");
                        return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
                    }
                }).returns(Types.POJO(WaterSensor.class))
                .keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("vc")
                .print();

        env.execute();
    }
}

再次测试:

Flink 高阶编程:时间语义

2.4 自定义生成器

Flink 提供 2 种风格的 WaterMark 生产方式: periodic(周期性) and punctuated(间歇性)。都需要继承 接口: WatermarkGenerator,实际生产中都是 periodic,periodic 默认 200ms 生成一次 WaterMark,punctuated 每条数据生成一次 WaterMark;

优缺点比较:

  1. 当数据量大时,punctuated 会产生大量的 WaterMark 占用内存空间
  2. 当数据量小时,periodic 会产生大量的 WaterMark,但没有什么影响(Flink 本身就是大数据计算框架,数据量小的时候生成一点 WaterMark 怎么了)

periodic

package day06;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;


public class Flink05_WaterMark_Periodic {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置定时生成时间
        env.getConfig().setAutoWatermarkInterval(200);

        // 设置并行度,不设置涉及 watermark 传递
        env.setParallelism(1);

        // 读取数据,转换成 POJO
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("localhost", 1111)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] els = value.split(" ");
                        return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
                    }
                }).returns(Types.POJO(WaterSensor.class));

        // 提取事件时间
        WatermarkStrategy<WaterSensor> wm = new WatermarkStrategy<WaterSensor>() {

            @Override
            public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new MyPeriodic(2000);
            }
        }.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
            @Override
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                return element.getTs() * 1000;
            }
        });

        SingleOutputStreamOperator<WaterSensor> result = waterSensorDS
                // 注册事件时间
                .assignTimestampsAndWatermarks(wm)
                .keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 允许数据迟到 2s
                .allowedLateness(Time.seconds(2))
                // 3s + 2s 后的数据放入测输出流
                .sideOutputLateData(new OutputTag<WaterSensor>("Side") {
                })
                .sum("vc");

        result.print("master");
        result.getSideOutput(new OutputTag<WaterSensor>("Side") {
        }).print("side");

        env.execute();
    }

    private static class MyPeriodic implements WatermarkGenerator<WaterSensor> {
        private long maxTs;
        // 允许延迟的最大时间
        private final long maxDelay;

        MyPeriodic(long maxDelay) {
            this.maxDelay = maxDelay;
            // 因为周期生成,需要在赋值加 maxDelay,否则调动 onPeriodicEmit 直接变成最大值
            maxTs = Long.MIN_VALUE + maxDelay - 1;
        }

        // 每条数据调用一次
        @Override
        public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
            // 保证 WaterMark 递增
            maxTs = Math.max(maxTs, eventTimestamp);
        }

        // 周期性调用(默认200ms)
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTs - maxDelay - 1));
        }
    }
}

不要问我为什么写的这么漂亮,因为 BoundedOutOfOrdernessWatermarks 就是这么实现的

上一篇:keyBy多字段分组


下一篇:[白话解析] Flink的Watermark机制