1. 背景
昨天《FlinkSQL实践记录1》对FlinkSql做了简单的使用insert into .. select ..
,今天对聚合运算做一些实践。
2. 代码实践
String mysql_sql = "CREATE TABLE mysql_sink (" +
" name STRING," +
" cnt BIGINT," +
" PRIMARY KEY (name) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
" 'table-name' = 'count_info'," +
" 'username' = 'kafka'," +
" 'password' = 'Bonc@123'" +
")";
tableEnv.executeSql(mysql_sql);
// 插入数据
TableResult tableResult = tableEnv.executeSql(
"INSERT INTO mysql_sink " +
"SELECT name, count(*) as cnt " +
"FROM sensor_source " +
"where id > 3 " +
"group by name "
// "order by name "
);
System.out.println(tableResult.getJobClient().get().getJobStatus());
2.1 mysql表不加primary主键
# 注意需要使用bigint, int类型会报错
create table count_info (
name varchar(100),
cnt bigint ) ;
当上游数据不断产生时,会将实时产生的新结果插入mysql
2.2 mysql表添加primary主键
create table count_info (
name varchar(100),
cnt bigint,
primary key(NAME)
) ;
当上游数据不断产生时,会将实时产生的新结果更新至mysql
新生产一批数据后
3. 遇到的问题及解决办法
3.1 sink table缺失主键
Exception in thread "main" java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.
解决办法: mysql_sink增加PRIMARY KEY (name) NOT ENFORCED
String mysql_sql = "CREATE TABLE mysql_sink (" +
" name STRING," +
" cnt BIGINT," +
" PRIMARY KEY (name) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
" 'table-name' = 'count_info'," +
" 'username' = 'kafka'," +
" 'password' = 'Bonc@123'" +
")";
3.2 不能排序
Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.
解决办法:去掉order by
TableResult tableResult = tableEnv.executeSql(
"INSERT INTO mysql_sink " +
"SELECT name, count(*) as cnt " +
"FROM sensor_source " +
"where id > 3 " +
"group by name "
// "order by name "
);
4. 不过瘾
接下来对join关联做些实践