来源:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html
0 简介
SELECT 语句和 VALUES 语句需要使用 TableEnvironment
的 sqlQuery()
方法加以指定。这个方法会以 Table
的形式返回 SELECT (或 VALUE)的查询结果。Table
可以被用于 随后的SQL 与 Table API 查询 、 转换为 DataSet 或 DataStream 或 输出到 TableSink 。SQL 与 Table API 的查询可以进行无缝融合、整体优化并翻译为单一的程序。
为了可以在 SQL 查询中访问到表,你需要先 在 TableEnvironment 中注册表 。表可以通过 TableSource、 Table、CREATE TABLE 语句、 DataStream 或 DataSet 注册。 用户也可以通过 向 TableEnvironment 中注册 catalog 的方式指定数据源的位置。
为方便起见 Table.toString()
将会在其 TableEnvironment
中自动使用一个唯一的名字注册表并返回表名。 因此, Table
对象可以如下文所示样例,直接内联到 SQL 语句中。
注意: 查询若包括了不支持的 SQL 特性,将会抛出 TableException
。批处理和流处理所支持的 SQL 特性将会在下述章节中列出。
1 指定查询
以下示例显示如何在已注册和内联表上指定 SQL 查询。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // 从外部数据源读取 DataStream val ds: DataStream[(Long, String, Integer)] = env.addSource(...) // 使用 SQL 查询内联的(未注册的)表 val table = ds.toTable(tableEnv, $"user", $"product", $"amount") val result = tableEnv.sqlQuery( s"SELECT SUM(amount) FROM $table WHERE product LIKE ‘%Rubber%‘") // 使用名称 "Orders" 注册一个 DataStream tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount") // 在表上执行 SQL 查询并得到以新表返回的结果 val result2 = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE ‘%Rubber%‘") // 创建并注册一个 TableSink val schema = new Schema() .field("product", DataTypes.STRING()) .field("amount", DataTypes.INT()) tableEnv.connect(new FileSystem().path("/path/to/file")) .withFormat(...) .withSchema(schema) .createTemporaryTable("RubberOrders") // 在表上执行插入操作,并把结果发出到 TableSink tableEnv.executeSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE ‘%Rubber%‘")
2 执行查询
SELECT 语句或者 VALUES 语句可以通过 TableEnvironment.executeSql()
方法来执行,将选择的结果收集到本地。该方法返回 TableResult
对象用于包装查询的结果。和 SELECT 语句很像,一个 Table
对象可以通过 Table.execute()
方法执行从而将 Table
的内容收集到本地客户端。 TableResult.collect()
方法返回一个可以关闭的行迭代器。除非所有的数据都被收集到本地,否则一个查询作业永远不会结束。所以我们应该通过 CloseableIterator#close()
方法主动地关闭作业以防止资源泄露。 我们还可以通过 TableResult.print()
方法将查询结果打印到本地控制台。TableResult
中的结果数据只能被访问一次,因此一个 TableResult
实例中,collect()
方法和 print()
方法不能被同时使用。
对于流模式,TableResult.collect()
方法或者 TableResult.print
方法保证端到端精确一次的数据交付。这就要求开启 checkpointing。默认情况下 checkpointing 是禁止的,我们可以通过 TableConfig
设置 checkpointing 相关属性(请参考 checkpointing 配置)来开启 checkpointing。 因此一条结果数据只有在其对应的 checkpointing 完成后才能在客户端被访问。
注意: 对于流模式,当前仅支持追加模式的查询语句,并且应该开启 checkpoint。因为一条结果只有在其对应的 checkpoint 完成之后才能被客户端访问到。
val env = StreamExecutionEnvironment.getExecutionEnvironment() val tableEnv = StreamTableEnvironment.create(env, settings) // enable checkpointing tableEnv.getConfig.getConfiguration.set( ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE) tableEnv.getConfig.getConfiguration.set( ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10)) tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)") // execute SELECT statement val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders") val it = tableResult1.collect() try while (it.hasNext) { val row = it.next // handle row } finally it.close() // close the iterator to avoid resource leak // execute Table val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute() tableResult2.print()
3 语法
Flink 通过支持标准 ANSI SQL的 Apache Calcite 解析 SQL。
以下 BNF-语法 描述了批处理和流处理查询中所支持的 SQL 特性的超集。其中 操作符 章节展示了所支持的特性的样例,并指明了哪些特性仅适用于批处理或流处理。
query: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY] orderItem: expression [ ASC | DESC ] select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ] joinCondition: ON booleanExpression | USING ‘(‘ column [, column ]* ‘)‘ tableReference: tablePrimary [ matchRecognize ] [ [ AS ] alias [ ‘(‘ columnAlias [, columnAlias ]* ‘)‘ ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName [ dynamicTableOptions ] | LATERAL TABLE ‘(‘ functionName ‘(‘ expression [, expression ]* ‘)‘ ‘)‘ | UNNEST ‘(‘ expression ‘)‘ dynamicTableOptions: /*+ OPTIONS(key=val [, key=val]*) */ key: stringLiteral val: stringLiteral values: VALUES expression [, expression ]* groupItem: expression | ‘(‘ ‘)‘ | ‘(‘ expression [, expression ]* ‘)‘ | CUBE ‘(‘ expression [, expression ]* ‘)‘ | ROLLUP ‘(‘ expression [, expression ]* ‘)‘ | GROUPING SETS ‘(‘ groupItem [, groupItem ]* ‘)‘ windowRef: windowName | windowSpec windowSpec: [ windowName ] ‘(‘ [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ‘)‘ matchRecognize: MATCH_RECOGNIZE ‘(‘ [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN ‘(‘ pattern ‘)‘ [ WITHIN intervalLiteral ] DEFINE variable AS condition [, variable AS condition ]* ‘)‘ measureColumn: expression AS alias pattern: patternTerm [ ‘|‘ patternTerm ]* patternTerm: patternFactor [ patternFactor ]* patternFactor: variable [ patternQuantifier ] patternQuantifier: ‘*‘ | ‘*?‘ | ‘+‘ | ‘+?‘ | ‘?‘ | ‘??‘ | ‘{‘ { [ minRepeat ], [ maxRepeat ] } ‘}‘ [‘?‘] | ‘{‘ repeat ‘}‘
Flink SQL 对于标识符(表、属性、函数名)有类似于 Java 的词法约定:
- 不管是否引用标识符,都保留标识符的大小写。
- 且标识符需区分大小写。
- 与 Java 不一样的地方在于,通过反引号,可以允许标识符带有非字母的字符(如:
"SELECT a AS `my field` FROM t"
)。
字符串文本常量需要被单引号包起来(如 SELECT ‘Hello World‘
)。两个单引号表示转移(如 SELECT ‘It‘‘s me.‘
)。字符串文本常量支持 Unicode 字符,如需明确使用 Unicode 编码,请使用以下语法:
- 使用反斜杠(
\
)作为转义字符(默认):SELECT U&‘\263A‘
- 使用自定义的转义字符:
SELECT U&‘#263A‘ UESCAPE ‘#‘