Flink触发器Trigger-内置的Trigger

Flink 内置了许多常用的 Trigger,大多数情况下它们足以支撑我们的业务场景,只有当内置的Trigger不符合要求时,才需要开发自定义的 Trigger。

1、EventTimeTrigger

和事件时间窗口搭配使用的 Trigger,直到 Watermark 时间戳等于窗口的结束时间才会触发计算。

onElement 的逻辑是:当有元素加入到窗口时,先判断当前 Watermark 时间戳是否到达窗口的结束时间,如果到了就直接触发计算,否则注册一个时间事件,等待 Timer 触发窗口计算。

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }
    。。。。。。
}

2、ProcessingTimeTrigger

和 EventTimeTrigger 差不多,区别是采用的处理时间语义,没有 Watermark 相关的判断,直接注册ProcessingTime 事件等待窗口触发计算。

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
        return TriggerResult.FIRE;
    }
}

3、DeltaTrigger

DeltaTrigger 会计算新加入窗口的元素和上一个元素的差值,当这个差值超过给定的阈值时,窗口就会触发计算,元素之间的差值是通过指定的 DeltaFunction 计算出来的。

public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private final DeltaFunction<T> deltaFunction;
    private final double threshold;
    private final ValueStateDescriptor<T> stateDesc;

    private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        this.deltaFunction = deltaFunction;
        this.threshold = threshold;
        this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);
        if (lastElementState.value() == null) {
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
        } else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            lastElementState.update(element);
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }
}
上一篇:AUTOSAR_EXP_ARAComAPI的5章笔记(13)


下一篇:第二十一篇——公理体系:几何的系统理论从何而来?-三、过程