在实际的生产开发过程中 Keyed Window 使用的最多,故 Keyed Window 算子很重要。
在每个窗口算子中包含了 Windows Assigner、Window Trigger(窗口触发器)、Evictor(数据剔除器)、Lateness(延时设置)、Output Tag(输出标签)以及 Windows Function 等组成部分,
其中 Windows Assigner 和 Windows Function 是所有窗口算子必须指定的属性,其余的属性都是根据实际情况选择
stream.keyBy(...) // 是Keyed类型数据集 .window(...) //指定窗口分配器类型 [.trigger(...)] //指定触发器类型(可选) [.evictor(...)] // 指 定 evictor 或 者 不 指 定 ( 可 选 ) [.allowedLateness(...)] //指定是否延迟处理数据(可选) [.sideOutputLateData(...)] //指定Output Lag(可选) .reduce/aggregate/fold/apply() //指定窗口计算函数 [.getSideOutput(...)] //根据Tag输出数据(可选)
※ Windows Assigner:指定窗口的类型,定义如何将数据流分配到一个或多个窗口;
※ Windows Trigger:指定窗口触发的时机,定义窗口满足什么样的条件触发计算;
※ Evictor:用于数据剔除;
※ AllowedLateness:标记是否处理迟到数据,当迟到数据到达窗口中是否触发计算;
※ Output Tag:标记输出标签,然后在通过 getSideOutput 将窗口中的数据根据标签输出;
※ Windows Function:定义窗口上数据处理的逻辑,例如:对数据进行 sum 求和操作;
一、窗口聚合函数
定义了 Windows Trigger 之后,下一步就可以定义窗口内数据的计算逻辑,这也是 Windows Function 的定义。Flink 中提供了四种类型的 Window Function,分别为 ReduceFunction、AffregateFunction 以及 ProcessWindowFunction,(sum和max)等。前三种类型的 Windows Function 按照计算原理不同可以分为2类:
※ 增量聚合函数:对应有 ReduceFunction,AffregateFunction;
※ 全量窗口函数 :对应有 ProcessWindowFunction(还有 WindowFunction)
增量聚合函数计算性能较高,占用存储空间少,主要因为中间状态的计算结果,窗口中只维护中间结果状态值,不需要缓存原始数据。
全量窗口函数使用的代价相对较高,性能比较弱,主要因为此时算子需要对所有属于该窗口的接入数据进行缓存,然后等到窗口触发时,对所有的原始数据进行汇总计算。
1)ReduceFunction
ReduceFunction 定义了对输入的两个相同类型的数据元素按照指定的计算方法及逆行聚合操作,然后输出类型相同的一个结果元素。
//每隔5秒统计每个基站的日志数量 data.map(stationLog=>((stationLog.sid,1))) .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce((v1,v2)=>(v1._1,v1._2+v2._2))
2)AggregateFunction
和 ReduceFunction 相似,AggregateFunction 也是基于中间状态计算结果的增量计算函数,但 AggregateFunction 在窗口计算上更加通用。AggregateFunction接口相对于
ReduceFunction 更加灵活,实现复杂度也相对较高。AggregateFunction 接口中定义了3个需要复写的方法,其中 add() 定义数据的添加逻辑,getResult() 定义了根据
createAccumulator() 计算结果的逻辑,merge() 方法定义合并 createAccumulator() 的逻辑
//每隔3秒计算最近5秒内,每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1))) .keyBy(_._1) .timeWindow(Time.seconds(5),Time.seconds(3)) .aggregate(new AggregateFunction[(String,Int),(String,Long),(String,Long)] { override def createAccumulator() = ("",0) override def add(in: (String, Int), acc: (String, Long)) = { (in._1,acc._2+in._2)} override def getResult(acc: (String, Long)) = acc override def merge(acc: (String, Long), acc1: (String, Long)) = { (acc._1,acc1._2+acc._2)} } )
3)ProcessWindowFunction
ReduceFunction 和 AggregateFunction 都是基于中间状态实现增量计算的窗口函数,已经满足大多数的情况,但是某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的元素,或需要操作窗口中的状态数据和窗口元数据,就需要使用 ProcessWindowFunction,ProcessWindowFunction能够更灵活地支持基于窗口全部数据元素的计算结果,例如对整个窗口中的数据操作,获取TopN,就需要使用 ProcessWindowFunction
//每隔5秒统计每个基站的日志数量 data.map(stationLog=>((stationLog.sid,1))) .keyBy(_._1) .timeWindow(Time.seconds(5)) .process(new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow] { override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = { println(" ") out.collect((key,elements.size)) } }).print()