分离的 SQL 查询
为定义端到端的 SQL 管道,SQL 的 INSERT INTO
语句可以向 Flink 集群提交长时间运行的分离查询。查询产生的结果输出到除 SQL 客户端外的扩展系统中。这样可以应对更高的并发和更多的数据。CLI 自身在提交后不对分离查询做任何控制。
INSERT INTO MyTableSink SELECT * FROM MyTableSource
sink MyTableSink
必须在环境配置文件中声明。查看更多关于 Flink 支持的外部系统及其配置信息,参见 connection page。如下展示 Apache Kafka 的 sink 示例。
tables: - name: MyTableSink type: sink-table update-mode: append connector: property-version: 1 type: kafka version: "0.11" topic: OutputTopic properties: bootstrap.servers: localhost:9092 group.id: testGroup format: property-version: 1 type: json derive-schema: true schema: - name: rideId data-type: BIGINT - name: lon data-type: FLOAT - name: lat data-type: FLOAT - name: rideTime data-type: TIMESTAMP(3)
SQL 客户端要确保语句成功提交到集群上。一旦提交查询,CLI 将展示关于 Flink 作业的相关信息。
[INFO] Table update statement has been successfully submitted to the cluster: Cluster ID: StandaloneClusterId Job ID: 6f922fe5cba87406ff23ae4a7bb79044 Web interface: http://localhost:8081
注意 提交后,SQL 客户端不追踪正在运行的 Flink 作业状态。提交后可以关闭 CLI 进程,并且不会影响分离的查询。Flink 的重启策略负责容错。取消查询可以用 Flink 的 web 接口、命令行或 REST API 。
SQL 视图
视图是一张虚拟表,允许通过 SQL 查询来定义。视图的定义会被立即解析与验证。然而,提交常规 INSERT INTO
或 SELECT
语句后不会立即执行,在访问视图时才会真正执行。
视图可以用环境配置文件或者 CLI 会话来定义。
下例展示如何在一个文件里定义多张视图。视图注册的顺序和定义它们的环境配置文件一致。支持诸如 视图 A 依赖视图 B ,视图 B 依赖视图 C 的引用链。
tables: - name: MyTableSource # ... - name: MyRestrictedView type: view query: "SELECT MyField2 FROM MyTableSource" - name: MyComplexView type: view query: > SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR) FROM MyTableSource WHERE MyField2 > 200
相较于 table soruce 和 sink,会话环境配置文件中定义的视图具有最高优先级。
视图还可以在 CLI 会话中用 CREATE VIEW
语句来创建:
CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;
视图能在 CLI 会话中创建,也能用 DROP VIEW
语句删除:
DROP VIEW MyNewView;
注意 CLI 中视图的定义仅限于上述语法。将来版本会支持定义视图结构以及在表名中加入转义的空格。
临时表(Temporal Table)
临时表是在变化的历史记录表上的(参数化)视图,该视图在某个特定时间点返回表的内容。这对于在特定的时间戳将一张表的内容联结另一张表是非常有用的。更多信息见联结临时表页面。
下例展示如何定义一张临时表 SourceTemporalTable
:
tables: # 定义包含对临时表的更新的 table source (或视图) - name: HistorySource type: source-table update-mode: append connector: # ... format: # ... schema: - name: integerField data-type: INT - name: stringField data-type: STRING - name: rowtimeField data-type: TIMESTAMP(3) rowtime: timestamps: type: from-field from: rowtimeField watermarks: type: from-source # 在具有时间属性和主键的变化历史记录表上定义临时表 - name: SourceTemporalTable type: temporal-table history-table: HistorySource primary-key: integerField time-attribute: rowtimeField # could also be a proctime field
如例子中所示,table source,视图和临时表的定义可以相互混合。它们按照在环境配置文件中定义的顺序进行注册。例如,临时表可以引用一个视图,该视图依赖于另一个视图或 table source。
Flink实战(七十七):flink-sql使用(五)分离的 SQL 查询、SQL 视图、临时表(Temporal Table)、