flink重温笔记(十六): flinkSQL 顶层 API ——实时数据流结合外部系统

Flink学习笔记

前言:今天是学习 flink 的第 16 天啦!学习了 flinkSQL 与企业级常用外部系统结合,主要是解决大数据领域数据计算后,写入到文件,kafka,还是mysql等 sink 的问题,即数据计算完后保存到哪里的问题!结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:"分享是快乐的源泉????,在我的博客里,不仅有知识的海洋????,还有满满的正能量加持????,快来和我一起分享这份快乐吧????!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


文章目录

  • Flink学习笔记
    • 二、FlinkSQL 连接外部系统
      • 1. 输出到文件
      • 2. 更新模式(Update Mode)
        • 2.1 追加模式(Append Mode)
        • 2.2 撤回模式(Retract Mode)
        • 2.3 更新插入模式(Upsert Mode)
      • 3. 写入到 Kafka
      • 4. 写入到 MySQL

二、FlinkSQL 连接外部系统

1. 输出到文件

例子:将表结果输出到文件系统中

package cn.itcast.day01.sink;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.ConnectTableDescriptor;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;


/**
 * @author lql
 * @time 2024-03-12 15:25:14
 * @description TODO
 */
public class FsSinkTest {
    public static void main(String[] args) throws Exception {
        // todo 1) 配置 table 环境
        // 1. 配置流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 配置设置环境
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        // 3. 配置表环境
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

        // todo 2) 从文件中读取数据
        String filePath = FsSinkTest.class.getClassLoader().getResource("order.csv").getPath();
        DataStreamSource<String> inputStream = env.readTextFile(filePath);

        // todo 3) 将表数据转化类型
        SingleOutputStreamOperator<OrderInfo> stream = inputStream.map(new MapFunction<String, OrderInfo>() {
            @Override
            public OrderInfo map(String data) throws Exception {
                String[] dataArrary = data.split(",");
                return new OrderInfo(
                        dataArrary[0],
                        dataArrary[1],
                        Double.parseDouble(dataArrary[2]),
                        dataArrary[3]);
            }
        });

        // todo 4) 将数据流转化为表
        Table table = bsTableEnv.fromDataStream(stream);

        // todo 5) 调用 api 方式
        Table result = table
                .select($("id"), $("timestamp"), $("money"), $("category"))
                .filter($("category").isEqual("电脑"));

        // todo 6) 将表转化为流打印
        bsTableEnv.toAppendStream(result, Row.class).print("结果数据>>>");

        // todo 7) 将查询的结果写入到文件中
        ConnectTableDescriptor connectTableDescriptor = bsTableEnv
                .connect(new FileSystem().path("D:\\IDEA_Project\\BigData_Java\\flinksql_pro\\data\\output\\order.txt"))
                .withFormat(new Csv())
                .withSchema(
                        new Schema()
                                .field("id", DataTypes.STRING())
                                .field("name", DataTypes.STRING())
                                .field("money", DataTypes.DOUBLE())
                                .field("category", DataTypes.STRING())
                );

        // todo 8) 将通过connect创建的输出文件注册为表对象
        connectTableDescriptor.createTemporaryTable("outputOrder");

        // todo 9) 将表查询的结果插入到临时表中
        table.executeInsert("outputOrder");

        // todo 10) 执行程序
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo {
        private String id;
        private String timestamp;
        private Double money;
        private String category;
    }
}

结果:在 output 目录下生成了一个 order.txt 的文件

总结:

  • 1- connect 方法:存储在哪里,存储什么地方,存储什么格式
  • 2- createTemporaryTable():创建临时表
  • 3- 将之前结果表插入到临时表中

2. 更新模式(Update Mode)

2.1 追加模式(Append Mode)
  • 表(动态表)和外部连接器只交换插入(Insert)消息。

2.2 撤回模式(Retract Mode)
  • 表和外部连接器交换的是添加(Add)和撤回(Retract)消息。
  • 应用场景:
    • 插入数据时,它会被编码为添加消息。
    • 删除数据时,它会被编码为撤回消息;
    • 更新数据时,会先发送一个已更新行的撤回消息,然后再发送一个更新行的添加消息。
  • 这种模式允许对表中的数据进行修改和删除,但需要注意的是,它不能定义key

2.3 更新插入模式(Upsert Mode)
  • 动态表和外部连接器交换 Upsert 和 Delete 消息。
  • 这种模式需要一个唯一的 key,通过这个 key 可以传递更新消息。
  • 应用场景:
    • 插入数据时,它会使用 Upsert 消息。
    • 删除数据时,它会使用 Delete 消息。
    • 更新数据时,它也会使用 Upsert 消息,并通过 key 来标识要更新的行。
  • 这种模式在效率上更高,因为它只需要发送一条消息即可完成更新操作。

案例演示:从 kafka 读取数据,实时聚合操作,撤回模式

package cn.itcast.day01.sink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

/**
 * @author lql
 * @time 2024-03-12 17:33:03
 * @description TODO
 */
public class KafkaSinkTest {
    public static void main(String[] args) throws Exception {
        // todo 1) 初始化 flinkSQL 环境
        // 1.1 配置流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.2 配置setting环境
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        // 1.3 配置表表环境
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env, bsSettings);

        // todo 2) 读取 kafka 数据源
        tbEnv.connect(
                new Kafka()
                .version("universal") // 指定 kafka 版本
                .topic("order") // 定义主题
                .property("bootstrap.servers","node1:9092")
        ).withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.STRING())
                        .field("money", DataTypes.DOUBLE())
                        .field("category", DataTypes.STRING())
                        .field("pt", DataTypes.TIMESTAMP(3))
                        // 使用 protime,指定字段名定义处理时间字段
                        // 这个 proctime 属性只能通过附加逻辑字段,进行扩展物理 schema
                        .proctime()
                ).createTemporaryTable("kafkaInputTable");

        // todo 3) 表的查询操作
        // 3.1 通过表环境获取到数据表:from
        Table orderTable = tbEnv.from("kafkaInputTable");

        // 3.2 将表转为 stream 后打印
        tbEnv.toAppendStream(orderTable, Row.class).print("Table API>>>>");

        // 3.2 调用 table api 进行聚合操作
        Table aggResultTable = orderTable.groupBy($("category"))
                .select($("category"),
                        $("money").sum().as("totalMoney"),
                        $("id").count().as("cnt")
                );

        tbEnv.toRetractStream(aggResultTable,Row.class).print("agg result:>>>");

        // todo 4) 启动程序,将数据写入到kafka的时候,可以不加execute代码
        env.execute();
    }
}

结果:显示 false 即撤回,显示 true 即添加

Table API>>>>> +I[user_001, 1621718199, 10.1, 电脑, 2024-03-12T10:42:18.335Z]
agg result:>>>> (true,+I[电脑, 10.1, 1])
Table API>>>>> +I[user_001, 1621718201, 14.1, 手机, 2024-03-12T10:42:33.626Z]
agg result:>>>> (true,+I[手机, 14.1, 1])
Table API>>>>> +I[user_002, 1621718202, 82.5, 手机, 2024-03-12T10:42:50.130Z]
agg result:>>>> (false,-U[手机, 14.1, 1])
agg result:>>>> (true,+U[手机, 96.6, 2])

总结:

  • 实时聚合操作结果不可以简单 toAppendStream 打印,需要使用更新模式toRetractStream
  • 这种聚合结果更新操作暂时不适合写入 kafka!

3. 写入到 Kafka

例子:将查询的结果数据写入到 kafka 中

package cn.itcast.day01.sink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

/**
 * @author lql
 * @time 2024-03-12 19:29:56
 * @description TODO
 */
public class KafkaSinkTest1 {
    public static void main(String[] args) throws Exception {
        // Todo 1) 配置 flink SQL 环境
        // 1. 配置流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 配置 flink settings 环境
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        // 3. 配置表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // Todo 2) kafka 数据源
        tableEnv.connect(new Kafka()
                .version("universal")
                .topic("order")
                .property("bootstrap.servers","node1:9092")
        ).withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.STRING())
                        .field("money", DataTypes.DOUBLE())
                        .field("category", DataTypes.STRING())
                        .field("pt", DataTypes.TIMESTAMP(3))
                        // 使用 protime,指定字段名定义处理时间字段
                        // 这个 proctime 属性只能通过附加逻辑字段,进行扩展物理 schema
                        .proctime()
                ).createTemporaryTable("kafkaInputTable");

        // Todo 3) table API方式提取数据
        // 3.1 通过表环境获取数据表
        Table ordertable = tableEnv.from("kafkaInputTable");
        tableEnv.toAppendStream(ordertable, Row.class).print("Table API>>>");

        // 3.2 编写逻辑提取数据
        Table tableResult = ordertable
                .select($("id"), $("timestamp"), $("money"), $("category"))
                .filter($("category"
上一篇:AcWing20:用两个栈实现队列


下一篇:3716. 命名法 北京师范大学考研机试题 模拟思想