Flink -- Table API 和 SQL 之 watermark 与开窗时间属性(二)

1. Flink 设置 watermark

这里说一下事件时间的取值:我 Kafka 里的数据来自 ClickHouse 查询,时间字段被特意处理成了毫秒时间戳(BIGINT)。本来想直接用 TO_TIMESTAMP(date_time) 来设置 watermark,阿里云官网的 Blink 文档说是支持的,但开源 Flink 里 TO_TIMESTAMP 只接受字符串参数,实际并不支持直接传 BIGINT。
真的有点狗了。。。。

解决办法是改用如下写法:先用 FROM_UNIXTIME 把(除以 1000 之后的)秒级时间戳格式化成字符串,再用 TO_TIMESTAMP 转成 TIMESTAMP 作为事件时间列。

 /**
  * Flink SQL DDL for the Kafka-backed market snapshot source table.
  *
  * <p>The table is registered as {@code snapshot}, the name the aggregation job looks up.
  *
  * <p>{@code date_time} is an epoch timestamp in milliseconds. Flink's {@code TO_TIMESTAMP}
  * only accepts strings, so the value is first reduced to seconds and formatted with
  * {@code FROM_UNIXTIME}; the resulting {@code ts} column (second precision) is the event-time
  * attribute, with a watermark that tolerates 10 seconds of out-of-order data.
  *
  * <p>Price and amount columns declare an explicit precision/scale because a bare
  * {@code DECIMAL} in Flink SQL means {@code DECIMAL(10, 0)}, which would drop the fractional
  * part. NOTE(review): {@code (20, 4)} is an assumption about the feed; adjust if the upstream
  * data needs a different precision or scale.
  */
 public static final String SOURCE_KAFKA_SNAPSHOT = "CREATE TABLE `snapshot` (\n" +
            "`date_time` BIGINT ,\n" +
            "`hs_security_id` VARCHAR ,\n" +
            "`security_id` VARCHAR ,\n" +
            "`pre_close_px` DECIMAL(20, 4),\n" +
            "`open_px` DECIMAL(20, 4),\n" +
            "`high_px` DECIMAL(20, 4),\n" +
            "`low_px` DECIMAL(20, 4),\n" +
            "`last_px` DECIMAL(20, 4),\n" +
            "`num_trades` DECIMAL,\n" +
            "`volume` BIGINT,\n" +
            "`amount` DECIMAL(20, 4),\n" +
            "`phase_code` BIGINT,\n" +
            "bid_price VARCHAR,\n" +
            "bid_qty VARCHAR,\n" +
            "offer_price VARCHAR,\n" +
            "offer_qty VARCHAR,\n" +
            "ts AS TO_TIMESTAMP(FROM_UNIXTIME(date_time / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
            " WATERMARK FOR ts AS ts - INTERVAL '10' SECOND \n" +
            ")WITH (\n" +
            "  'connector' = 'kafka', \n" +
            "  'topic'='xxx',\n" +
            "  'properties.bootstrap.servers' = 'xxx.xxx.xx.xx:9092', \n" +
            "  'format' = 'json',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "'json.fail-on-missing-field' = 'false',\n" +
            " 'json.ignore-parse-errors' = 'true'" +
            ")";

2.设置开窗

/**
 * Streaming job that registers the Kafka snapshot source table, aggregates its rows per
 * ({@code hs_security_id}, {@code security_id}) in 15-second tumbling event-time windows on
 * {@code ts}, and prints the aggregated rows to stdout.
 */
public class OfflineDataAggregationTableApi implements Serializable {

    /**
     * Builds and runs the windowed aggregation.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for interface compatibility with the original entry point
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // The job is submitted by the Table API below, not by env.execute(), so the job name
        // is supplied through configuration instead.
        // NOTE(review): whether 'pipeline.name' is honoured for table jobs depends on the
        // Flink version in use -- confirm; it is harmless if ignored.
        tableEnv.getConfig().getConfiguration().setString("pipeline.name", "快照数据读取");

        // Register the Kafka source table (the DDL is expected to create a table named `snapshot`).
        tableEnv.executeSql(CustomTable.SOURCE_KAFKA_SNAPSHOT);

        // One output row per security per 15-second tumbling window.
        Table timeTable = tableEnv.sqlQuery("select \n" +
                "TUMBLE_START(ts, INTERVAL '15' SECOND), \n" +
                " hs_security_id,\n" +
                " security_id,\n" +
                " MAX(pre_close_px) as pre_close_px, \n" +
                " MAX(open_px) as open_px, \n" +
                " MAX(high_px) as high_px, \n" +
                " FIRST_VALUE(phase_code) as phase_code, \n" +
                " FIRST_VALUE(bid_price) as bid_price, \n" +
                " FIRST_VALUE(bid_qty) as bid_qty, \n" +
                " FIRST_VALUE(offer_price) as offer_price, \n" +
                " FIRST_VALUE(offer_qty) as offer_qty \n" +
                " from `snapshot`" +
                " group by TUMBLE(ts, INTERVAL '15' SECOND),hs_security_id,security_id");

        // Table.execute() submits the job itself and print() streams the results. A trailing
        // env.execute() must NOT be added: no DataStream operators are defined on env, so it
        // would fail with "No operators defined in streaming topology".
        timeTable.execute().print();
    }
}

上一篇:用于文本内容的复制粘贴


下一篇:excel导入数据到sqlserver