Flink Sink到File(文件)

知识点

表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以 支持不同的文件格式、存储数据库和消息队列。 

具体实现,输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入 注册过的 TableSink 中。同时表的输出跟更新模式有关

更新模式(Update Mode)
    对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行 转换。与外部系统交换的消息类型,由更新模式(update mode)指定。
    Flink Table API 中的更新模式有以下三种:
    
    1)追加模式(Append Mode) 
        在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。 
    2)撤回模式(Retract Mode) 
        在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。  
        插入(Insert)会被编码为添加消息;  
        删除(Delete)则编码为撤回消息;  
        更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行) 的添加消息。 
        在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。 
    3)Upsert(更新插入)模式 
        在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。 
        这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息外部连接器需要知道这个唯一 key 的属性。 
        插入(Insert)和更新(Update)都被编码为 Upsert 消息;  
        删除(Delete)编码为 Delete 信息。 
        这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率 会更高。

 

1、代码案例

package guigu.table.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

/**
 * @program: demo
 * @description: ${description}
 * @author: yang
 * @create: 2021-01-14 18:48
 */
object FileSink {
  def main(args: Array[String]): Unit = {

    //1、环境准备
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //2、读取数据,创建表视图
    val inputFile = "E:\\java\\demo\\src\\main\\resources\\file\\data5.csv"
    tableEnv.connect(new FileSystem().path(inputFile))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("temperature",DataTypes.DOUBLE())
        .field("timestamp",DataTypes.BIGINT())
      )
      .createTemporaryTable("inputTable")

    //3、table api转换
    val tableApi: Table = tableEnv.from("inputTable")
    val apiResult: Table = tableApi.select("id,temperature").where("id = 'sensor_1'")
    val sqlResult: Table = tableEnv.sqlQuery("select id,temperature from inputTable where id = 'sensor_1'")
    //字符串模板
    val sqlModelResult: Table = tableEnv.sqlQuery(
      """
        |select id,temperature
        |from inputTable
        |where id = 'sensor_1'
      """.stripMargin)

    //4、创建输出表视图
    val outputFile = "E:\\java\\demo\\src\\main\\resources\\file\\outputFile.csv"
    tableEnv.connect(new FileSystem().path(outputFile))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("temperature",DataTypes.DOUBLE())
        )
      .createTemporaryTable("outputTable")

    //5、执行
    sqlModelResult.insertInto("outputTable")

    tableEnv.execute("Flink Sink Flie Test")

  }
}

 

上一篇:Flink-Sink(四)


下一篇:spdlog 循环日志文件