FlinkSql指定时间语义

FlinkSql指定时间语义

FlinkSql在建表时指定时间语义,根据建表方式和时间语义的不同进行记录

1.从DataStream流建表+process time语义

因为是process time所以不需要指定watermark的延迟生成时间,故可以直接在创建table对象时最后一列增加一个字段即可

  • 举例
  public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.readTextFile("D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt");
        DataStream<SensorReading> mapDataStream = dataStreamSource.map(el -> {
            String[] split = el.split(",");
            return new SensorReading(split[0], Double.valueOf(split[2]), Long.valueOf(split[1]));
        });
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //pt就是我们要增加的process time字段 名字可以任意命名
        Table tableApi = tableEnv.fromDataStream(mapDataStream, "id,temperature as temp,timestamp,pt.proctime");
        tableApi.printSchema();
        tableEnv.toAppendStream(tableApi, Row.class).print("api");
        env.execute();
    }
  • 此时打印表的Schema可以看到表最后增加了一列
root
 |-- id: STRING
 |-- temp: DOUBLE
 |-- timestamp: BIGINT
 |-- pt: TIMESTAMP(3) *PROCTIME*

看看

FlinkSql指定时间语义

上一篇:MySQL查询


下一篇:Oracle问题集合