Rolling aggregation operators

Common rolling aggregation operators: sum, min, max, minBy, maxBy
Purpose: performs the aggregation on each keyed sub-stream of a KeyedStream. The per-key results are merged back into a single stream, so the result is always a DataStream.
Parameters
- If the stream contains POJOs or Scala case classes, pass the field name.
- If the stream contains tuples, pass the field position (0-based).
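The "rolling" nature of these aggregations can be sketched without Flink: for each key, every incoming element updates a running aggregate, and the updated value is emitted immediately. A minimal plain-Java sketch of per-key rolling sum semantics (the class and method names are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of per-key rolling sum semantics (not Flink internals).
class RollingSumSketch {
    // For each (key, value) pair, emit the running sum for that key so far.
    public static List<Integer> rollingSum(List<String> keys, List<Integer> values) {
        Map<String, Integer> state = new HashMap<>(); // stands in for keyed state
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            int sum = state.getOrDefault(keys.get(i), 0) + values.get(i);
            state.put(keys.get(i), sum);
            emitted.add(sum); // every update is emitted downstream, not only a final value
        }
        return emitted;
    }
}
```

Note that an updated aggregate is emitted for every input element; this matches the behavior you see when printing the result stream of sum/max below.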
Entity class
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Water level monitor: receives water level readings.
 * id - sensor ID
 * ts - timestamp
 * vc - water level
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
    public String id;
    public long ts;
    public Integer vc;
}
```
max
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.wdh01.bean.WaterSensor;

public class Flink02_Transform_Max {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read socket data and convert it to a JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop103", 9998)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        final String[] split = value.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                });

        // 3. Key by sensor ID
        KeyedStream<WaterSensor, String> keyByStream = waterSensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });

        // 4. Compute the highest water level
        SingleOutputStreamOperator<WaterSensor> vc = keyByStream.max("vc");

        // 5. Print
        vc.print();

        // 6. Execute
        env.execute();
    }
}
```
maxBy
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.wdh01.bean.WaterSensor;

public class Flink03_Transform_MaxBy {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read socket data and convert it to a JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop103", 9998)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        final String[] split = value.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                });

        // 3. Key by sensor ID
        KeyedStream<WaterSensor, String> keyByStream = waterSensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });

        // 4. Compute the highest water level
        // false: on a tie, emit the latest record instead of the first one seen
        SingleOutputStreamOperator<WaterSensor> vc = keyByStream.maxBy("vc", false);

        // 5. Print
        vc.print();

        // 6. Execute
        env.execute();
    }
}
```
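The practical difference between the two examples: max("vc") only updates the aggregated field and keeps the other fields from the first record seen for that key, while maxBy("vc") emits the entire record that holds the maximum. A plain-Java sketch of the two merge rules (illustrative, not the Flink internals):

```java
// Illustrative sketch contrasting max (field-wise) and maxBy (record-wise) semantics.
class MaxVsMaxBySketch {
    public static class Sensor {
        public final String id;
        public final long ts;
        public final int vc;
        public Sensor(String id, long ts, int vc) { this.id = id; this.ts = ts; this.vc = vc; }
    }

    // max("vc"): keep the accumulator's other fields, only update vc.
    public static Sensor maxOnVc(Sensor acc, Sensor next) {
        return new Sensor(acc.id, acc.ts, Math.max(acc.vc, next.vc));
    }

    // maxBy("vc"): return whichever whole record has the larger vc.
    public static Sensor maxByVc(Sensor acc, Sensor next) {
        return next.vc > acc.vc ? next : acc;
    }
}
```

So after feeding (s1, ts=1, vc=10) and (s1, ts=2, vc=20), max reports the first record's timestamp with the new maximum vc, while maxBy reports the full second record.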
reduce
Purpose: an aggregation on a keyed stream that merges the current element with the previous aggregation result to produce a new value. The returned stream contains the result of every aggregation step, not only the final one. Why keep the intermediate values? Consider the nature of streaming data: a stream has no end, so there is no "final" result; every intermediate aggregate is a valid value.

Parameter: interface ReduceFunction<T>

Returns: KeyedStream -> SingleOutputStreamOperator

Example
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.wdh01.bean.WaterSensor;

import static java.lang.Integer.max;

public class Flink04_Transform_reduce {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read socket data and convert it to a JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop103", 9998)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        final String[] split = value.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                });

        // 3. Key by sensor ID
        KeyedStream<WaterSensor, String> keyByStream = waterSensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });

        // 4. Compute the highest water level (keep the latest timestamp)
        SingleOutputStreamOperator<WaterSensor> reduce = keyByStream.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                return new WaterSensor(value1.getId(), value2.getTs(), max(value1.getVc(), value2.getVc()));
            }
        });

        // 5. Print
        reduce.print();

        // 6. Execute
        env.execute();
    }
}
```

Note: the type of the aggregation result must be the same as the type of the elements in the original stream.
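The "every intermediate result is emitted" behavior of reduce can be sketched in plain Java for a single key (the names are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative sketch of keyed reduce semantics for one key (not Flink internals).
class ReduceSketch {
    // Merge each incoming element with the previous result and collect every step.
    public static List<Integer> rollingReduce(List<Integer> input, BinaryOperator<Integer> reducer) {
        List<Integer> emitted = new ArrayList<>();
        Integer acc = null;
        for (Integer value : input) {
            // The first element for a key passes through unchanged.
            acc = (acc == null) ? value : reducer.apply(acc, value);
            emitted.add(acc); // the result stream contains every intermediate aggregate
        }
        return emitted;
    }
}
```

For the input 1, 3, 2 with a max reducer, the emitted stream is 1, 3, 3 rather than just the final 3, which mirrors what the Flink job prints.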
process
Purpose: process is a fairly low-level operator in Flink. It can be called on many kinds of streams and gives access to more than the data itself (runtime context, timers, side outputs, and so on).
Example
```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class Flink05_Transform_process {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read socket data
        DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop103", 9998);

        // 3. Use process to implement flatMap (flatten lines into words)
        SingleOutputStreamOperator<String> wordDS = socketTextStream.process(new ProcessFlatMapFunction());

        // 4. Use process to implement map (word -> (word, 1))
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordToOne = wordDS.process(new ProcessMapFunction());

        // 5. Key by word
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordToOne.keyBy(d -> d.f0);

        // 6. Aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedStream.sum(1);

        // 7. Print
        sum.print();

        // 8. Execute
        env.execute();
    }

    public static class ProcessMapFunction extends ProcessFunction<String, Tuple2<String, Integer>> {
        @Override
        public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            out.collect(new Tuple2<>(value, 1));
        }
    }

    public static class ProcessFlatMapFunction extends ProcessFunction<String, String> {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(word);
            }

            // Extra context available in a ProcessFunction:
            // Runtime context
            RuntimeContext runtimeContext = getRuntimeContext();
            // Timer service (note: registering timers is only supported on keyed streams)
            TimerService timerService = ctx.timerService();
            // timerService.registerProcessingTimeTimer(1245L);
            // Current processing time
            timerService.currentProcessingTime();
            // Current watermark (event time)
            timerService.currentWatermark();
            // Side output (requires an OutputTag): ctx.output(outputTag, value);
        }

        // Lifecycle method
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }
    }
}
```
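The whole pipeline above (process-based flatten, process-based map, keyBy, rolling sum) can be condensed into a non-Flink sketch to show what the printed output looks like; the class name and the `word=count` output format here are illustrative:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative non-Flink sketch of the word-count pipeline:
// flatten -> (word, 1) -> keyBy(word) -> rolling sum, emitting every update.
class WordCountSketch {
    public static List<String> run(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>(); // per-word keyed state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {            // the process-based flatten
                int c = counts.merge(word, 1, Integer::sum); // map to 1, then keyed sum
                emitted.add(word + "=" + c);                 // each update is emitted downstream
            }
        }
        return emitted;
    }
}
```

Feeding the line "a b a" produces a=1, b=1, a=2: every update per key is printed, just as with the Flink job.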
Stream repartitioning operators
- keyBy: groups by key first, then chooses the downstream partition based on a double hash of the key
- shuffle: distributes the stream's elements to partitions randomly
- rebalance: distributes elements evenly across all partitions in a round-robin fashion; a useful performance optimization when the data is skewed
- rescale: also distributes data evenly in a round-robin fashion, like rebalance, but is cheaper, because rescale round-robins only among a local subset of downstream tasks, so data stays in the local "pipeline" instead of going over the network
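The difference between rebalance and rescale comes down to which downstream channels each producer cycles over. A simplified sketch of the channel selection (the group layout is illustrative; Flink derives the actual groups from the producer/consumer parallelism):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of channel selection for rebalance vs rescale (not Flink internals).
class RepartitionSketch {
    // rebalance: round-robin over ALL downstream channels.
    public static List<Integer> rebalance(int records, int channels) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            targets.add(i % channels);
        }
        return targets;
    }

    // rescale: round-robin only within this producer's local group of channels,
    // so data can stay on the same TaskManager instead of crossing the network.
    public static List<Integer> rescale(int records, int groupStart, int groupSize) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            targets.add(groupStart + (i % groupSize));
        }
        return targets;
    }
}
```

With four downstream channels, rebalance cycles 0, 1, 2, 3, 0, ... from every producer, while a producer whose local group is channels 2-3 under rescale only ever targets 2, 3, 2, ...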
Repartitioning example
```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Repartitioning
public class Flink06_Transform_Repartition {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);

        // 2. Read socket data
        DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop103", 9998);

        // 3. Try the different repartitioning strategies
        socketTextStream.keyBy(d -> d).print("keyBy---");
        socketTextStream.shuffle().print("shuffle---");
        socketTextStream.rebalance().print("rebalance---");
        socketTextStream.rescale().print("rescale---");
        socketTextStream.global().print("global---");
        // socketTextStream.broadcast().print("broadcast---");
        // socketTextStream.forward().print("forward---");

        // 4. Execute
        env.execute();
    }
}
```