基本概念
从传统数据库系统的角度来看,Table对象与VIEW视图非常像
标识符遵循 SQL 标准,因此使用时需要用反引号(`)进行转义。
Flink SQL 是基于实现了SQL标准的 Apache Calcite 的。
数据库表是 INSERT、UPDATE 和 DELETE DML 语句的 stream 的结果,通常称为changelog流
物化视图是流式SQL查询的结果。为了更新视图,查询不断地更新视图的changelog流
连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。
SQL查询不支持部分数据类型(cast 表达式或字符常量值)
如:STRING, BYTES,TIME(p) WITHOUT TIME ZONE, TIME(p) WITH LOCAL TIME ZONE, TIMESTAMP(p) WITHOUT TIME ZONE, TIMESTAMP(p) WITH LOCAL TIME ZONE,
ARRAY, MULTISET, ROW,RAW
关键字在使用时使用反引号将该字段名包起来(如 `value`, `count` )
WATERMARK 定义了表的事件时间属性,其形式为:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 。
语法相关
普通select完整语法
CEP完整语法:
每个 MATCH_RECOGNIZE 查询都包含以下子句:
l PARTITION BY - 定义表的逻辑分区;类似于 GROUP BY 操作。
l ORDER BY - 指定传入行的排序方式;这是必须的,因为模式依赖于顺序。
l MEASURES - 定义输出包含哪些内容;类似于 SELECT 子句。
l ONE ROW PER MATCH - 输出方式,定义每个匹配项应产生多少行。
l AFTER MATCH SKIP - 指定下一个匹配的开始位置;这也是控制单个事件可以属于多少个不同匹配项的方法。
l PATTERN - 允许使用类似于 正则表达式 的语法构造搜索的模式。
l DEFINE - 定义模式变量必须满足的条件。如果没有为模式变量定义条件,将对每一行使用计算结果为 true
的默认条件
CEP例子:
insert into rds_out select `start_timestamp`, `end_timestamp`, card_id, `event` from datahub_stream MATCH_RECOGNIZE ( PARTITION BY card_id --按card_id分区,将相同卡号的数据分发到同一个计算节点。 ORDER BY `timestamp` --在窗口内,对事件时间进行排序。 MEASURES --定义如何根据匹配成功的输入事件构造输出事件。 e2.`action` as `event`, e1.`timestamp` as `start_timestamp`, --第一次的事件时间为start_timestamp。 LAST(e2.`timestamp`) as `end_timestamp` --最新的事件时间为end_timestamp。 ONE ROW PER MATCH --匹配成功输出一条。 AFTER MATCH SKIP TO NEXT ROW --匹配后跳转到下一行。 PATTERN (e1 e2+) WITHIN INTERVAL '10' MINUTE --定义两个事件,e1和e2。 DEFINE --定义在PATTERN中出现的patternVariable的具体含义。 e1 as e1.action = 'Consumption', --事件一的action标记为Consumption。 e2 as e2.action = 'Consumption' and e2.location <> e1.location --事件二的action标记为Consumption,且事件一和事件二的location不一致。 );
注意点
- 强烈建议对传入的数据进行分区,否则
MATCH_RECOGNIZE
子句将被转换为非并行算子,确保全局排序 -
MATCH_RECOGNIZE
子句假定升序的 时间属性 是ORDER BY
子句的第一个参数。 - Aggregation 可以应用于表达式,但前提是它们引用单个模式变量。因此,
SUM(A.price * A.tax)
是有效的,而AVG(A.price * B.tax)
则是无效的。 - 不支持
DISTINCT
aggregation。 - 模式的最后一个变量不能使用贪婪量词。因此,不允许使用类似
(A B*)
的模式。通过引入条件为B
的人工状态(例如C
),可以轻松解决此问题。因此,你可以使用类似以下的查询:
PATTERN (A B* C)
DEFINE
A AS condA(),
B AS condB(),
C AS NOT condB()
- 目前不支持可选的勉强量词(
A??
或者A{0,1}?
) - 通常鼓励使用 WITHIN 子句,因为它有助于 Flink 进行有效的内存管理。一旦达到阈值,即可更新基础状态。
Flink cep不支持的语法
取自官网