flink自定义trigger-实现窗口随意输出

flink自定义trigger-实现窗口随意输出

浪尖 浪尖聊大数据

前面,一篇简单讲了flink的窗口及与Spark Streaming窗口之间的对比。
对于flink的窗口操作,尤其是基于事件时间的窗口操作,大家还要掌三个个重要的知识点:

  1. 窗口分配器:就是决定着流入flink的数据,该属于哪个窗口。

  2. 时间戳抽取器/watermark生成器:抽取时间戳并驱动着程序正常执行。

  3. trigger:决定着数据啥时候落地。

flink有很多内置的触发器,对于基于事件时间的窗口触发器叫做
EventTimeTrigger。其实,我们要实现基于事件时间的窗口随意输出,比如1000个元素触发一次输出,那么我们就可以通过修改这个触发器来实现。
那么可能你没留意前面说的,为啥需要trigger,因为没有trigger的话,存在允许事件滞后的时候,输出时间延迟比较大,而我们需要尽早看到数据,那么这个时候就可以自己定义窗口触发。
自定义触发器
修改自基于处理时间的触发器,源码如下:


package org.trigger;

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

private CustomProcessingTimeTrigger() {}

private static int flag = 0;
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
// CONTINUE是代表不做输出,也即是,此时我们想要实现比如100条输出一次,
// 而不是窗口结束再输出就可以在这里实现。
if(flag > 9){
flag = 0;
return TriggerResult.FIRE;
}else{
flag++;
}
System.out.println("onElement : "+element);
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}

@Override
public boolean canMerge() {
return true;
}

@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
// only register a timer if the time is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the time is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}

@Override
public String toString() {
return "ProcessingTimeTrigger()";
}

/**
* Creates a new trigger that fires once system time passes the end of the window.
*/
public static CustomProcessingTimeTrigger create() {
return new CustomProcessingTimeTrigger();
}

}

主要实现逻辑是在onElement函数,实现的逻辑是增加了每10个元素触发一次计算结果输出的逻辑。

主函数


package org.trigger;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

/*
trigger 测试
滚动窗口,20s
然后是trigger内部技术,10个元素输出一次。

*/
public class kafkaSourceTriggerTest {

public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9093");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer010<String> kafkaConsumer010 = new FlinkKafkaConsumer010<>("test",
new SimpleStringSchema(),
properties);

AllWindowedStream<Integer, TimeWindow> stream = env
.addSource(kafkaConsumer010)
.map(new String2Integer())
.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.seconds(20))
.trigger(CustomProcessingTimeTrigger.create());
stream.sum(0)
.print()
;

env.execute("Flink Streaming Java API Skeleton");
}
private static class String2Integer extends RichMapFunction<String, Integer> {
private static final long serialVersionUID = 1180234853172462378L;
@Override
public Integer map(String event) throws Exception {

return Integer.valueOf(event);
}
@Override
public void open(Configuration parameters) throws Exception {
}
}

}

代码测试,通过的哦。

上一篇:vue密码强度提示的两种方式


下一篇:制作一张简单的僵尸逃跑地图