声明:本系列博客是根据SGG的视频整理而成,非常适合大家入门学习。
AggregateFunction(主要用于增量计算)
// 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值
// {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
// API
// IN: 输入元素类型
// ACC: 累加器类型
// OUT: 输出元素类型
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
// 初始化累加器 创建一个新的累加器,启动一个新的聚合,负责迭代状态的初始化
ACC createAccumulator();
// 累加 对于数据的每条数据,和迭代数据的聚合的具体实现
ACC add(IN value, ACC accumulator);
// 累加器合并 合并两个累加器,返回一个具有合并状态的累加器
ACC merge(ACC a, ACC b);
// 输出 从累加器获取聚合的结果
OUT getResult(ACC accumulator);
}
实例一
// 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的平均价值(AggregateFunction)
kafkaStream
// 将从Kafka获取的JSON数据解析成Java Bean
.process(new KafkaProcessFunction())
// 提取时间戳生成水印
.assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
// 按用户分组
.keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
// 构造TimeWindow
.timeWindow(Time.seconds(windowLengthSeconds))
// 窗口函数: 获取这段窗口时间内,每个用户浏览的平均价值
.aggregate(new AggregateFunction<UserActionLog, Tuple2<Long,Long>, Double>() {
// 1、初始值
// 定义累加器初始值
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L,0L);
}
// 2、累加
// 定义累加器如何基于输入数据进行累加
@Override
public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) {
accumulator.f0 += 1;
accumulator.f1 += value.getProductPrice();
return accumulator;
}
// 3、合并
// 定义累加器如何和State中的累加器进行合并
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
acc1.f0+=acc2.f0;
acc1.f1+=acc2.f1;
return acc1;
}
// 4、输出
// 定义如何输出数据
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return accumulator.f1 / (accumulator.f0 * 1.0);
}
})
.print();
#结果
20.0
10.0
30.0
25.0
20.0
实例二
val avgTempPerWindow: DataStream[(String, Double)] = sensorData
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.aggregate(new AvgTempFunction)
// An AggregateFunction to compute the average temperature per sensor.
// The accumulator holds the sum of temperatures and an event count.
class AvgTempFunction
extends AggregateFunction[(String, Double),
(String, Double, Int), (String, Double)] {
override def createAccumulator() = {
("", 0.0, 0)
}
override def add(in: (String, Double), acc: (String, Double, Int)) = {
(in._1, in._2 + acc._2, 1 + acc._3)
}
override def getResult(acc: (String, Double, Int)) = {
(acc._1, acc._2 / acc._3)
}
override def merge(acc1: (String, Double, Int),
acc2: (String, Double, Int)) = {
(acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
}
实例三
一,概述
Flink 的AggregateFunction是一个基于中间计算结果状态进行增量计算的函数。由于是迭代计算方式,所以,在窗口处理过程中,不用缓存整个窗口的数据,所以效率执行比较高。
二,AggregateFunction接口类
AggregateFunction比ReduceFunction更加通用,它有三个参数:输入类型(IN),累加器类型(ACC)和输出类型(OUT)。
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
...............................
}
自定义聚合函数需要实现AggregateFunction接口类,它有四个接口实现方法:
a.创建一个新的累加器,启动一个新的聚合,负责迭代状态的初始化
ACC createAccumulator();
b.对于数据的每条数据,和迭代数据的聚合的具体实现
ACC add(IN value, ACC accumulator);
c.合并两个累加器,返回一个具有合并状态的累加器
ACC merge(ACC a, ACC b);
d.从累加器获取聚合的结果
OUT getResult(ACC accumulator);
三、代码实例
1.模拟场景:
从文件/socket读取数据,数据包含三个字段:商品ID,用户ID,访问类型(1.点击查看 2.收藏 3.购买),访问时间;这里每隔3秒对最近6秒内的数据进行汇总计算各个商品的“点击查看”访问量,也就是访问类型为1的数据。
这里自定义聚合函数MyCountAggregate数据进行预聚合,自定义窗口函数MyCountWindowFunction2对聚合的数据封装成字符串,并加上窗口结束时间信息进行输出。
2.数据准备:
product1,user14,1,1586855115084 product2,user19,2,1586855116087 product2,user19,1,1586855116087 product3,user17,1,1586855117089 product1,user17,1,1586855118092 product2,user17,1,1586855119095 product3,user15,1,1586855120097 product1,user12,1,1586855121100 product2,user13,1,1586855122102 product3,user13,1,1586855123105 product1,user13,1,1586855124108 product2,user19,3,1586855116087 product2,user16,1,1586855125111 product1,user17,1,1586855136113 product1,user14,1,1586855127116 product2,user16,1,1586855128119 product2,user16,1,1586855129122 product3,user16,1,1586855130125 product2,user11,1,1586855131128 product1,user16,1,1586855132131 product2,user13,1,1586855133134 product3,user16,1,1586855134137 product3,user13,1,1586855135139 product2,user19,3,1586855116087 product1,user18,1,1586855136142 product2,user12,1,1586855137145 product1,user13,1,1586855138148 product3,user17,1,1586855139150
3.自定义聚合函数MyCountAggregate
package com.hadoop.ljs.flink110.aggreagate;
import org.apache.flink.api.common.functions.AggregateFunction;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-04-15 22:00
* @version: v1.0
* @description: com.hadoop.ljs.flink110.aggreagate
* 输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。
*/
public class MyCountAggregate implements AggregateFunction<ProductViewData, Long, Long> {
@Override
public Long createAccumulator() {
/*访问量初始化为0*/
return 0L;
}
@Override
public Long add(ProductViewData value, Long accumulator) {
/*访问量直接+1 即可*/
return accumulator+1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
/*合并两个统计量*/
@Override
public Long merge(Long a, Long b) {
return a+b;
}
}
4.自定义窗口函数
package com.hadoop.ljs.flink110.aggreagate;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-04-15 21:56
* @version: v1.0
* @description: com.hadoop.ljs.flink110.aggreagate
* *自定义窗口函数,封装成字符串
* *第一个参数是上面MyCountAggregate的输出,就是商品的访问量统计
* * 第二个参数 输出 这里为了演示 简单输出字符串
* * 第三个就是 窗口类 能获取窗口结束时间
*/
public class MyCountWindowFunction2 implements WindowFunction<Long,String,String, TimeWindow> {
@Override
public void apply(String productId, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
/*商品访问统计输出*/
/*out.collect("productId"productId,window.getEnd(),input.iterator().next()));*/
out.collect("----------------窗口时间:"+window.getEnd());
out.collect("商品ID: "+productId+" 浏览量: "+input.iterator().next());
}
}
5.主函数,代码如下:
package com.hadoop.ljs.flink110.aggreagate;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-04-14 11:28
* @version: v1.0
* @description: com.hadoop.ljs.flink110.aggreagate
* 自定义聚合函数类和窗口类,进行商品访问量的统计,根据滑动时间窗口,按照访问量排序输出
*/
public class AggregateFunctionMain2 {
public static int windowSize=6000;/*滑动窗口大小*/
public static int windowSlider=3000;/*滑动窗口滑动间隔*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
/*DataStream<String> sourceData = senv.socketTextStream("localhost",9000);*/
//从文件读取数据,也可以从socket读取数据
DataStream<String> sourceData = senv.readTextFile("D:\\projectData\\ProductViewData2.txt");
DataStream<ProductViewData> productViewData = sourceData.map(new MapFunction<String, ProductViewData>() {
@Override
public ProductViewData map(String value) throws Exception {
String[] record = value.split(",");
return new ProductViewData(record[0], record[1], Long.valueOf(record[2]), Long.valueOf(record[3]));
}
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ProductViewData>(){
@Override
public long extractAscendingTimestamp(ProductViewData element) {
return element.timestamp;
}
});
/*过滤操作类型为1 点击查看的操作*/
DataStream<String> productViewCount = productViewData.filter(new FilterFunction<ProductViewData>() {
@Override
public boolean filter(ProductViewData value) throws Exception {
if(value.operationType==1){
return true;
}
return false;
}
}).keyBy(new KeySelector<ProductViewData, String>() {
@Override
public String getKey(ProductViewData value) throws Exception {
return value.productId;
}
//时间窗口 6秒 滑动间隔3秒
}).timeWindow(Time.milliseconds(windowSize), Time.milliseconds(windowSlider))
/*这里按照窗口进行聚合*/
.aggregate(new MyCountAggregate(), new MyCountWindowFunction2());
//聚合结果输出
productViewCount.print();
senv.execute("AggregateFunctionMain2");
}
}