知识点
表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以 支持不同的文件格式、存储数据库和消息队列。 具体实现,输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入 注册过的 TableSink 中。同时表的输出跟更新模式有关 更新模式(Update Mode) 对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行 转换。与外部系统交换的消息类型,由更新模式(update mode)指定。 Flink Table API 中的更新模式有以下三种: 1)追加模式(Append Mode) 在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。 2)撤回模式(Retract Mode) 在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。 插入(Insert)会被编码为添加消息; 删除(Delete)则编码为撤回消息; 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行) 的添加消息。 在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。 3)Upsert(更新插入)模式 在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。 这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息外部连接器需要知道这个唯一 key 的属性。 插入(Insert)和更新(Update)都被编码为 Upsert 消息; 删除(Delete)编码为 Delete 信息。 这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率 会更高。
1、代码案例
package guigu.table.sink import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} /** * @program: demo * @description: ${description} * @author: yang * @create: 2021-01-14 18:48 */ object FileSink { def main(args: Array[String]): Unit = { //1、环境准备 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2、读取数据,创建表视图 val inputFile = "E:\\java\\demo\\src\\main\\resources\\file\\data5.csv" tableEnv.connect(new FileSystem().path(inputFile)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("temperature",DataTypes.DOUBLE()) .field("timestamp",DataTypes.BIGINT()) ) .createTemporaryTable("inputTable") //3、table api转换 val tableApi: Table = tableEnv.from("inputTable") val apiResult: Table = tableApi.select("id,temperature").where("id = 'sensor_1'") val sqlResult: Table = tableEnv.sqlQuery("select id,temperature from inputTable where id = 'sensor_1'") //字符串模板 val sqlModelResult: Table = tableEnv.sqlQuery( """ |select id,temperature |from inputTable |where id = 'sensor_1' """.stripMargin) //4、创建输出表视图 val outputFile = "E:\\java\\demo\\src\\main\\resources\\file\\outputFile.csv" tableEnv.connect(new FileSystem().path(outputFile)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("temperature",DataTypes.DOUBLE()) ) .createTemporaryTable("outputTable") //5、执行 sqlModelResult.insertInto("outputTable") tableEnv.execute("Flink Sink Flie Test") } }