Flink Transform (2)

Rolling Aggregation Operators

Common rolling aggregation operators
sum, min, max, minBy, maxBy
Role: aggregates each keyed substream of a KeyedStream. When done, the aggregated results are merged back into a single stream, so the result is always a DataStream.
Parameters

  1. If the stream holds POJOs or Scala case classes, the parameter is the field name.
  2. If the stream holds tuples, the parameter is the position (0-based).
Return value: KeyedStream -> SingleOutputStreamOperator
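The per-key rolling behavior (one output per input element, carrying the running aggregate) can be sketched without a Flink cluster. The plain-Java sketch below simulates a keyed rolling sum; the data and class name are illustrative, and this is not the Flink API:

```java
import java.util.HashMap;
import java.util.Map;

public class RollingSumSketch {
    public static void main(String[] args) {
        // (key, value) pairs as they would arrive on the stream
        String[][] input = {{"a", "1"}, {"b", "2"}, {"a", "3"}};
        Map<String, Integer> running = new HashMap<>();
        for (String[] record : input) {
            // merge the new value into the running aggregate for this key ...
            int sum = running.merge(record[0], Integer.parseInt(record[1]), Integer::sum);
            // ... and emit the updated aggregate downstream, one output per input
            System.out.println(record[0] + " -> " + sum);
        }
        // prints: a -> 1, b -> 2, a -> 4
    }
}
```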

Entity class

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Water level monitor, used to receive water level readings
 * id: sensor id
 * ts: timestamp
 * vc: water level
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {

    public String id;
    public long ts;
    public Integer vc;
}

max

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.wdh01.bean.WaterSensor;

public class Flink02_Transform_Max {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Read data from the socket & convert it to a JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop103", 9998)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        final String[] split = value.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                });
        //3. Key by sensor id
        KeyedStream<WaterSensor, String> keyByStream = waterSensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        //4. Compute the maximum water level
        SingleOutputStreamOperator<WaterSensor> vc = keyByStream.max("vc");
        //5. Print
        vc.print();
        //6. Execute
        env.execute();
    }
}

maxBy

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.wdh01.bean.WaterSensor;

public class Flink03_Transform_MaxBy {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Read data from the socket & convert it to a JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop103", 9998).
                map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        final String[] split = value.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                });
        //3. Key by sensor id
        KeyedStream<WaterSensor, String> keyByStream = waterSensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        //4. Compute the maximum water level
        SingleOutputStreamOperator<WaterSensor> vc = keyByStream.maxBy("vc", false); // false: when two records tie on the value, keep the latest record instead of the first one
        //5. Print
        vc.print();
        //6. Execute
        env.execute();
    }
}
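The practical difference between max and maxBy: max only updates the aggregated field and leaves the other fields at their first-seen values, while maxBy returns the whole record that carries the maximum. The plain-Java sketch below simulates both behaviors on illustrative data (Sensor is a hypothetical stand-in for WaterSensor; this is not the Flink API):

```java
public class MaxVsMaxBySketch {
    static class Sensor {
        final String id; final long ts; final int vc;
        Sensor(String id, long ts, int vc) { this.id = id; this.ts = ts; this.vc = vc; }
        public String toString() { return id + "," + ts + "," + vc; }
    }

    public static void main(String[] args) {
        Sensor first = new Sensor("s1", 1000L, 5);
        Sensor second = new Sensor("s1", 2000L, 9);

        // max("vc"): keep the first record and only overwrite the aggregated field
        Sensor maxResult = second.vc > first.vc
                ? new Sensor(first.id, first.ts, second.vc)  // ts stays 1000
                : first;

        // maxBy("vc"): keep whichever complete record has the larger vc
        Sensor maxByResult = second.vc > first.vc ? second : first;  // ts becomes 2000

        System.out.println("max   -> " + maxResult);   // s1,1000,9
        System.out.println("maxBy -> " + maxByResult); // s1,2000,9
    }
}
```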

reduce

Role: an aggregation on a keyed stream that merges the current element with the result of the previous aggregation to produce a new value. The returned stream contains the result of every aggregation step, not just the final one. Why keep the intermediate values? Consider the nature of streaming data: there is no end, so there is no notion of a "final" result; every intermediate aggregate is a valid value!
Parameter: interface ReduceFunction<T>
Return value: KeyedStream -> SingleOutputStreamOperator

Example

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.wdh01.bean.WaterSensor;

import static java.lang.Integer.max;

public class Flink04_Transform_reduce {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Read data from the socket & convert it to a JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop103", 9998).
                map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        final String[] split = value.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                });
        //3. Key by sensor id
        KeyedStream<WaterSensor, String> keyByStream = waterSensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        //4. Compute the maximum water level (keeping the latest timestamp)
        SingleOutputStreamOperator<WaterSensor> reduce = keyByStream.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                return new WaterSensor(value1.getId(), value2.getTs(), max(value1.getVc(), value2.getVc()));
            }
        });
        //5. Print
        reduce.print();
        //6. Execute
        env.execute();
    }
}
Note: the type of the aggregated result must be the same as the type of the elements in the original stream!
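The "every intermediate result is emitted" behavior described above can be simulated in plain Java (an illustrative sketch, not the Flink API): each incoming element is merged with the previous accumulator via the same (previous aggregate, new element) -> new aggregate contract as ReduceFunction<T>, and each merged value is emitted immediately:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceSketch {
    public static void main(String[] args) {
        List<Integer> input = List.of(3, 1, 7, 5);
        // same contract as ReduceFunction<T>: (previous aggregate, new element) -> new aggregate
        BinaryOperator<Integer> reducer = Math::max;
        Integer acc = null;
        List<Integer> emitted = new ArrayList<>();
        for (Integer value : input) {
            // the first element passes through unchanged; later ones are merged
            acc = (acc == null) ? value : reducer.apply(acc, value);
            emitted.add(acc); // every intermediate aggregate is emitted downstream
        }
        System.out.println(emitted); // [3, 3, 7, 7]
    }
}
```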

process

Role: process is a fairly low-level operator in Flink. It can be called on many kinds of streams and gives access to much more information than just the data itself (e.g. the runtime context and timers).

Example

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class Flink05_Transform_process {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Read data from the socket
        DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop103", 9998);
        //3. Use process to implement flatMap
        SingleOutputStreamOperator<String> wordDS = socketTextStream.process(new ProcessFlatMapFunction());
        //4. Use process to implement map
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordToOne = wordDS.process(new ProcessMapFunction());
        //5. Key by word
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordToOne.keyBy(d -> d.f0);
        //6. Aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedStream.sum(1);
        //7. Print
        sum.print();
        //8. Execute
        env.execute();
    }

    public static class ProcessMapFunction extends ProcessFunction<String, Tuple2<String, Integer>> {

        @Override
        public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            out.collect(new Tuple2<>(value, 1));
        }
    }

    public static class ProcessFlatMapFunction extends ProcessFunction<String, String> {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(word);
            }
            // runtime context
            RuntimeContext runtimeContext = getRuntimeContext();

            // timer service: note that registering timers is only supported on
            // keyed streams, so calling it here on a non-keyed stream would throw
            TimerService timerService = ctx.timerService();
            // timerService.registerProcessingTimeTimer(1245L);
            // current processing time
            timerService.currentProcessingTime();
            // current watermark (event time)
            timerService.currentWatermark();
            // side-output stream: ctx.output(...)

        }

        // lifecycle method
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }


    }
}

Stream Repartitioning Operators

  • keyBy partitions by key first, then selects the downstream partition based on a double hash of the key
  • shuffle distributes the elements of the stream to partitions at random
  • rebalance distributes the elements evenly across all partitions (round-robin); a useful performance optimization when the data is skewed
  • rescale distributes data evenly in a round-robin fashion like rebalance, but more efficiently, because rescale does not go through the network and stays entirely within local "pipelines"
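The round-robin selection behind rebalance can be sketched in plain Java (the channel count is illustrative; the real selector lives inside Flink's network stack):

```java
public class RebalanceSketch {
    public static void main(String[] args) {
        int channels = 3; // number of downstream subtasks (illustrative)
        int next = 0;
        // each record goes to the next channel in turn, spreading load evenly
        for (int record = 0; record < 6; record++) {
            System.out.println("record " + record + " -> channel " + next);
            next = (next + 1) % channels;
        }
        // prints: record 0 -> channel 0, record 1 -> channel 1, record 2 -> channel 2,
        //         record 3 -> channel 0, ...
    }
}
```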


Repartition example

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Repartition
public class Flink06_Transform_Repartition {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);
        //2. Read data from the socket
        DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop103", 9998);
        //3. Apply different repartitioning strategies
        socketTextStream.keyBy(d -> d).print("keyBy---");
        socketTextStream.shuffle().print("shuffle---");
        socketTextStream.rebalance().print("rebalance---");
        socketTextStream.rescale().print("rescale---");
        socketTextStream.global().print("global---");
        // socketTextStream.broadcast().print("broadcast---");
        // socketTextStream.forward().print("forward---");

        //4. Execute
        env.execute();
    }
}

 

 
