Flink 内置了许多常用的 Trigger,大多数情况下它们足以支撑我们的业务场景,只有当内置的Trigger不符合要求时,才需要开发自定义的 Trigger。
1、EventTimeTrigger
和事件时间窗口搭配使用的 Trigger,直到 Watermark 时间戳等于窗口的结束时间才会触发计算。
onElement 的逻辑是:当有元素加入到窗口时,先判断当前 Watermark 时间戳是否到达窗口的结束时间,如果到了就直接触发计算,否则注册一个时间事件,等待 Timer 触发窗口计算。
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {
}
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
。。。。。。
}
2、ProcessingTimeTrigger
和 EventTimeTrigger 差不多,区别是采用的处理时间语义,没有 Watermark 相关的判断,直接注册ProcessingTime 事件等待窗口触发计算。
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private ProcessingTimeTrigger() {
}
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
return TriggerResult.FIRE;
}
}
3、DeltaTrigger
DeltaTrigger 会计算新加入窗口的元素和上一个元素的差值,当这个差值超过给定的阈值时,窗口就会触发计算,元素之间的差值是通过指定的 DeltaFunction 计算出来的。
public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
private static final long serialVersionUID = 1L;
private final DeltaFunction<T> deltaFunction;
private final double threshold;
private final ValueStateDescriptor<T> stateDesc;
private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
this.deltaFunction = deltaFunction;
this.threshold = threshold;
this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);
}
public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);
if (lastElementState.value() == null) {
lastElementState.update(element);
return TriggerResult.CONTINUE;
} else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
lastElementState.update(element);
return TriggerResult.FIRE;
} else {
return TriggerResult.CONTINUE;
}
}
}