Flink实战(七十七):flink-sql使用(五)分离的 SQL 查询、SQL 视图、临时表(Temporal Table)、

分离的 SQL 查询

为定义端到端的 SQL 管道,SQL 的 INSERT INTO 语句可以向 Flink 集群提交长时间运行的分离查询。查询产生的结果输出到除 SQL 客户端外的扩展系统中。这样可以应对更高的并发和更多的数据。CLI 自身在提交后不对分离查询做任何控制。

INSERT INTO MyTableSink SELECT * FROM MyTableSource

sink MyTableSink 必须在环境配置文件中声明。查看更多关于 Flink 支持的外部系统及其配置信息,参见 connection page。如下展示 Apache Kafka 的 sink 示例。

tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)

SQL 客户端要确保语句成功提交到集群上。一旦提交查询,CLI 将展示关于 Flink 作业的相关信息。

[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: 6f922fe5cba87406ff23ae4a7bb79044
Web interface: http://localhost:8081

注意 提交后,SQL 客户端不追踪正在运行的 Flink 作业状态。提交后可以关闭 CLI 进程,并且不会影响分离的查询。Flink 的重启策略负责容错。取消查询可以用 Flink 的 web 接口、命令行或 REST API 。

SQL 视图

视图是一张虚拟表,允许通过 SQL 查询来定义。视图的定义会被立即解析与验证。然而,提交常规 INSERT INTO 或 SELECT 语句后不会立即执行,在访问视图时才会真正执行。

视图可以用环境配置文件或者 CLI 会话来定义。

下例展示如何在一个文件里定义多张视图。视图注册的顺序和定义它们的环境配置文件一致。支持诸如 视图 A 依赖视图 B ,视图 B 依赖视图 C 的引用链。

tables:
  - name: MyTableSource
    # ...
  - name: MyRestrictedView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"
  - name: MyComplexView
    type: view
    query: >
      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
      FROM MyTableSource
      WHERE MyField2 > 200

相较于 table soruce 和 sink,会话环境配置文件中定义的视图具有最高优先级。

视图还可以在 CLI 会话中用 CREATE VIEW 语句来创建:

CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;

视图能在 CLI 会话中创建,也能用 DROP VIEW 语句删除:

DROP VIEW MyNewView;

注意 CLI 中视图的定义仅限于上述语法。将来版本会支持定义视图结构以及在表名中加入转义的空格。

临时表(Temporal Table)

临时表是在变化的历史记录表上的(参数化)视图,该视图在某个特定时间点返回表的内容。这对于在特定的时间戳将一张表的内容联结另一张表是非常有用的。更多信息见联结临时表页面。

下例展示如何定义一张临时表 SourceTemporalTable

tables:

  # 定义包含对临时表的更新的 table source (或视图)

  - name: HistorySource
    type: source-table
    update-mode: append
    connector: # ...
    format: # ...
    schema:
      - name: integerField
        data-type: INT
      - name: stringField
        data-type: STRING
      - name: rowtimeField
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: from-field
            from: rowtimeField
          watermarks:
            type: from-source

  # 在具有时间属性和主键的变化历史记录表上定义临时表
  - name: SourceTemporalTable
    type: temporal-table
    history-table: HistorySource
    primary-key: integerField
    time-attribute: rowtimeField  # could also be a proctime field

如例子中所示,table source,视图和临时表的定义可以相互混合。它们按照在环境配置文件中定义的顺序进行注册。例如,临时表可以引用一个视图,该视图依赖于另一个视图或 table source。

 

Flink实战(七十七):flink-sql使用(五)分离的 SQL 查询、SQL 视图、临时表(Temporal Table)、

上一篇:PostgreSQL中index only scan并不总是仅扫描索引


下一篇:3.数据库权限管理和远程登录