Fixing the Problem That Flink 1.11.0 SQL Jobs Cannot Specify a jobName

At the beginning of the month, Flink finally released version 1.11.0. I had been fairly excited about the CDC feature (although it turned out much weaker than expected).

Naturally, it was time to upgrade.

The latest code has been uploaded to GitHub: https://github.com/springMoon/sqlSubmit

I ran the sqlSubmit code with a simple Kafka-to-Kafka SQL job.

When executing this line:

env.execute(Common.jobName)

it threw this error:

Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
  at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
  at com.rookie.submit.main.SqlSubmitBak$.main(SqlSubmitBak.scala:68)
  at com.rookie.submit.main.SqlSubmitBak.main(SqlSubmitBak.scala)

An exception was thrown, yet the job still started, and under some bizarre job name. What on earth is that name? WTF?

[Screenshot: the Flink web UI shows the job running under an auto-generated name]

At a time like this, the obvious move is to debug the code and find out what is going on.

Debugging tabEnv.executeSql(sql), execution reaches the org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql() method.
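
The method is short; paraphrasing the 1.11.0 source (double-check against the actual code):

@Override
public TableResult executeSql(String statement) {
    List<Operation> operations = parser.parse(statement);

    if (operations.size() != 1) {
        throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
    }

    return executeOperation(operations.get(0));
}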


parser.parse(statement) parses the SQL into Operation objects. I don't really understand that part, so I skipped over it

and looked directly at the next statement, which executes:

executeOperation(operations.get(0))

Stepping into that method; the code is fairly long, so I won't paste it all here. Please check the source code yourself:

private TableResult executeOperation(Operation operation) 

This method dispatches on the operation's type and executes the corresponding branch. An INSERT statement is a ModifyOperation, so it takes the very first branch.
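
From the 1.11.0 source, that first branch looks roughly like this (an excerpt; the remaining branches are omitted):

private TableResult executeOperation(Operation operation) {
    if (operation instanceof ModifyOperation) {
        // INSERT statements land here
        return executeInternal(Collections.singletonList((ModifyOperation) operation));
    }
    // ... many more branches follow for DDL, USE, SHOW, EXPLAIN, etc.
}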


Stepping in, we find this method:

@Override
    public TableResult executeInternal(List<ModifyOperation> operations) {
        List<Transformation<?>> transformations = translate(operations);
        List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
        String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
        Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
        try {
            JobClient jobClient = execEnv.executeAsync(pipeline);
            TableSchema.Builder builder = TableSchema.builder();
            Object[] affectedRowCounts = new Long[operations.size()];
            for (int i = 0; i < operations.size(); ++i) {
                // use sink identifier name as field name
                builder.field(sinkIdentifierNames.get(i), DataTypes.BIGINT());
                affectedRowCounts[i] = -1L;
            }

            return TableResultImpl.builder()
                    .jobClient(jobClient)
                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                    .tableSchema(builder.build())
                    .data(Collections.singletonList(Row.of(affectedRowCounts)))
                    .build();
        } catch (Exception e) {
            throw new TableException("Failed to execute sql", e);
        }
    }

As you can see, by the time execution gets here, the jobName is hard-coded ("insert-into_" plus the sink identifier names) and the job is submitted on the spot. Infuriating. This also explains the earlier exception: executeSql already submits the INSERT as its own job, so the later env.execute(Common.jobName) finds no operators left in the StreamExecutionEnvironment.

String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);

After some pointers from a community veteran, I filed a ticket in JIRA: https://issues.apache.org/jira/browse/FLINK-18545

At the time, the community happened to be discussing a quick Flink 1.11.1 release to fix some fairly serious bugs. I had hoped this change could catch that train, but the discussion dragged on for quite a while and it missed the release.

During the discussion, a committer pointed out that executeSql can run many kinds of SQL, such as DDL and DML, and specifying a jobName for a DDL statement would be rather odd. So the suggestion was to use org.apache.flink.table.api.StatementSet instead, and to add an execute method that accepts a jobName to StatementSet:

/**
 * add insert statement to the set.
 */
StatementSet addInsertSql(String statement);

/**
 * execute all statements and Tables as a batch.
 *
 * <p>The added statements and Tables will be cleared when executing this method.
 */
TableResult execute();
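
In other words, the intended usage looks something like this (a minimal sketch; the table definitions are made up, and execute(String jobName) is the overload added by the patch below, not part of stock 1.11.0):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetDemo {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // DDL still goes through executeSql; a DDL statement has no job name anyway
        tableEnv.executeSql(
                "CREATE TABLE my_source (id BIGINT) WITH ('connector' = 'datagen')");
        tableEnv.executeSql(
                "CREATE TABLE my_sink (id BIGINT) WITH ('connector' = 'print')");

        // INSERT statements are buffered in a StatementSet and submitted as one job
        StatementSet set = tableEnv.createStatementSet();
        set.addInsertSql("INSERT INTO my_sink SELECT id FROM my_source");
        set.execute("my-job-name"); // the new overload that sets the job name
    }
}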

That approach worked for me, so I went ahead and made the change.

The change involves the following classes:

Interface: org.apache.flink.table.api.StatementSet
Implementation: org.apache.flink.table.api.internal.StatementSetImpl
Interface: org.apache.flink.table.api.internal.TableEnvironmentInternal
Implementation: org.apache.flink.table.api.internal.TableEnvironmentImpl

The change itself is simply copying the existing code and adding a jobName parameter to execute.

Interface: org.apache.flink.table.api.StatementSet
/**
 * execute all statements and Tables as a batch.
 *
 * <p>The added statements and Tables will be cleared when executing this method.
 */
TableResult execute();

/**
 * execute all statements and Tables as a batch, using the given jobName
 * for the submitted job.
 *
 * <p>The added statements and Tables will be cleared when executing this method.
 */
TableResult execute(String jobName);
Implementation: org.apache.flink.table.api.internal.StatementSetImpl
@Override
    public TableResult execute() {
        try {
            return tableEnvironment.executeInternal(operations);
        } finally {
            operations.clear();
        }
    }

    @Override
    public TableResult execute(String jobName) {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        try {
            return tableEnvironment.executeInternal(operations, jobName);
        } finally {
            operations.clear();
        }
    }
Interface: org.apache.flink.table.api.internal.TableEnvironmentInternal
/**
 * Execute the given modify operations and return the execution result.
 *
 * @param operations The operations to be executed.
 * @return the affected row counts (-1 means unknown).
 */
TableResult executeInternal(List<ModifyOperation> operations);

/**
 * Execute the given modify operations with the given job name and return the
 * execution result.
 *
 * @param operations The operations to be executed.
 * @param jobName The name for the submitted job.
 * @return the affected row counts (-1 means unknown).
 */
TableResult executeInternal(List<ModifyOperation> operations, String jobName);
Implementation: org.apache.flink.table.api.internal.TableEnvironmentImpl

In the implementation, the passed-in jobName, when present, replaces the default one (passing null preserves the old behavior):

@Override
    public TableResult executeInternal(List<ModifyOperation> operations) {
        return executeInternal(operations, null);
    }

    @Override
    public TableResult executeInternal(List<ModifyOperation> operations, String jobName) {
        List<Transformation<?>> transformations = translate(operations);
        List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
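        // when no jobName is passed in, fall back to the default "insert-into_..." name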
        if (jobName == null) {
            jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
        }
        Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
        try {
            JobClient jobClient = execEnv.executeAsync(pipeline);
            TableSchema.Builder builder = TableSchema.builder();
            Object[] affectedRowCounts = new Long[operations.size()];
            for (int i = 0; i < operations.size(); ++i) {
                // use sink identifier name as field name
                builder.field(sinkIdentifierNames.get(i), DataTypes.BIGINT());
                affectedRowCounts[i] = -1L;
            }

            return TableResultImpl.builder()
                    .jobClient(jobClient)
                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                    .tableSchema(builder.build())
                    .data(Collections.singletonList(Row.of(affectedRowCounts)))
                    .build();
        } catch (Exception e) {
            throw new TableException("Failed to execute sql", e);
        }
    }

That's all the code changes. Let's run a SQL job and see.

Since submission now goes through a StatementSet, the INSERT statements have to be separated out from the rest, so the SQL-submitting code was adjusted as well:

// execute sql
    val statement = tabEnv.createStatementSet()
    var result: StatementSet = null
    for (sql <- sqlList) {
      try {
        if (sql.startsWith("insert")) {
          // buffer INSERT statements in the StatementSet
          result = statement.addInsertSql(sql)
        } else {
          // DDL and other statements are executed immediately
          tabEnv.executeSql(sql)
        }
        logger.info("execute success : " + sql)
      } catch {
        case e: Exception =>
          logger.error("execute sql error : " + sql, e)
          e.printStackTrace()
          System.exit(-1)
      }
    }
    // submit all buffered INSERTs as one job with the configured name
    // (assumes the script contains at least one INSERT, otherwise result is null)
    result.execute(Common.jobName)

Run a job and take a look:

[Screenshot: the Flink web UI now shows the job under the specified name]

The jobName is now the one we specified.

 

That completed the local code changes, but the Flink jars had not been rebuilt yet. I had compiled the Flink 1.11.0 source before and it was reasonably quick, but after making this change the build would no longer finish: it got stuck at flink-runtime-web, hanging while running npm install:

<executions>
    <execution>
        <id>install node and npm</id>
        <goals>
            <goal>install-node-and-npm</goal>
        </goals>
        <configuration>
            <nodeVersion>v10.9.0</nodeVersion>
        </configuration>
    </execution>
    <execution>
        <id>npm install</id>
        <goals>
            <goal>npm</goal>
        </goals>
        <configuration>
            <arguments>ci --cache-max=0 --no-save</arguments>
            <environmentVariables>
                <HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL>
            </environmentVariables>
        </configuration>
    </execution>
    <execution>
        <id>npm run build</id>
        <goals>
            <goal>npm</goal>
        </goals>
        <configuration>
            <arguments>run build</arguments>
        </configuration>
    </execution>
</executions>

So I just pulled the jar down, opened it with an archive tool, and swapped in the corresponding class files directly.

Locally, the jar to modify is flink-table-api-java-1.11.0.jar in the Maven repository.
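
If you'd rather script the swap than click through an archive tool, here is a minimal Java sketch using the NIO zip file system (the paths below are made up for illustration):

import java.net.URI;
import java.nio.file.*;
import java.util.Collections;

public class ReplaceClassInJar {
    public static void main(String[] args) throws Exception {
        // mount the jar as a writable zip file system
        URI jar = URI.create("jar:file:///path/to/.m2/repository/org/apache/flink/"
                + "flink-table-api-java/1.11.0/flink-table-api-java-1.11.0.jar");
        try (FileSystem zipfs = FileSystems.newFileSystem(
                jar, Collections.singletonMap("create", "false"))) {
            // the recompiled class, e.g. from flink-table/flink-table-api-java/target/classes
            Path newClass = Paths.get(
                    "/path/to/classes/org/apache/flink/table/api/StatementSet.class");
            Path inJar = zipfs.getPath("/org/apache/flink/table/api/StatementSet.class");
            Files.copy(newClass, inJar, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}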


For the deployed Flink, the flink-table code is all bundled into flink-table_2.11-1.11.0.jar, so replacing the corresponding classes in that jar does the trick.

 

A community veteran wrote a blog post describing another, more elegant solution: https://www.jianshu.com/p/5981646cb1d4

 

Done.

Welcome to follow the Flink菜鸟 (Flink Rookie) WeChat official account, which publishes Flink development posts from time to time.

