FlinkSQL实践记录2

1. 背景

昨天《FlinkSQL实践记录1》对FlinkSql做了简单的使用insert into .. select ..,今天对聚合运算做一些实践。

2. 代码实践

        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'kafka'," +
                " 'password' = 'Bonc@123'" +
                ")";

        tableEnv.executeSql(mysql_sql);

        // 插入数据
        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT name, count(*) as cnt " +
                        "FROM sensor_source " +
                        "where id > 3 " +
                        "group by name "
                       // "order by name "
        );
        System.out.println(tableResult.getJobClient().get().getJobStatus());

2.1 mysql表不加primary主键

# 注意需要使用bigint, int类型会报错
create table count_info (
name varchar(100),
cnt bigint ) ;

当上游数据不断产生时,会将实时产生的新结果插入mysql
FlinkSQL实践记录2

2.2 mysql表添加primary主键

create table count_info (
name varchar(100),
cnt bigint,
primary key(NAME)
) ;

当上游数据不断产生时,会将实时产生的新结果更新至mysql
FlinkSQL实践记录2
新生产一批数据后
FlinkSQL实践记录2

3. 遇到的问题及解决办法

3.1 sink table缺失主键

Exception in thread "main" java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

解决办法: mysql_sink增加PRIMARY KEY (name) NOT ENFORCED

        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'kafka'," +
                " 'password' = 'Bonc@123'" +
                ")";

3.2 不能排序

Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.

解决办法:去掉order by

        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT name, count(*) as cnt " +
                        "FROM sensor_source " +
                        "where id > 3 " +
                        "group by name "
                      //  "order by name "
        );

4. 不过瘾

接下来对join关联做些实践

上一篇: FreeSwitch 对接 RTSP 和 RTMP视频


下一篇:FreeSWITCH权威指南 .pdf