Flink基础(二十五):FLINK SQL(一)查询语句(一)基本查询

来源:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html

0 简介

SELECT 语句和 VALUES 语句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定。这个方法会以 Table 的形式返回 SELECT (或 VALUE)的查询结果。Table 可以被用于 随后的SQL 与 Table API 查询 、 转换为 DataSet 或 DataStream 或 输出到 TableSink 。SQL 与 Table API 的查询可以进行无缝融合、整体优化并翻译为单一的程序。

为了可以在 SQL 查询中访问到表,你需要先 在 TableEnvironment 中注册表 。表可以通过 TableSource、 TableCREATE TABLE 语句、 DataStream 或 DataSet 注册。 用户也可以通过 向 TableEnvironment 中注册 catalog 的方式指定数据源的位置。

为方便起见 Table.toString() 将会在其 TableEnvironment 中自动使用一个唯一的名字注册表并返回表名。 因此, Table 对象可以如下文所示样例,直接内联到 SQL 语句中。

注意: 查询若包括了不支持的 SQL 特性,将会抛出 TableException。批处理和流处理所支持的 SQL 特性将会在下述章节中列出。

1 指定查询

以下示例显示如何在已注册和内联表上指定 SQL 查询。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

//  从外部数据源读取 DataStream 
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// 使用 SQL 查询内联的(未注册的)表
val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
val result = tableEnv.sqlQuery(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE ‘%Rubber%‘")

// 使用名称 "Orders" 注册一个 DataStream 
tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
// 在表上执行 SQL 查询并得到以新表返回的结果
val result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE ‘%Rubber%‘")

// 创建并注册一个 TableSink
val schema = new Schema()
    .field("product", DataTypes.STRING())
    .field("amount", DataTypes.INT())

tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(...)
    .withSchema(schema)
    .createTemporaryTable("RubberOrders")

// 在表上执行插入操作,并把结果发出到 TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE ‘%Rubber%‘")

2 执行查询

SELECT 语句或者 VALUES 语句可以通过 TableEnvironment.executeSql() 方法来执行,将选择的结果收集到本地。该方法返回 TableResult 对象用于包装查询的结果。和 SELECT 语句很像,一个 Table 对象可以通过 Table.execute() 方法执行从而将 Table 的内容收集到本地客户端。 TableResult.collect() 方法返回一个可以关闭的行迭代器。除非所有的数据都被收集到本地,否则一个查询作业永远不会结束。所以我们应该通过 CloseableIterator#close() 方法主动地关闭作业以防止资源泄露。 我们还可以通过 TableResult.print() 方法将查询结果打印到本地控制台。TableResult 中的结果数据只能被访问一次,因此一个 TableResult 实例中,collect() 方法和 print() 方法不能被同时使用。

对于流模式,TableResult.collect() 方法或者 TableResult.print 方法保证端到端精确一次的数据交付。这就要求开启 checkpointing。默认情况下 checkpointing 是禁止的,我们可以通过 TableConfig 设置 checkpointing 相关属性(请参考 checkpointing 配置)来开启 checkpointing。 因此一条结果数据只有在其对应的 checkpointing 完成后才能在客户端被访问。

注意: 对于流模式,当前仅支持追加模式的查询语句,并且应该开启 checkpoint。因为一条结果只有在其对应的 checkpoint 完成之后才能被客户端访问到。

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tableEnv = StreamTableEnvironment.create(env, settings)
// enable checkpointing
tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))

tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

// execute SELECT statement
val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
val it = tableResult1.collect()
try while (it.hasNext) {
  val row = it.next
  // handle row
}
finally it.close() // close the iterator to avoid resource leak

// execute Table
val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
tableResult2.print()

3 语法

Flink 通过支持标准 ANSI SQL的 Apache Calcite 解析 SQL。

以下 BNF-语法 描述了批处理和流处理查询中所支持的 SQL 特性的超集。其中 操作符 章节展示了所支持的特性的样例,并指明了哪些特性仅适用于批处理或流处理。

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING ( column [, column ]* )

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ ‘(‘ columnAlias [, columnAlias ]* ) ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName [ dynamicTableOptions ]
  | LATERAL TABLE ( functionName ( expression [, expression ]* ) )
  | UNNEST ( expression )

dynamicTableOptions:
  /*+ OPTIONS(key=val [, key=val]*) */

key:
  stringLiteral

val:
  stringLiteral

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | ( )
  | ( expression [, expression ]* )
  | CUBE ( expression [, expression ]* )
  | ROLLUP ( expression [, expression ]* )
  | GROUPING SETS ( groupItem [, groupItem ]* )

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    (
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    )

matchRecognize:
      MATCH_RECOGNIZE (
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN ( pattern )
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      )

measureColumn:
      expression AS alias

pattern:
      patternTerm [ ‘|‘ patternTerm ]*

patternTerm:
      patternFactor [ patternFactor ]*

patternFactor:
      variable [ patternQuantifier ]

patternQuantifier:
      *
  |   *?
  |   +
  |   +?
  |   ?
  |   ??
  |   { { [ minRepeat ], [ maxRepeat ] } } [‘?‘]
  |   { repeat }

Flink SQL 对于标识符(表、属性、函数名)有类似于 Java 的词法约定:

  • 不管是否引用标识符,都保留标识符的大小写。
  • 且标识符需区分大小写。
  • 与 Java 不一样的地方在于,通过反引号,可以允许标识符带有非字母的字符(如:"SELECT a AS `my field` FROM t")。

字符串文本常量需要被单引号包起来(如 SELECT ‘Hello World‘ )。两个单引号表示转移(如 SELECT ‘It‘‘s me.‘)。字符串文本常量支持 Unicode 字符,如需明确使用 Unicode 编码,请使用以下语法:

  • 使用反斜杠(\)作为转义字符(默认):SELECT U&‘\263A‘
  • 使用自定义的转义字符: SELECT U&‘#263A‘ UESCAPE ‘#‘

 

Flink基础(二十五):FLINK SQL(一)查询语句(一)基本查询

上一篇:数据库连接池(使用c3p0)


下一篇:mysql多实例