1. Flink 设置 watermark
这里说一下这个事件时间(event time)的取值。本来我 Kafka 数据里的 date_time 字段是由 ClickHouse 的查询时间特意处理成的时间戳,然后使用 TO_TIMESTAMP(date_time) 来设置 watermark。阿里云官网的 Blink 是支持这种写法的,但开源 Flink 实际中却不支持。
真的有点狗了。。。。
解决办法是采用如下写法:
/**
 * DDL for the Kafka snapshot source table.
 *
 * <p>The event-time column {@code date_time} arrives as epoch milliseconds
 * (BIGINT). Open-source Flink rejects {@code TO_TIMESTAMP(<BIGINT>)} (that
 * form only works on Alibaba Cloud Blink), so a computed column {@code ts}
 * first converts the value to a 'yyyy-MM-dd HH:mm:ss' string via
 * FROM_UNIXTIME and then to TIMESTAMP. The watermark tolerates 10 seconds
 * of out-of-orderness.
 *
 * <p>BUG FIX: the table is named {@code snapshot} so it matches
 * {@code tableEnv.from("snapshot")} in the job; the original DDL created
 * {@code tableName}, which made the lookup fail.
 *
 * <p>NOTE(review): a bare {@code DECIMAL} defaults to DECIMAL(10, 0) in
 * Flink SQL, which truncates the fractional part of the price fields —
 * confirm whether an explicit precision/scale (e.g. DECIMAL(10, 4)) is
 * intended.
 */
public static final String SOURCE_KAFKA_SNAPSHOT = "CREATE TABLE snapshot (\n" +
        "`date_time` BIGINT ,\n" +
        "`hs_security_id` VARCHAR ,\n" +
        "`security_id` VARCHAR ,\n" +
        "`pre_close_px` DECIMAL,\n" +
        "`open_px` DECIMAL,\n" +
        "`high_px` DECIMAL ,\n" +
        "`low_px` DECIMAL,\n" +
        "`last_px` DECIMAL,\n" +
        "`num_trades` DECIMAL,\n" +
        "`volume` BIGINT,\n" +
        "`amount` DECIMAL,\n" +
        "`phase_code` BIGINT,\n" +
        "bid_price VARCHAR,\n" +
        "bid_qty VARCHAR,\n" +
        "offer_price VARCHAR,\n" +
        "offer_qty VARCHAR,\n" +
        // Epoch millis -> seconds -> formatted string -> TIMESTAMP.
        "ts AS TO_TIMESTAMP(FROM_UNIXTIME(date_time / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
        " WATERMARK FOR ts AS ts - INTERVAL '10' SECOND \n" +
        ")WITH (\n" +
        " 'connector' = 'kafka', \n" +
        " 'topic'='xxx',\n" +
        " 'properties.bootstrap.servers' = 'xxx.xxx.xx.xx:9092', \n" +
        " 'format' = 'json',\n" +
        " 'scan.startup.mode' = 'earliest-offset',\n" +
        // Tolerate schema drift / malformed records instead of failing the job.
        "'json.fail-on-missing-field' = 'false',\n" +
        " 'json.ignore-parse-errors' = 'true'" +
        ")";
2.设置开窗
/**
 * Reads market snapshot records from the Kafka source table and aggregates
 * them in 15-second tumbling event-time windows (keyed by security id),
 * printing the windowed result to stdout.
 */
public class OfflineDataAggregationTableApi implements Serializable {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Blink planner in streaming mode (Flink 1.11–1.13 style settings).
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Register the Kafka source table in the catalog.
        tableEnv.executeSql(CustomTable.SOURCE_KAFKA_SNAPSHOT);

        Table sourceTable = tableEnv.from("snapshot");

        // 15-second tumbling event-time window per (hs_security_id, security_id):
        // MAX for the price fields, FIRST_VALUE for phase/order-book fields.
        Table timeTable = tableEnv.sqlQuery("select \n" +
                "TUMBLE_START(ts, INTERVAL '15' SECOND), \n" +
                " hs_security_id,\n" +
                " security_id,\n" +
                " MAX(pre_close_px) as pre_close_px, \n" +
                " MAX(open_px) as open_px, \n" +
                " MAX(high_px) as high_px, \n" +
                " FIRST_VALUE(phase_code) as phase_code, \n" +
                " FIRST_VALUE(bid_price) as bid_price, \n" +
                " FIRST_VALUE(bid_qty) as bid_qty, \n" +
                " FIRST_VALUE(offer_price) as offer_price, \n" +
                " FIRST_VALUE(offer_qty) as offer_qty \n" +
                " from " +
                // Concatenating the Table registers it under an auto-generated
                // name in the catalog, so it can be referenced in SQL text.
                sourceTable
                + " group by TUMBLE(ts, INTERVAL '15' SECOND),hs_security_id,security_id");

        // executeSql(SELECT ...) submits the streaming job itself;
        // print() blocks and streams the result rows to stdout.
        TableResult tableResult = tableEnv.executeSql(" select * from " + timeTable);
        tableResult.print();
        // BUG FIX: the original additionally called env.execute("快照数据读取")
        // here, which throws IllegalStateException ("No operators defined in
        // streaming topology") because no DataStream operators were added —
        // the Table job was already started by executeSql above.
    }
}