关于流处理框架Flink的入门使用

1、什么是flink

flink是一种流处理框架,通常使用场景是消费kafka数据进行分组聚合后发送到其他系统,分组与聚合是flink的核心,在本文中仅阐述单个使用场景。流数据相当于是连续不断的数据,生产上的kafka中的日志数据就可以理解为流数据,流数据还分为有界流和*流,有界即文本数据作为datastream这种有固定大小的数据,*即源源不断的数据。

2、flink的界面

下图为flink的界面,在界面中可以提交代码jar包,即可实时运行处理
关于流处理框架Flink的入门使用
关于流处理框架Flink的入门使用

3、flink结合代码案例讲解使用场景

在main入口函数中定义以下方法

//获取流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        //获取数据流
        DataStream<String> stringDataStreamSource = env.socketTextStream("127.0.0.1", 6666);


        //转pojo
        SingleOutputStreamOperator<KafkaEntity> map = stringDataStreamSource.map(new MapFunction<String, KafkaEntity>() {
            @Override
            public KafkaEntity map(String value) throws Exception {


                KafkaEntity kafkaEntity = new KafkaEntity();
                if (!"".equals(value)){
                    String[] splitResult = value.split("1");
                    kafkaEntity.setCityId(splitResult[0]);
                    kafkaEntity.setAppId(splitResult[1]);
                    kafkaEntity.setProcessCode(splitResult[2]);
                    kafkaEntity.setStartTime(splitResult[3].substring(0,12));
                    kafkaEntity.setErrCode(splitResult[4]);
                }
                return kafkaEntity;
            }
        });

        //分组,聚合
        SingleOutputStreamOperator<Object> applyResult = map.keyBy("processCode", "appId", "cityId", "startTime")
                .timeWindow(Time.seconds(15))//每隔15秒聚合一次
                .apply(new WindowFunction<KafkaEntity, Object, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<KafkaEntity> input, Collector<Object> out) throws Exception {
                        //调用总次数
                        KafkaEntity aggregateResult = input.iterator().next();
                        int reqAmount = IteratorUtils.toList(input.iterator()).size();


                        //成功次数
                        int successAmount = 0;
                        //总时长
                        long timeAll = 0;
                        //限流次数
                        int failAmount = 0;
                        List<KafkaEntity> list = IteratorUtils.toList(input.iterator());
                        for (int i = 0; i < list.size(); i++) {
                            KafkaEntity kafkaEntity = list.get(i);
                            timeAll += Long.parseLong(kafkaEntity.getDuration());
                            if ("0".equals(kafkaEntity.getErrCode())) {
                                successAmount += 1;
                            } else {
                                failAmount += 1;
                            }
                        }

                        //平均调用时长
                        long averageDuration = (timeAll / reqAmount);


                        //聚合结果
                        aggregateResult.setReqAmount(String.valueOf(reqAmount));
                        aggregateResult.setSuccessAmount(String.valueOf(successAmount));
                        aggregateResult.setAverageDuration(String.valueOf(averageDuration));
                        aggregateResult.setFailAmount(String.valueOf(failAmount));
                        aggregateResult.setInsertTime(new Date());
                        out.collect(aggregateResult);
                    }
                });

        applyResult.addSink(new RichSinkOperation());

        env.execute();
        

4、代码解释

4.1

首先需要获取流环境

4.2

以socket文本流代替kafka消费者,在linux中使用nc -lk 6666 启动,然后写文本发送即可模拟kafka消费者读取数据,这里也是通过第一步的流环境来获取数据流

4.3

获取到数据流后,将datastream通过map方法(这也可以当作一种算子)转为pojo类,到此,数据准备完成

4.4

SingleOutputStreamOperator也是datastream的子类,我们将获取到的pojo流通过keyby分组,分组的维度是四个,即"processCode", “appId”, “cityId”, “startTime”,只要收到的数据中有一个元素与上一个不同,即为新的一个组

4.5

分组以后通过timewindow设置窗口大小为15秒,即15秒进行一次聚合,聚合方法为下面的apply

4.6

apply方法是对15秒内收到的数据根据用户自定义来做数据处理
KafkaEntity aggregateResult = input.iterator().next();代表按那四个维度来分组得到的pojo对象,同一组中那四个属性都是一样的,在本例中由此来计算同一组的总次数即按当前维度分组后,每组的数据个数,即list的大小,重新计算后放入pojo的一个属性中,最终通过out.collect方法将计算得到的结果汇总在一个对象的几个属性中输出

4.7

applyResult为聚合后的结果,最后一步为将聚合结果输出到外部系统,这里举例为入数据库(redis或hbase都一样)

4.8

public class RichSinkOperation extends RichSinkFunction {


    @Override
    public void invoke(Object value) throws Exception {



        InputStream inputStream = Resources.getResourceAsStream("mybatis-config.xml");
        //获取工厂
        SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(inputStream);

        SqlSession sqlSession = factory.openSession();


        FlinkDao flinkDao  = sqlSession.getMapper(FlinkDao .class);

        KafkaEntity kafkaEntity = (KafkaEntity) value;



        flinkDao.insertRecord(kafkaEntity);
        

        sqlSession.commit();
    }

    @Override
    public void open(Configuration parameters) throws Exception {

        
    }
    
    
    
}

此处集成了mybatis,该自定义类继承RichSinkFunction,主要实现invoke方法,将聚合结果的每一条进行入库处理

本例代码仅为很局限的场景使用,仅为打通整体流程,需要根据业务不同定义不同的apply处理办法,此处的sink操作中也不合理,生产中数据库连接应该放在open中并使用数据池,另外还需要考虑生产每分钟都是上亿的数据,如果开一分钟的窗口,聚合结果都在内存中内存会不会炸,聚合后一次性sink数据库操作会不会阻塞,需要压测来得到实际效果验证。

上一篇:MinIO 单机版安装使用+Flink使用MinIO状态存储


下一篇:数仓开发那些事(4)