Pattern matching comes up in many scenarios: real-time detection of abnormal user behavior, monitoring bank-card use in unusual locations, orders placed but never paid, and so on. In Flink SQL, complex event processing is done with the MATCH_RECOGNIZE clause. Let's first look at how this kind of recognition works in Flink SQL.
- In the Flink SQL client, create a test table Ticket with the following schema:
```
Ticket
 |-- symbol: String                          # stock symbol
 |-- price: Long                             # stock price
 |-- tax: Long                               # tax due on the stock
 |-- rowtime: TimeIndicatorTypeInfo(rowtime) # point in time when these values changed
```
```sql
Flink SQL> CREATE TABLE Ticket (
>   symbol string,
>   price int,
>   tax int,
>   rowtime TIMESTAMP(3),
>   WATERMARK FOR rowtime AS rowtime  -- pattern matching requires a watermark
> ) WITH (
>   'connector' = 'filesystem',
>   'format' = 'csv',
>   'path' = 'file:///mnt/ps/SAS/BigData/file/ticket.csv'
> );
[INFO] Execute statement succeed.
```
- For simplicity, we only consider incoming data for a single stock, ACME, whose rows are continuously appended. The data is as follows:
```
symbol  price  tax  rowtime
ACME    12     1    2021-09-01 09:00:00.000
ACME    17     2    2021-09-01 09:00:01.000
ACME    19     1    2021-09-01 09:00:02.000
ACME    21     3    2021-09-01 09:00:03.000
ACME    25     2    2021-09-01 09:00:04.000
ACME    18     1    2021-09-01 09:00:05.000
ACME    15     1    2021-09-01 09:00:06.000
ACME    14     2    2021-09-01 09:00:07.000
ACME    24     2    2021-09-01 09:00:08.000
ACME    25     2    2021-09-01 09:00:09.000
ACME    19     1    2021-09-01 09:00:10.000
```
- The task now is to find the periods during which the price of a single stock is continuously falling:
```sql
SELECT *
FROM Ticket
MATCH_RECOGNIZE (                      -- only works on append-only tables
  PARTITION BY symbol                  -- group by symbol; rows with the same key are processed on one node
  ORDER BY rowtime                     -- within each group, order by event time
  MEASURES                             -- define the output
    START_ROW.rowtime AS start_tstamp,
    LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
    LAST(PRICE_UP.rowtime) AS end_tstamp
  ONE ROW PER MATCH                    -- emit one row per successful match
  AFTER MATCH SKIP TO LAST PRICE_UP    -- start the next match at the last price-rise event of this match
  PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)  -- three events: start row, falling price, rising price (+ means one or more rows)
  DEFINE                               -- define what each event means
    PRICE_DOWN AS                      -- either the first falling row (no previous PRICE_DOWN yet and price below the start row)
                                       -- or a price below the previous falling row
      (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price)
        OR PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
    PRICE_UP AS                        -- a price above the previous falling row
      PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
) MR;
```

The result: the price starts falling at 2021-09-01 09:00:04.000 and recovers at 2021-09-01 09:00:08.000.

```
symbol  start_tstamp             bottom_tstamp            end_tstamp
ACME    2021-09-01 09:00:04.000  2021-09-01 09:00:07.000  2021-09-01 09:00:08.000
```
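To make the pattern semantics concrete, the same match can be sketched in plain Python. This is a minimal simulation of this one pattern, not of Flink's engine, and `find_v_patterns` is a name invented here for illustration:

```python
# Simulate PATTERN (START_ROW PRICE_DOWN+ PRICE_UP): one or more
# strictly falling prices after a start row, then a recovery.
rows = [
    ("2021-09-01 09:00:00.000", 12), ("2021-09-01 09:00:01.000", 17),
    ("2021-09-01 09:00:02.000", 19), ("2021-09-01 09:00:03.000", 21),
    ("2021-09-01 09:00:04.000", 25), ("2021-09-01 09:00:05.000", 18),
    ("2021-09-01 09:00:06.000", 15), ("2021-09-01 09:00:07.000", 14),
    ("2021-09-01 09:00:08.000", 24), ("2021-09-01 09:00:09.000", 25),
    ("2021-09-01 09:00:10.000", 19),
]

def find_v_patterns(rows):
    matches, i = [], 0
    while i < len(rows) - 1:
        j = i
        # PRICE_DOWN+: consume one or more strictly falling prices
        while j + 1 < len(rows) and rows[j + 1][1] < rows[j][1]:
            j += 1
        # PRICE_UP: the next row must rise above the last falling price
        if j > i and j + 1 < len(rows) and rows[j + 1][1] > rows[j][1]:
            # (start_tstamp, bottom_tstamp, end_tstamp)
            matches.append((rows[i][0], rows[j][0], rows[j + 1][0]))
            i = j + 1  # AFTER MATCH SKIP TO LAST PRICE_UP
        else:
            i += 1
    return matches

print(find_v_patterns(rows))
```

Note that the final decline at 09:00:09 → 09:00:10 produces no match, because no recovery row follows it, just as in the Flink result.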
- How can we implement this in offline SQL?
- First, generate the sample data source:
```sql
with tb1 as (
  select symbol, price, tax, rowtime
  from values
    ('ACME',12,1,'2021-09-01 09:00:00'),
    ('ACME',17,2,'2021-09-01 09:00:01'),
    ('ACME',19,1,'2021-09-01 09:00:02'),
    ('ACME',21,3,'2021-09-01 09:00:03'),
    ('ACME',25,2,'2021-09-01 09:00:04'),
    ('ACME',18,1,'2021-09-01 09:00:05'),
    ('ACME',15,1,'2021-09-01 09:00:06'),
    ('ACME',14,2,'2021-09-01 09:00:07'),
    ('ACME',24,2,'2021-09-01 09:00:08'),
    ('ACME',25,2,'2021-09-01 09:00:09'),
    ('ACME',19,1,'2021-09-01 09:00:10')
    t(symbol,price,tax,rowtime)
)
```
- Compute each row's price difference against the previous row and the next row to judge whether the price keeps falling:
```sql
tb2 as (
  select
    symbol,
    price,
    tax,
    rowtime,
    price - lag(price,1,price) over(partition by symbol order by rowtime)  lag_price_diff,
    lead(price,1,price) over(partition by symbol order by rowtime) - price lead_price_diff,
    lead(rowtime,1,rowtime) over(partition by symbol order by rowtime) as lead_rowtime
  from tb1
)
```

The result (lead_rowtime not shown):

```
symbol  price  tax  rowtime              lag_price_diff  lead_price_diff
ACME    12     1    2021-09-01 09:00:00   0               5
ACME    17     2    2021-09-01 09:00:01   5               2
ACME    19     1    2021-09-01 09:00:02   2               2
ACME    21     3    2021-09-01 09:00:03   2               4
ACME    25     2    2021-09-01 09:00:04   4              -7
ACME    18     1    2021-09-01 09:00:05  -7              -3
ACME    15     1    2021-09-01 09:00:06  -3              -1
ACME    14     2    2021-09-01 09:00:07  -1              10
ACME    24     2    2021-09-01 09:00:08  10               1
ACME    25     2    2021-09-01 09:00:09   1              -6
ACME    19     1    2021-09-01 09:00:10  -6               0
```
- A negative difference means the price fell. Keep only the rows involved in a decline and label each decline segment:
```sql
tb3 as (
  select
    symbol,
    price,
    tax,
    rowtime,
    lag_price_diff,
    lead_price_diff,
    lead_rowtime,
    sum(if(lag_price_diff>0,1,0)) over(partition by symbol order by rowtime) flag
  from tb2
  where lag_price_diff < 0 or lead_price_diff < 0
)
```

The result:

```
symbol  price  tax  rowtime              lag_price_diff  lead_price_diff  lead_rowtime         flag
ACME    25     2    2021-09-01 09:00:04   4              -7               2021-09-01 09:00:05  1
ACME    18     1    2021-09-01 09:00:05  -7              -3               2021-09-01 09:00:06  1
ACME    15     1    2021-09-01 09:00:06  -3              -1               2021-09-01 09:00:07  1
ACME    14     2    2021-09-01 09:00:07  -1              10               2021-09-01 09:00:08  1
ACME    25     2    2021-09-01 09:00:09   1              -6               2021-09-01 09:00:10  2
ACME    19     1    2021-09-01 09:00:10  -6               0               2021-09-01 09:00:10  2
```
- The segment with flag = 2 contains only one falling row, so it is not a continuous decline and must be filtered out. Count the falling rows within each segment:
```sql
tb4 as (
  select
    symbol,
    price,
    tax,
    rowtime,
    lead_rowtime,
    flag,
    sum(if(lag_price_diff<0,1,0)) over(partition by symbol,flag) ct
  from tb3
)
```

The result:

```
symbol  price  tax  rowtime              lead_rowtime         flag  ct
ACME    25     2    2021-09-01 09:00:04  2021-09-01 09:00:05  1     3
ACME    18     1    2021-09-01 09:00:05  2021-09-01 09:00:06  1     3
ACME    15     1    2021-09-01 09:00:06  2021-09-01 09:00:07  1     3
ACME    14     2    2021-09-01 09:00:07  2021-09-01 09:00:08  1     3
ACME    25     2    2021-09-01 09:00:09  2021-09-01 09:00:10  2     1
ACME    19     1    2021-09-01 09:00:10  2021-09-01 09:00:10  2     1
```
- Filter according to the actual business definition of "continuous" and aggregate the result:
```sql
select
  symbol,
  --flag,
  min(rowtime)      start_tstamp,
  max(rowtime)      bottom_tstamp,
  max(lead_rowtime) end_tstamp
from tb4
where ct > 1
group by symbol, flag;
```

The result:

```
symbol  start_tstamp         bottom_tstamp        end_tstamp
ACME    2021-09-01 09:00:04  2021-09-01 09:00:07  2021-09-01 09:00:08
```
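If you want to sanity-check the whole pipeline without a cluster, the same gaps-and-islands logic can be run in SQLite through Python's sqlite3 module. This is a sketch under two assumptions: SQLite 3.25+ (for window functions), and MaxCompute's `if()` rewritten as the portable CASE WHEN:

```python
import sqlite3

# Build the sample table, then run tb2 -> tb3 -> tb4 -> final query
# exactly as in the offline-SQL walkthrough above.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tb1(symbol TEXT, price INT, tax INT, rowtime TEXT);
INSERT INTO tb1 VALUES
 ('ACME',12,1,'2021-09-01 09:00:00'),('ACME',17,2,'2021-09-01 09:00:01'),
 ('ACME',19,1,'2021-09-01 09:00:02'),('ACME',21,3,'2021-09-01 09:00:03'),
 ('ACME',25,2,'2021-09-01 09:00:04'),('ACME',18,1,'2021-09-01 09:00:05'),
 ('ACME',15,1,'2021-09-01 09:00:06'),('ACME',14,2,'2021-09-01 09:00:07'),
 ('ACME',24,2,'2021-09-01 09:00:08'),('ACME',25,2,'2021-09-01 09:00:09'),
 ('ACME',19,1,'2021-09-01 09:00:10');
""")
result = conn.execute("""
WITH tb2 AS (
  SELECT *,
         price - lag(price,1,price)  OVER w AS lag_price_diff,
         lead(price,1,price) OVER w - price AS lead_price_diff,
         lead(rowtime,1,rowtime) OVER w     AS lead_rowtime
  FROM tb1 WINDOW w AS (PARTITION BY symbol ORDER BY rowtime)
), tb3 AS (
  SELECT *,
         SUM(CASE WHEN lag_price_diff > 0 THEN 1 ELSE 0 END)
           OVER (PARTITION BY symbol ORDER BY rowtime) AS flag
  FROM tb2
  WHERE lag_price_diff < 0 OR lead_price_diff < 0
), tb4 AS (
  SELECT *,
         SUM(CASE WHEN lag_price_diff < 0 THEN 1 ELSE 0 END)
           OVER (PARTITION BY symbol, flag) AS ct
  FROM tb3
)
SELECT symbol, MIN(rowtime), MAX(rowtime), MAX(lead_rowtime)
FROM tb4 WHERE ct > 1 GROUP BY symbol, flag
""").fetchall()
print(result)
```

The single returned row should agree with both the MATCH_RECOGNIZE output and the offline-SQL result above (at second precision).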
- Next, let's look at an e-commerce scenario. A user browses a product, may then place an order, and after placing an order may pay. We need to find the users who, on a given day, browsed, favorited, ordered, and paid for a given product.
- First, generate some simple sample data:
```sql
with tb1 as (
  -- user_behav values: 浏览 = browse, 收藏 = favorite, 下单 = order, 支付 = pay
  select user_id, shop_id, user_behav, op_time, substr(op_time,1,10) dt
  from values
    ('1001','A1','浏览','2021-09-01 17:03:01'),
    ('1001','A1','收藏','2021-09-01 17:04:12'),
    ('1001','A2','浏览','2021-09-01 17:02:02'),
    ('1001','A2','收藏','2021-09-01 17:03:42'),
    ('1001','A2','下单','2021-09-01 17:06:25'),
    ('1002','A1','浏览','2021-09-01 17:00:32'),
    ('1002','A1','收藏','2021-09-01 17:03:12'),
    ('1002','A1','浏览','2021-09-01 17:03:45'),
    ('1002','A1','下单','2021-09-01 17:05:41'),
    ('1002','A1','支付','2021-09-01 17:06:26'),
    ('1003','A1','浏览','2021-09-01 17:08:13'),
    ('1003','A1','浏览','2021-09-01 17:09:14')
    t(user_id,shop_id,user_behav,op_time)
)
```
- We only look at shop A1's data, and aggregate each user's behaviors with collect_list or wm_concat (a MaxCompute built-in function; in Hive, use concat_ws with collect_list):
```sql
tb2 as (
  select
    dt,
    user_id,
    -- collect_list(user_behav)
    wm_concat(",",user_behav) behavs
  from tb1
  where shop_id = 'A1'
  group by dt, user_id
)
```

The result:

```
dt          user_id  behavs
2021-09-01  1001     浏览,收藏
2021-09-01  1002     浏览,收藏,浏览,下单,支付
2021-09-01  1003     浏览,浏览
```
- Match with LIKE. Here the behavior right after 下单 (order) must be 支付 (pay); the single-character wildcard `_` enforces this by matching only the comma between them:
```sql
select
  dt,
  user_id,
  behavs
from tb2
where behavs like '%浏览%收藏%下单_支付%';
```

The result:

```
dt          user_id  behavs
2021-09-01  1002     浏览,收藏,浏览,下单,支付
```
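The concat-then-LIKE approach can also be mirrored in plain Python, with the LIKE pattern translated to a regular expression (`%` becomes `.*`, `_` becomes `.`). This is an illustrative sketch, not production code:

```python
import re
from collections import defaultdict

# Events: (user_id, shop_id, behavior, op_time).
# Behaviors: 浏览 = browse, 收藏 = favorite, 下单 = order, 支付 = pay.
events = [
    ('1001', 'A1', '浏览', '2021-09-01 17:03:01'),
    ('1001', 'A1', '收藏', '2021-09-01 17:04:12'),
    ('1001', 'A2', '浏览', '2021-09-01 17:02:02'),
    ('1001', 'A2', '收藏', '2021-09-01 17:03:42'),
    ('1001', 'A2', '下单', '2021-09-01 17:06:25'),
    ('1002', 'A1', '浏览', '2021-09-01 17:00:32'),
    ('1002', 'A1', '收藏', '2021-09-01 17:03:12'),
    ('1002', 'A1', '浏览', '2021-09-01 17:03:45'),
    ('1002', 'A1', '下单', '2021-09-01 17:05:41'),
    ('1002', 'A1', '支付', '2021-09-01 17:06:26'),
    ('1003', 'A1', '浏览', '2021-09-01 17:08:13'),
    ('1003', 'A1', '浏览', '2021-09-01 17:09:14'),
]

# Group by (day, user) for shop A1, like GROUP BY dt, user_id + wm_concat.
paths = defaultdict(list)
for user_id, shop_id, behav, op_time in events:
    if shop_id == 'A1':
        paths[(op_time[:10], user_id)].append((op_time, behav))

# LIKE '%浏览%收藏%下单_支付%' as a regex: the '.' in place of '_'
# matches only the comma, so 支付 must come right after 下单.
pattern = re.compile('.*浏览.*收藏.*下单.支付.*')

matched = []
for (dt, user_id), evs in sorted(paths.items()):
    behavs = ','.join(b for _, b in sorted(evs))  # concatenate in time order
    if pattern.fullmatch(behavs):
        matched.append((dt, user_id, behavs))
print(matched)
```

Only user 1002's path should match, in agreement with the SQL result above.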
Pattern matching with offline SQL mainly comes down to aggregating and concatenating the full behavior path per dimension, then matching with string patterns, or with regular expressions for complex cases. If you run into similar needs in real business analysis, the approach above can serve as a reference. Better approaches are welcome for discussion.
Bye for now!