Flink编程基本步骤和加载不同类型数据源

Flink编程基本步骤:

1.创建流执行环境 StreamExecutionEnviroment.getExecutionEnviroment() 获取流环境。

2.加载数据源 Source

3.转换操作 Transformation

4.输出出去Sink,落地到其它的数据仓库,直接打印输出.

关于Flink 数据的基本操作 —— 四种分类

  1. 单条数据的操作 map filter

  2. 多条数据的操作 window

  3. 多个流合并成一个流操作 connect union join

  4. 将一个流拆分成多个流操作 ,(split 过期),测输出流(OutputTag)output

Flink输入数据源 source

自带预定义Source

  • 基于本地集合Source

    • 应用场景,当程序写完之后,测试当前功能是否可用,开发测试用。

    • 分类

      1. 从元素 fromElements

      2. 从集合 fromCollection

      3. 基于Sequence的generateSequence

      4. 基于开始和结束的DataStream ,fromSequence

  • 并行度设置

    • 并行度设置方式

      1.设置配置文件 flink-conf.yaml  parallelism.default: 1
      2.在client端提交任务设置并行度  flink run -p 1
      3.在程序中设置全局并行度  env.setParallelism(1)
      4.算子级别的并行度设置  算子.setParallelism(1)
      优先级:
      算子优先级 > 程序中全局并行度 > client提交作业并行度 > 配置文件中的并行度
    • 全局并行度设置

      env.setParallelism(1);
    • 算子并行度设置

      source.print().setParallelism(2);
  • 基于文件的Source

    • 批的方式读取文件,只读取一次——readTextFile

    • 流的方式读取文件,实时根据指定周期去监控文件—— readFile

      监控方式 watchType 两种

      1. FileProcessingMode.PROCESS_CONTINUOUSLY 主要用于修改删除操作比较多场景,根据周期持续读取整个文件(破坏仅一次语义特性)

      2. FileProcessingMode.Process_Once 主要用于仅读取一次,读完就 exit

  • 基于Socket的Source

    env.socketTextStream("node1", 9999);

自定义Source

  • 主要用于指定特定格式的数据源,生成有界或*数据流

    1. 有界数据流 - 读取 mysql ,读取 文件

    2. *数据流 - for 循环生成数据

  • 需求 - 通过用户自定义的方式生成数据源。

  • 需求案例

    • 自定义实现SourceFunction接口案例 只需要重写 run方法和 cancel方法

package cn.itcast.flink.api;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/11/29 14:56
 * Desc - 每1秒钟随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
 * public class Order
 * String oid;
 * int uid;
 * double money;
 * long timestamp;
 * String datetime;
 * 每一秒钟生成一条数据
 * 打印输出每条数据
 * 执行流环境
 */
public class OrderSource {
    public static void main(String[] args) throws Exception {
        //1.创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置并行度
        env.setParallelism(1);
        //3.获取自定义数据源
        //实现方式
        DataStreamSource<Order> source = env.addSource(new OrderEmitSource());
        //4.打印输出
        source.printToErr();
        //5.执行流环境
        env.execute();
    }

    public static class OrderEmitSource implements SourceFunction<Order> {
        //定义一个标记,用于标识当前持续生成数据
        private volatile boolean isRunning = true;

        /**
         * 实现数据的生成,并将生成的数据输出 ctx 输出
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            //定义随机数
            Random rm = new Random();
            //时间转换格式工具
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //死循环,一直生成数据
            while (isRunning) {
                //随机数
                String oid = UUID.randomUUID().toString();
                //用户id ,随机 0~5 之间值
                int uid = rm.nextInt(6);
                //money 0~100之间的
                double money = rm.nextDouble()*100;
                //时间戳
                long timestamp = System.currentTimeMillis();
                //当前时间
                String datetime = sdf.format(timestamp);
                Order order = new Order(
                        oid,
                        uid,
                        money,
                        timestamp,
                        datetime
                );
                //收集数据
                ctx.collect(order);
                //程序休眠一秒接着执行
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /**
         * 用户取消生成的时候,取消生成
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}

 实现ParallelSourceFunction 接口案例

 并行化生成数据,算子上设置并行度 setParallelism(n)

package cn.itcast.flink.api;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/11/29 14:56
 * Desc - 每1秒钟随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
 * public class Order
 * String oid;
 * int uid;
 * double money;
 * long timestamp;
 * String datetime;
 * 每一秒钟生成一条数据
 * 打印输出每条数据
 * 执行流环境
 */
public class OrderParallelismSource {
    public static void main(String[] args) throws Exception {
        //1.创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置并行度
        env.setParallelism(1);
        //3.获取自定义数据源
        //实现方式
        DataStreamSource<Order> source = env.addSource(new OrderEmitSource()).setParallelism(6);
        //4.打印输出 
        source.printToErr();
        //5.执行流环境
        env.execute();
    }

    public static class OrderEmitSource implements ParallelSourceFunction<Order> {
        //定义一个标记,用于标识当前持续生成数据
        private volatile boolean isRunning = true;

        /**
         * 实现数据的生成,并将生成的数据输出 ctx 输出
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            //定义随机数
            Random rm = new Random();
            //时间转换格式工具
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //死循环,一直生成数据
            while (isRunning) {
                //随机数
                String oid = UUID.randomUUID().toString();
                //用户id ,随机 0~5 之间值
                int uid = rm.nextInt(6);
                //money 0~100之间的
                double money = rm.nextDouble()*100;
                //时间戳
                long timestamp = System.currentTimeMillis();
                //当前时间
                String datetime = sdf.format(timestamp);
                Order order = new Order(
                        oid,
                        uid,
                        money,
                        timestamp,
                        datetime
                );
                //收集数据
                ctx.collect(order);
                //程序休眠一秒接着执行
                TimeUnit.SECONDS.sleep(5);
            }
        }

        /**
         * 用户取消生成的时候,取消生成
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}

实现RichParallelSourceFunction案例

  • Rich 是富函数继承了 AbstractRichFunciton,实现了

  • 生命周期的 open 和 close 方法

    1. open 方法,用于实现当前生成的初始化条件

    2. close 方法,用于生成数据结束的收尾工作

    3. getRuntimeContext 方法,用于获取当前的程序的上下文对象(参数、环境变量、状态、累加器等)

  • 案例 - 从数据库中读取数据

  • 1:初始化工作 —— 创建数据库和数据表

# 创建数据库
create database test;
# 使用数据库
use test;
# 创建表和导入数据
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` int(11) NOT NULL,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');

SET FOREIGN_KEY_CHECKS = 1;

 2:Flink读取MySQL的数据源

package cn.itcast.flink.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2022/1/11 16:17
 * Desc 读取mysql数据表并打印输出
 * 开发步骤:
 * 1.创建和准备数据库和数据表  flink
 * 2.获取流执行环境
 * 3.设置并行度
 * 4.添加自定义数据源,从mysql中读取数据,实现 RichSourceFunction ,rich 增强富功能 open close getRuntimeContext
 * 4.1. open 初始化动作,创建连接,创建 statement ,获取变量
 * 4.2. run方法 读取数据表中数据并封装成对象
 * 4.3. close方法 关闭statement和连接
 * 5. 打印结果输出
 * 6. 执行流环境
 */
public class UserSource {
    public static void main(String[] args) throws Exception {
        //2.获取流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //3.设置并行度
        env.setParallelism(1);
        //4.添加自定义数据源,从mysql中读取数据,实现 RichSourceFunction ,rich 增强富功能 open close getRuntimeContext
        DataStreamSource<User> source = env.addSource(new RichSourceFunction<User>() {
            Connection conn = null;
            Statement statement = null;
            //标记
            boolean isRunning = true;

            /**
             * 在所有执行source,首先要做的初始化工作
             * @param parameters
             * @throws Exception
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                //1.设置 driver 驱动
                Class.forName("com.mysql.jdbc.Driver");
                //2.获取连接 设置 url 用户名 密码
                conn = DriverManager.getConnection(
                        "jdbc:mysql://node1:3306/flink?useSSL=false",
                        "root",
                        "123456"
                );
                //3.创建 statement 基于 sql
                statement = conn.createStatement();
            }

            /**
             * 所有的元素都在这里执行
             * @param ctx
             * @throws Exception
             */
            @Override
            public void run(SourceContext<User> ctx) throws Exception {
                String sql = "select id,username,password,name from user";
                while (isRunning) {
                    //1.读取数据 statement.executeQuery 得到 ResultSet 结果集
                    ResultSet rs = statement.executeQuery(sql);
                    //2.遍历 ResultSet 是否有数据 hasNext() = true
                    while (rs.next()) {
                        User user = new User();
                        //3.将每条数据 赋值 对象 User
                        int id = rs.getInt("id");
                        String username = rs.getString("username");
                        String password = rs.getString("password");
                        String name = rs.getString("name");

                        user.setId(id);
                        user.setUsername(username);
                        user.setPassword(password);
                        user.setName(name);
                        //4.将 User 收集 ctx.collect(user)
                        ctx.collect(user);
                    }
                    TimeUnit.MINUTES.sleep(5);
                }
            }

            @Override
            public void cancel() {
                //将flag置为 false
                isRunning = false;
            }

            /**
             * 所有的元素执行完毕的收尾工作
             * @throws Exception
             */
            @Override
            public void close() throws Exception {
                //关闭 statement
                if (!statement.isClosed()) {
                    statement.close();
                }
                //关闭 connection
                if (!conn.isClosed()) {
                    conn.close();
                }
            }
        });
        //4.1. open 初始化动作,创建连接,创建 statement ,获取变量
        //4.2. run方法 读取数据表中数据并封装成对象
        //4.3. close方法 关闭statement和连接
        //5. 打印结果输出
        source.printToErr();
        //6. 执行流环境
        env.execute();
    }

    public static class User {
        // id
        private int id;
        // username
        private String username;
        // password
        private String password;
        // name
        private String name;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getUsername() {
            return username;
        }

        public void setUsername(String username) {
            this.username = username;
        }

        public String getPassword() {
            return password;
        }

        public void setPassword(String password) {
            this.password = password;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    ", password='" + password + '\'' +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}

上一篇:flink 读取kafka之自定义DeserializationSchema序列化


下一篇:flink往mongo塞数据