Flink-DateStreamApi-eventTime

  • Event Time

    • 查看本部分时请先查阅 Timely Stream Processing 
    • 如果想使用 Event Time ,需要先设置正确的时间特征,方法如下
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      //设置流的时间特征,使用Event Time 必须要设置
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    • 注意:指定时间特征后还需要使用数据源中直接根据 Event Time  生成  watermark  或者程序中在数据源后注入 Timestamp Assigner & Watermark Generator   关于watermark 请查看下面的内容 
  • Generating Watermarks

    • Introduction to Watermark Strategies

      • 为了使用Event Time ,每个元素要有一个timestamp,一般是通过TimestampAssigner 获取
      • timestamp 与生成watermark 紧密关联, watermark 可以通过WatermarkGenerator 指定
      • Flink API 提供的WatermarkStrategy 包含了TimestampAssigner  和 WatermarkGenerator,WatermarkStrategy  提供了一部分静态方法,用户也可以进行自定义,自定义调用如下方法
        public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
        
            /**
             * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
             * strategy.
             */
            @Override
            TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
        
            /**
             * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
             */
            @Override
            WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
        }
      • WatermarkStrategy lambda调用
        WatermarkStrategy
                .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, timestamp) -> event.f0);
      • 注意:所有的 timestamp 和watermark 指定的时间都是 毫秒, 基于 1970-01-01T00:00:00Z.
    • Using Watermark Strategies

      • WatermarkStrategy  在程序中有两处可以使用
        • 直接基于source------最优 (because it allows sources to exploit knowledge about shards/partitions/splits in the watermarking logic
        • 非源操作后 --仅在第一种不能满足时使用
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
          
          DataStream<MyEvent> stream = env.readFile(
                  myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
                  FilePathFilter.createDefaultFilter(), typeInfo);
          
          DataStream<MyEvent> withTimestampsAndWatermarks = stream
                  .filter( event -> event.severity() == WARNING )
                  .assignTimestampsAndWatermarks(<watermark strategy>);
          
          withTimestampsAndWatermarks
                  .keyBy( (event) -> event.getGroup() )
                  .timeWindow(Time.seconds(10))
                  .reduce( (a, b) -> a.add(b) )
                  .addSink(...);
        • Event Time 窗口开始时间计算 公式
          TimeWindow中
             public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
                  return timestamp - (timestamp - offset + windowSize) % windowSize;
              }

          package com.bbx.flink.demo.allow_latenss;
          
          import com.bbx.flink.demo.entity.Temperature;
          import lombok.extern.slf4j.Slf4j;
          import org.apache.flink.api.common.eventtime.WatermarkStrategy;
          import org.apache.flink.api.common.functions.MapFunction;
          import org.apache.flink.api.java.typeutils.PojoTypeInfo;
          import org.apache.flink.streaming.api.TimeCharacteristic;
          import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.windowing.time.Time;
          import org.apache.flink.util.OutputTag;
          
          import java.time.Duration;
          
          /**
           * 本Demo功能点
           * 1、设置Event Time 的watermark ,
           * 2、侧输出流,等待迟到的数据
           *
           * 注意点:
           * 1、进哪个窗口是由 Event Time 决定的
           * 2、watermark 只是用来表示 标记时间内的元素已经到达,
           * 3、窗口开始时间计算公式 :timestamp - (timestamp - offset + windowSize) % windowSize
           *      windowSize 为窗口大小
           *      timestamp 为 Event Time
           *      offset : 时间偏移(主要用于时区调整)
           *      例如:以下面程序为例,windowSize 为:15s , offset :0,watermark 延迟时间为 2s ,输入的元素如下:
           *          1,1607654003161,11    0
           *          1,1607654004161,22    1
           *          1,1607654008161,19    5
           *          1,1607654009161,16    6
           *          1,1607654010161,17    7
           *          1,1607654011161,17    8
           *          1,1607654012161,27    9
           *
           *          1,1607654013161,17    10
           *      输入第一个元素的 event time为 1607654003161 ms ,offset 为0
           *      则窗口开始时间为 :1607654003161-(1607654003161-0+15000)%15000=1607653995000
           *      因此范围为 第一个窗口为 [1607653995000   1607654010000)
           *               第二个窗口为 [1607654010000   1607654025000)
           *                   .......
           *       ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
           *                    窗口内数据        watermark 时间
          *       第一个窗口 1,1607654003161,11   1607654001161
           *               1,1607654004161,22   1607654002161
           *               1,1607654008161,19   1607654006161
           *               1,1607654009161,16   1607654007161
          *       第二个窗口 1,1607654010161,17   1607654008161
           *               1,1607654011161,17   1607654009161
           *               1,1607654012161,27   1607654010161
           *               1,1607654013161,17   1607654011161
          *      因为watermark延迟时间为2 s ,因此第一个窗口关闭时间为 1607654012000,当元素 1,1607654012161,27  抵达时第一个窗口开始关闭
           */
          @Slf4j
          public class SideOutDemo {
          
              public static void  main (String [] args) throws Exception {
          
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  //设置流的时间特征,使用Event Time 必须要设置
                  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                  env.setParallelism(1);
                  //设置watermark 的时间间隔,有默认值
          //        env.getConfig().setAutoWatermarkInterval(1L);
          
                  OutputTag<Temperature> sideWindow = new OutputTag<>("sideWindow", PojoTypeInfo.of(Temperature.class));
          
                  SingleOutputStreamOperator<Temperature> reduce = env.socketTextStream("114.116.104.74", 10003)
                          .map((MapFunction<String, Temperature>) elment -> {
                              String[] varElment = elment.split(",");
                              return new Temperature(varElment[0], Long.parseLong(varElment[2]), Long.parseLong(varElment[1]));
                          })
                          .assignTimestampsAndWatermarks(
                                  WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                          .withTimestampAssigner((i, timestamp) -> i.getTime())
          
                          )
                          .keyBy(i -> i.getId())
                          .timeWindow(Time.seconds(15))
                          //设定等待迟到数据的时间
                          .allowedLateness(Time.seconds(5L))
                          //为迟到数据开启侧输出流
                          .sideOutputLateData(sideWindow)
                          .max("tem");
          
                  reduce.print("~~~~~~~~~");
                  reduce.getSideOutput(sideWindow).print("######");
                  env.execute();
              }
          }
          

    • Dealing With Idle Sources

      • 在某些情况下,由于数据产生的比较少,导致一段时间内没有数据产生,进而就没有水印的生成,导致下游依赖水印的一些操作就会出现问题,比如某一个算子的上游有多个算子,这种情况下,水印是取其上游两个算子的较小值,如果上游某一个算子因为缺少数据迟迟没有生成水印,就会出现eventtime倾斜问题,导致下游没法触发计算。
      • 所以filnk通过WatermarkStrategy.withIdleness()方法允许用户在配置的时间内(即超时时间内)没有记录到达时将一个流标记为空闲。这样就意味着下游的数据不需要等待水印的到来。当下次有水印生成并发射到下游的时候,这个数据流重新变成活跃状态。
        WatermarkStrategy
                .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withIdleness(Duration.ofMinutes(1));
    • Writing WatermarkGenerators

      • 简单的处理方式就是抽取元素中的 event time,这样我们不需要关注一些细节,除此之外,我们就会在下面的章节中介绍两种写起来比较复杂的方式,
        /**
         * The {@code WatermarkGenerator} generates watermarks either based on events or
         * periodically (in a fixed interval).
         *
         * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
         * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
         */
        @Public
        public interface WatermarkGenerator<T> {
        
        	/**  每个元素抵达后调用,通常用于生成 Punctuated watermark   通过事件触发
        	 * Called for every event, allows the watermark generator to examine and remember the
        	 * event timestamps, or to emit a watermark based on the event itself.
        	 */
        	void onEvent(T event, long eventTimestamp, WatermarkOutput output);
        
        	/**通常用于生成 periodic watermark,   通过时间触发
        	 * Called periodically, and might emit a new watermark, or not.
        	 *
        	 * <p>The interval in which this method is called and Watermarks are generated
        	 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
        	 */
        	void onPeriodicEmit(WatermarkOutput output);
        }
      • watermark 生成有两种不同的方式,periodic   (周期性的)  和 punctuated(标点性的)
        • 周期性的调用生成器的方法,如果返回的watermark  非空并且大于以前的watermark,将发出新的watermark, 周期参数可以通过 setAutoWatermarkInterval(...) 设定
          env.getConfig().setAutoWatermarkInterval(1L)
        • 两个简单示例如下
          **
           * This generator generates watermarks assuming that elements arrive out of order,
           * but only to a certain degree. The latest elements for a certain timestamp t will arrive
           * at most n milliseconds after the earliest elements for timestamp t.
           */
          public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
          
              private final long maxOutOfOrderness = 3500; // 3.5 seconds
          
              private long currentMaxTimestamp;
          
              @Override
              public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
                  currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
              }
          
              @Override
              public void onPeriodicEmit(WatermarkOutput output) {
                  //已经接收到元素的最大 timestamp  减去  最大乱序程度  -1  ,此处的 -1 表示不包含这个时间---个人理解 
                  // emit the watermark as current highest timestamp minus the out-of-orderness bound
                  output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
              }
          
          }
          
          /**
           * This generator generates watermarks that are lagging behind processing time by a fixed amount.
           * It assumes that elements arrive in Flink after a bounded delay.
           */
          public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
          
              private final long maxTimeLag = 5000; // 5 seconds
          
              @Override
              public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
                  // don‘t need to do anything because we work on processing time
              }
          
              @Override
              public void onPeriodicEmit(WatermarkOutput output) {
                  output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
              }
          }
      • Writing a Punctuated WatermarkGenerator

        • 每个元素抵达后生成一个watermark,过多的watermark 会影响性能,数据流中元素较少时可以使用
          public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
          
              @Override
              public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
                  if (event.hasWatermarkMarker()) {
                      output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
                  }
              }
          
              @Override
              public void onPeriodicEmit(WatermarkOutput output) {
                  // don‘t need to do anything because we emit in reaction to events above
              }
          }

    • Watermark Strategies and the Kafka Connector
    • How Operators Process Watermarks
    • The Deprecated AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks
    • Flink 预定义了一部分 timestamp assigners ,用户也可以自定义实现,实现方法请查阅Flink 预先定义的方法
    • Monotonously Increasing Timestamps

      • 单调递增时间戳,在数据没有乱序的情况下,周期性 watermark 最简单的方式就是使用 数据源的时间戳或者当前时间戳
        WatermarkStrategy.forMonotonousTimestamps();
    • Fixed Amount of Lateness
      • 固定延迟量,有迟到数据时调用
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
  • Debugging Windows & Event Time

Flink-DateStreamApi-eventTime

上一篇:vue3.0组合API


下一篇:Delphi Sytem单元 Set8087CW、Get8087CW、Default8087CW - FPU浮点值运算器