FlinkSql指定时间语义
FlinkSql在建表时指定时间语义,根据建表方式和时间语义的不同进行记录
1.从DataStream流建表+process time语义
因为是process time所以不需要指定watermark的延迟生成时间,故可以直接在创建table对象时最后一列增加一个字段即可
- 举例
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.readTextFile("D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt");
DataStream<SensorReading> mapDataStream = dataStreamSource.map(el -> {
String[] split = el.split(",");
return new SensorReading(split[0], Double.valueOf(split[2]), Long.valueOf(split[1]));
});
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//pt就是我们要增加的process time字段 名字可以任意命名
Table tableApi = tableEnv.fromDataStream(mapDataStream, "id,temperature as temp,timestamp,pt.proctime");
tableApi.printSchema();
tableEnv.toAppendStream(tableApi, Row.class).print("api");
env.execute();
}
- 此时打印表的
Schema
可以看到表最后增加了一列
root
|-- id: STRING
|-- temp: DOUBLE
|-- timestamp: BIGINT
|-- pt: TIMESTAMP(3) *PROCTIME*
看看