FlinkSQL Practice Notes

1. Background

Flink has been developing rapidly in China. Since first encountering it in 2018 I have followed its updates on the official site and various WeChat public accounts, but never had a chance to use it in production. Recently a streaming-computation requirement came up, and the demand for real-time processing keeps growing, so today I run a test in a local environment: data is read from Kafka, processed by Flink, and written to MySQL.
Environment: Java 8, Scala 2.12
Version: Flink 1.13.0
Maven dependencies:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--kafka connector-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Data in Kafka is stored as JSON; flink-json is required to parse it -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--JDBC connector-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.31</version>
        </dependency>
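
The snippets above reference the flink.version and scala.binary.version Maven properties without defining them; a minimal properties block matching the environment listed above:

        <properties>
            <flink.version>1.13.0</flink.version>
            <scala.binary.version>2.12</scala.binary.version>
        </properties>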

2. Code

2.1 Kafka producer

        // Producer setup; assumes the standard Kafka client and fastjson's JSONObject
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);

        for (int i = 1; i < 10; i++) {
            JSONObject json = new JSONObject();
            json.put("id", i + "");
            json.put("name", "name" + i);
            ProducerRecord<Integer, String> record = new ProducerRecord<>(
                    "flinkdemo",  // topic must match the Flink source table below
                    i,
                    json.toJSONString()
            );
            // Send the message
            producer.send(record);
        }
        producer.close();

2.2 Flink processing

        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
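        // Note: in Flink 1.13 the Blink planner is already the default, so the
        // settings object above can also be omitted as a minimal alternative:
        //   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);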

        // Map the Kafka topic to a temporary input table
        tableEnv.executeSql(
                "CREATE TABLE source1(id STRING, name STRING) WITH  (" +
                        " 'connector' = 'kafka'," +
                        " 'topic' = 'flinkdemo'," +
                        " 'properties.bootstrap.servers' = 'localhost:9092'," +
                        " 'properties.group.id' = 'flinkGroup'," +
                        " 'scan.startup.mode' = 'earliest-offset'," +
                        " 'format' = 'json'," +
                        " 'json.fail-on-missing-field' = 'false'," +
                        " 'json.ignore-parse-errors' = 'true')"
        );
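
        // With the producer above, messages in the topic look like
        //   {"id":"1","name":"name1"}
        // and the 'json' format maps them onto the (id STRING, name STRING) schema.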

        // The test_info table must be created in MySQL in advance (see the DDL sketch below)
        // Map the MySQL table to a temporary output table
        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               id STRING," +
                "               name STRING " +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false'," +
                " 'table-name' = 'test_info'," +
                " 'username' = 'test'," +
                " 'password' = 'pwd'" +
                ")";

        tableEnv.executeSql(mysql_sql);

        // Insert data
        tableEnv.executeSql("INSERT INTO mysql_sink SELECT id, name FROM source1");

Querying the table in MySQL, you can watch the data being inserted in real time.
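
For reference, a minimal sketch of the test_info DDL on the MySQL side; the VARCHAR sizes are assumptions, and any string-compatible column types will do:

        CREATE TABLE test_info (
            id   VARCHAR(64),
            name VARCHAR(255)
        );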

3. Problems encountered and solutions

3.1 SQL parse exception

Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "<EOF>" at line 1, column 263.
Was expecting one of:
    "UESCAPE" ...
    <QUOTED_STRING> ...
    ")" ...
    "," ...

The cause is that in the following code the WITH ( ... ) clause is missing its closing parenthesis:

        // Map the Kafka topic to a temporary input table
        tableEnv.executeSql(
                "CREATE TABLE sensor_source(id STRING, name STRING) WITH  (" +
                        " 'connector' = 'kafka'," +
                        " 'topic' = 'flinkdemo'," +
                        " 'properties.bootstrap.servers' = 'localhost:9092'," +
                        " 'properties.group.id' = 'flinkGroup'," +
                        " 'scan.startup.mode' = 'earliest-offset'," +
                        " 'format' = 'json'"
        );

3.2 JSON format problem

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.

The flink-json dependency was missing:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

3.3 Print output problem

Exception in thread "main" org.apache.flink.table.api.TableException: Failed to execute sql
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:775)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:854)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
	at org.apache.FlinkSqlDemo.main(FlinkSqlDemo.java:71)
Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.

The flink-clients dependency was missing:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

3.4 Sinking to MySQL: the program exits, no data in the MySQL table, and no error is reported

Process finished with exit code 0

The MySQL JDBC driver jar was missing:

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.31</version>
        </dependency>

4. Extensions

# Output to the console
    String print_sql = "CREATE TABLE print_sink (" +
                "id STRING," +
                "name STRING" +
                ") WITH (" +
                " 'connector' = 'print'" +
                ")";
    tableEnv.executeSql(print_sql);
    tableEnv.executeSql("INSERT INTO print_sink SELECT * FROM source1");

# Sink to another Kafka topic
    String kafka_sink_sql =
        "CREATE TABLE kafka_sink (id STRING, name STRING) WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'test_info_2'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'format' = 'json'" +
                ")";
    tableEnv.executeSql(kafka_sink_sql);
    tableEnv.executeSql("INSERT INTO kafka_sink SELECT * FROM source1");
