1. Background
Flink has been gaining a lot of traction in China. I first came across it in 2018 and have followed its updates on the official site and in various public channels ever since, but never had a chance to use it in production. A streaming-computation requirement recently came up, and demands on real-time processing keep growing, so today I test the pipeline locally: data is read from Kafka, processed by Flink, and written to MySQL.
Environment: Java 8, Scala 2.12
Version: Flink 1.13.0
Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- data in Kafka is stored as JSON; parsing it requires flink-json -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- JDBC connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>
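The snippets above reference two Maven properties; given the environment listed earlier, they would be defined in the POM like this:
<properties>
    <flink.version>1.13.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>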
2. Code
2.1 Kafka producer
for (int i = 1; i < 10; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i + "");
    json.put("name", "name" + i);
    // the topic must match the 'topic' option of the Flink source table below
    ProducerRecord<Integer, String> record = new ProducerRecord<>(
            "flinkdemo",
            i,
            json.toJSONString()
    );
    // send the message
    producer.send(record);
}
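The loop above assumes a KafkaProducer created beforehand. A minimal sketch of that setup, assuming a local broker and serializers matching the Integer key / JSON string value used above:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
Calling producer.close() after the loop flushes any buffered records before the process exits.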
2.2 Flink processing
// create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// map the Kafka topic to an input temporary table
tableEnv.executeSql(
"CREATE TABLE source1(id STRING, name STRING) WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinkdemo'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'properties.group.id' = 'flinkGroup'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json'," +
" 'json.fail-on-missing-field' = 'false'," +
" 'json.ignore-parse-errors' = 'true')"
);
// the test_info table must be created in MySQL beforehand
// map the MySQL table to an output temporary table
String mysql_sql = "CREATE TABLE mysql_sink (" +
" id string," +
" name string " +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false'," +
" 'table-name' = 'test_info'," +
" 'username' = 'test'," +
" 'password' = 'pwd'" +
")";
tableEnv.executeSql(mysql_sql);
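For reference, a minimal MySQL DDL for test_info that matches the schema above (the VARCHAR lengths are an assumption):
CREATE TABLE test_info (
    id   VARCHAR(64),
    name VARCHAR(64)
);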
// insert data
tableEnv.executeSql("INSERT INTO mysql_sink SELECT id, name FROM source1");
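Note that executeSql submits the INSERT job asynchronously and returns right away. To keep a standalone main method alive while the streaming job runs, you can block on the returned TableResult (a sketch; with this unbounded Kafka source the call never returns):
tableEnv.executeSql("INSERT INTO mysql_sink SELECT id, name FROM source1").await();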
Query the table in MySQL and you can watch rows being inserted in real time.
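For example, from the mysql client:
SELECT * FROM test.test_info;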
3. Problems encountered and solutions
3.1 SQL parse exception
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "<EOF>" at line 1, column 263.
Was expecting one of:
"UESCAPE" ...
<QUOTED_STRING> ...
")" ...
"," ...
The cause: in the code below, the WITH ( ... ) clause is missing its closing parenthesis.
// map the Kafka topic to an input temporary table
tableEnv.executeSql(
"CREATE TABLE sensor_source(id STRING, name STRING) WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinkdemo'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'properties.group.id' = 'flinkGroup'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json'"
);
3.2 JSON format problem
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
The flink-json dependency was missing:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
3.3 print output problem
Exception in thread "main" org.apache.flink.table.api.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:775)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:854)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
at org.apache.FlinkSqlDemo.main(FlinkSqlDemo.java:71)
Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
The flink-clients dependency was missing:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
3.4 Sink to MySQL: program exits, no data in the MySQL table, no error reported
Process finished with exit code 0
The MySQL JDBC driver jar was missing:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>
4. Extensions
# Print to the console
String print_sql = "CREATE TABLE print_sink (" +
"id STRING," +
"name STRING" +
") WITH (" +
" 'connector' = 'print'" +
")";
tableEnv.executeSql(print_sql);
tableEnv.executeSql("INSERT INTO print_sink SELECT * FROM source1");
# Sink to another Kafka topic
String kafka_sink_sql =
        "CREATE TABLE kafka_sink (id STRING, name STRING) WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'test_info_2'," +
        " 'properties.bootstrap.servers' = 'localhost:9092'," +
        " 'format' = 'json'" +
        ")";
tableEnv.executeSql(kafka_sink_sql);
tableEnv.executeSql("INSERT INTO kafka_sink SELECT * FROM source1");
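To verify this sink, watch the target topic with the console consumer that ships with Kafka:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_info_2 --from-beginning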