Applying Pattern Matching in SQL

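Pattern matching is used in many scenarios, such as real-time detection of abnormal user behavior, monitoring bank cards used in unusual locations, and orders that are placed but never paid. Flink SQL performs complex event processing with the MATCH_RECOGNIZE clause. Let's first look at how Flink SQL recognizes a pattern.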

  1. In the Flink SQL client, create a test table Ticket with the following schema:
Ticket
     |-- symbol: String                           # stock symbol
     |-- price: Integer                           # stock price
     |-- tax: Integer                             # tax payable on the stock
     |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # point in time when these values changed
Flink SQL> CREATE TABLE Ticket (
>             symbol string,
>             price int,
>             tax int,
>             rowtime TIMESTAMP(3),
>             WATERMARK FOR rowtime AS rowtime -- pattern matching on event time requires a watermark
>          ) WITH (
>           'connector' = 'filesystem',
>           'format' = 'csv',
>           'path' = 'file:///mnt/ps/SAS/BigData/file/ticket.csv'
> );
[INFO] Execute statement succeed.

Flink SQL> 
  2. For simplicity, we only consider incoming data for a single stock, ACME. Rows are appended continuously. Querying the table returns:
symbol                          price                            tax                        rowtime
ACME                             12                              1        2021-09-01 09:00:00.000
ACME                             17                              2        2021-09-01 09:00:01.000
ACME                             19                              1        2021-09-01 09:00:02.000
ACME                             21                              3        2021-09-01 09:00:03.000
ACME                             25                              2        2021-09-01 09:00:04.000
ACME                             18                              1        2021-09-01 09:00:05.000
ACME                             15                              1        2021-09-01 09:00:06.000
ACME                             14                              2        2021-09-01 09:00:07.000
ACME                             24                              2        2021-09-01 09:00:08.000
ACME                             25                              2        2021-09-01 09:00:09.000
ACME                             19                              1        2021-09-01 09:00:10.000
  3. The task now is to find the periods during which the price of a single stock keeps falling:
SELECT 
    *
FROM Ticket
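    MATCH_RECOGNIZE (   -- only applicable to append-only tables
        PARTITION BY symbol -- partition by symbol; rows with the same symbol are processed on the same node
        ORDER BY rowtime    -- within each partition, order rows by event time
        MEASURES    -- define the output columns
            START_ROW.rowtime AS start_tstamp,
            LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
            LAST(PRICE_UP.rowtime) AS end_tstamp
        ONE ROW PER MATCH   -- emit one row for each successful match
        AFTER MATCH SKIP TO LAST PRICE_UP   -- start the next match attempt at the last PRICE_UP row of the current match
        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)    -- three pattern variables: start row, price falling, price rising again (+ means one or more rows)
        DEFINE  -- conditions for each pattern variable (START_ROW has none, so any row can start a match)
            PRICE_DOWN AS   -- either this is the first PRICE_DOWN row (no previous PRICE_DOWN) and its price is below START_ROW's price, or its price is below the previous PRICE_DOWN price
                (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                    PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
            PRICE_UP AS -- the rebound price is higher than the previous falling price
                PRICE_UP.price > LAST(PRICE_DOWN.price, 1)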
    ) MR;
-- Result: the price starts falling after 2021-09-01 09:00:04.000 and rises again at 2021-09-01 09:00:08.000
symbol      start_tstamp                  bottom_tstamp                     end_tstamp
ACME        2021-09-01 09:00:04.000        2021-09-01 09:00:07.000        2021-09-01 09:00:08.000
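To make the matching semantics concrete, here is a small Python sketch of what START_ROW PRICE_DOWN+ PRICE_UP with AFTER MATCH SKIP TO LAST PRICE_UP finds in a list of prices. It is only a model of this particular pattern and not Flink's actual implementation. The function name find_v_shapes is hypothetical, and PRICE_UP is simplified to "higher than the last falling price".

```python
from typing import List, Tuple


def find_v_shapes(prices: List[int]) -> List[Tuple[int, int, int]]:
    """Return (start, bottom, end) row indexes for START_ROW PRICE_DOWN+ PRICE_UP.

    Simplified model: PRICE_DOWN rows must be strictly falling (the first one
    below START_ROW), PRICE_UP must be higher than the last PRICE_DOWN row,
    and after a match the search resumes at the PRICE_UP row
    (AFTER MATCH SKIP TO LAST PRICE_UP).
    """
    matches = []
    n = len(prices)
    i = 0
    while i < n:
        j = i + 1
        # PRICE_DOWN+: consume strictly falling prices after the start row
        while j < n and prices[j] < prices[j - 1]:
            j += 1
        has_down = j > i + 1
        if has_down and j < n and prices[j] > prices[j - 1]:
            matches.append((i, j - 1, j))  # j - 1 is the bottom, j the PRICE_UP row
            i = j  # next match attempt starts at the PRICE_UP row
        else:
            i += 1  # no match from this start row; try the next one
    return matches


prices = [12, 17, 19, 21, 25, 18, 15, 14, 24, 25, 19]
times = [f"09:00:{s:02d}" for s in range(len(prices))]
result = [(times[s], times[b], times[e]) for s, b, e in find_v_shapes(prices)]
print(result)  # [('09:00:04', '09:00:07', '09:00:08')]
```

The last rows (25 at 09:00:09 falling to 19 at 09:00:10) produce no match because no rising row follows them.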
  • How can this be implemented in offline (batch) SQL?
  1. First, generate the sample data source:
with tb1 as (
    select 
        symbol, 
        price,
        tax, 
        rowtime
    from values('ACME',12,1,'2021-09-01 09:00:00'),
               ('ACME',17,2,'2021-09-01 09:00:01'),
               ('ACME',19,1,'2021-09-01 09:00:02'),
               ('ACME',21,3,'2021-09-01 09:00:03'),
               ('ACME',25,2,'2021-09-01 09:00:04'),
               ('ACME',18,1,'2021-09-01 09:00:05'),
               ('ACME',15,1,'2021-09-01 09:00:06'),
               ('ACME',14,2,'2021-09-01 09:00:07'),
               ('ACME',24,2,'2021-09-01 09:00:08'),
               ('ACME',25,2,'2021-09-01 09:00:09'),
               ('ACME',19,1,'2021-09-01 09:00:10')
               t(symbol,price,tax,rowtime)
),
  2. Compute each row's price difference from the previous row and to the next row to tell whether the price keeps falling:
tb2 as (
    select 
        symbol, 
        price,
        tax, 
        rowtime,
        price-lag(price,1,price) over(partition by symbol order by rowtime) lag_price_diff,
        lead(price,1,price) over(partition by symbol order by rowtime)-price lead_price_diff,
        lead(rowtime,1,rowtime) over(partition by symbol order by rowtime) as lead_rowtime
    from tb1
),
-- Result:
symbol  price   tax rowtime lag_price_diff  lead_price_diff
ACME        12      1       2021-09-01 09:00:00 0       5
ACME        17      2       2021-09-01 09:00:01 5       2
ACME        19      1       2021-09-01 09:00:02 2       2
ACME        21      3       2021-09-01 09:00:03 2       4
ACME        25      2       2021-09-01 09:00:04 4       -7
ACME        18      1       2021-09-01 09:00:05 -7  -3
ACME        15      1       2021-09-01 09:00:06 -3  -1
ACME        14      2       2021-09-01 09:00:07 -1  10
ACME        24      2       2021-09-01 09:00:08 10  1
ACME        25      2       2021-09-01 09:00:09 1       -6
ACME        19      1       2021-09-01 09:00:10 -6  0
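As a quick cross-check of tb2, the same differences can be computed in a few lines of Python. This sketch assumes the rows of one symbol are already sorted by rowtime, and the function name price_diffs is hypothetical. As with lag(price,1,price) and lead(price,1,price), a missing neighbour defaults to the row's own price, so the difference at either end is 0.

```python
from typing import List, Tuple


def price_diffs(prices: List[int]) -> List[Tuple[int, int]]:
    """Return (lag_price_diff, lead_price_diff) for each row.

    Mirrors price - lag(price, 1, price) and lead(price, 1, price) - price:
    when there is no previous/next row, the row's own price is used,
    so the difference is 0.
    """
    result = []
    for i, p in enumerate(prices):
        prev_p = prices[i - 1] if i > 0 else p
        next_p = prices[i + 1] if i + 1 < len(prices) else p
        result.append((p - prev_p, next_p - p))
    return result


prices = [12, 17, 19, 21, 25, 18, 15, 14, 24, 25, 19]
for p, (lag_diff, lead_diff) in zip(prices, price_diffs(prices)):
    print(p, lag_diff, lead_diff)
```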
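  3. A negative difference means the price fell. Keep only the rows next to a drop, and start a new label (flag) whenever a kept row follows a price increase: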
tb3 as (
    select 
        symbol, 
        price,
        tax, 
        rowtime,
        lag_price_diff,
        lead_price_diff,
        lead_rowtime,
        sum(if(lag_price_diff>0,1,0)) over(partition by symbol order by rowtime) flag
    from tb2
    where lag_price_diff<0 or lead_price_diff < 0
),
-- Result:
symbol  price   tax rowtime lag_price_diff  lead_price_diff lead_rowtime    flag
ACME    25  2   2021-09-01 09:00:04 4     -7    2021-09-01 09:00:05 1
ACME    18  1   2021-09-01 09:00:05 -7  -3  2021-09-01 09:00:06 1
ACME    15  1   2021-09-01 09:00:06 -3  -1  2021-09-01 09:00:07 1
ACME    14  2   2021-09-01 09:00:07 -1  10  2021-09-01 09:00:08 1
ACME    25  2   2021-09-01 09:00:09 1     -6    2021-09-01 09:00:10 2
ACME    19  1   2021-09-01 09:00:10 -6  0     2021-09-01 09:00:10   2
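The labelling step of tb3 can be sketched in Python as well, assuming the tb2 rows of a single symbol arrive sorted by rowtime (the names TB2_ROWS and label_segments are hypothetical). One detail worth noting: the running sum only counts rows that survive the WHERE filter, because window functions are evaluated after WHERE. That is why the row at 09:00:08 (lag_price_diff 10) does not bump the flag.

```python
from typing import List, Tuple

# (rowtime, price, lag_price_diff, lead_price_diff) as produced by tb2, sorted by rowtime
TB2_ROWS: List[Tuple[str, int, int, int]] = [
    ("09:00:00", 12, 0, 5),
    ("09:00:01", 17, 5, 2),
    ("09:00:02", 19, 2, 2),
    ("09:00:03", 21, 2, 4),
    ("09:00:04", 25, 4, -7),
    ("09:00:05", 18, -7, -3),
    ("09:00:06", 15, -3, -1),
    ("09:00:07", 14, -1, 10),
    ("09:00:08", 24, 10, 1),
    ("09:00:09", 25, 1, -6),
    ("09:00:10", 19, -6, 0),
]


def label_segments(rows: List[Tuple[str, int, int, int]]) -> List[Tuple[str, int, int]]:
    """Return (rowtime, price, flag) for rows adjacent to a price drop."""
    labelled = []
    flag = 0
    for rowtime, price, lag_diff, lead_diff in rows:
        # WHERE lag_price_diff < 0 OR lead_price_diff < 0
        if lag_diff < 0 or lead_diff < 0:
            # running sum(if(lag_price_diff > 0, 1, 0)) over the kept rows only
            if lag_diff > 0:
                flag += 1
            labelled.append((rowtime, price, flag))
    return labelled


print(label_segments(TB2_ROWS))
```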
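  4. The group with flag 2 contains only one price drop, so it is not a continuous decline and must be filtered out. First count the drops in each flag group: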
tb4 as (
    select 
        symbol, 
        price,
        tax, 
        rowtime,
        lead_rowtime,
        flag,
        sum(if(lag_price_diff<0,1,0)) over(partition by symbol,flag) ct
    from tb3
)
-- Result:
symbol  price   tax rowtime lead_rowtime    flag    ct
ACME    25  2   2021-09-01 09:00:04 2021-09-01 09:00:05 1   3
ACME    18  1   2021-09-01 09:00:05 2021-09-01 09:00:06 1   3
ACME    15  1   2021-09-01 09:00:06 2021-09-01 09:00:07 1   3
ACME    14  2   2021-09-01 09:00:07 2021-09-01 09:00:08 1   3
ACME    25  2   2021-09-01 09:00:09 2021-09-01 09:00:10 2   1
ACME    19  1   2021-09-01 09:00:10 2021-09-01 09:00:10 2   1
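In Python terms, tb4 attaches the same group-wide count to every row of a flag group. Because the window in tb4 has no ORDER BY, the sum covers the whole partition instead of being a running total. The sketch below assumes a single symbol and uses hypothetical names; the final list comprehension corresponds to the WHERE ct > 1 filter in the next step.

```python
from collections import Counter
from typing import List, Tuple

# (rowtime, lag_price_diff, flag) for the rows kept in tb3
TB3_ROWS: List[Tuple[str, int, int]] = [
    ("09:00:04", 4, 1),
    ("09:00:05", -7, 1),
    ("09:00:06", -3, 1),
    ("09:00:07", -1, 1),
    ("09:00:09", 1, 2),
    ("09:00:10", -6, 2),
]


def count_drops(rows: List[Tuple[str, int, int]]) -> List[Tuple[str, int, int]]:
    """Return (rowtime, flag, ct), where ct counts negative lag diffs in the flag group.

    Mirrors sum(if(lag_price_diff < 0, 1, 0)) over (partition by symbol, flag)
    for a single symbol.
    """
    drops = Counter(flag for _, lag_diff, flag in rows if lag_diff < 0)
    return [(rowtime, flag, drops[flag]) for rowtime, _, flag in rows]


with_ct = count_drops(TB3_ROWS)
kept = [row for row in with_ct if row[2] > 1]  # WHERE ct > 1
print(kept)
```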
  5. Filter by the definition of "continuous" used here (more than one drop in a group, ct > 1) and aggregate the result:
select 
    symbol,
    --flag,
    min(rowtime) start_tstamp,
    max(rowtime) bottom_tstamp,
    max(lead_rowtime) end_tstamp
from tb4
where ct > 1
group by symbol,flag;
-- Result:
symbol  start_tstamp                bottom_tstamp               end_tstamp
ACME        2021-09-01 09:00:04 2021-09-01 09:00:07 2021-09-01 09:00:08
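To run the whole offline pipeline end to end without a MaxCompute project, here is a sketch that ports it to SQLite through Python's built-in sqlite3 module. It assumes SQLite 3.25 or newer, the first release with window functions. The MaxCompute-specific parts are swapped for portable equivalents: if() becomes CASE WHEN, lag(price,1,price) becomes coalesce(lag(price) OVER ..., price), and the inline VALUES table becomes a CTE with a column list. The unused tax column is dropped after tb1.

```python
import sqlite3

QUERY = """
WITH tb1(symbol, price, tax, rowtime) AS (
    VALUES ('ACME', 12, 1, '2021-09-01 09:00:00'),
           ('ACME', 17, 2, '2021-09-01 09:00:01'),
           ('ACME', 19, 1, '2021-09-01 09:00:02'),
           ('ACME', 21, 3, '2021-09-01 09:00:03'),
           ('ACME', 25, 2, '2021-09-01 09:00:04'),
           ('ACME', 18, 1, '2021-09-01 09:00:05'),
           ('ACME', 15, 1, '2021-09-01 09:00:06'),
           ('ACME', 14, 2, '2021-09-01 09:00:07'),
           ('ACME', 24, 2, '2021-09-01 09:00:08'),
           ('ACME', 25, 2, '2021-09-01 09:00:09'),
           ('ACME', 19, 1, '2021-09-01 09:00:10')
),
tb2 AS (  -- price differences to the previous and next row
    SELECT symbol, price, rowtime,
           price - coalesce(lag(price) OVER (PARTITION BY symbol ORDER BY rowtime), price) AS lag_price_diff,
           coalesce(lead(price) OVER (PARTITION BY symbol ORDER BY rowtime), price) - price AS lead_price_diff,
           coalesce(lead(rowtime) OVER (PARTITION BY symbol ORDER BY rowtime), rowtime) AS lead_rowtime
    FROM tb1
),
tb3 AS (  -- keep rows next to a drop and label each falling segment
    SELECT symbol, rowtime, lag_price_diff, lead_rowtime,
           sum(CASE WHEN lag_price_diff > 0 THEN 1 ELSE 0 END)
               OVER (PARTITION BY symbol ORDER BY rowtime) AS flag
    FROM tb2
    WHERE lag_price_diff < 0 OR lead_price_diff < 0
),
tb4 AS (  -- count drops per segment
    SELECT symbol, rowtime, lead_rowtime, flag,
           sum(CASE WHEN lag_price_diff < 0 THEN 1 ELSE 0 END)
               OVER (PARTITION BY symbol, flag) AS ct
    FROM tb3
)
SELECT symbol,
       min(rowtime) AS start_tstamp,
       max(rowtime) AS bottom_tstamp,
       max(lead_rowtime) AS end_tstamp
FROM tb4
WHERE ct > 1
GROUP BY symbol, flag
"""


def run_query() -> list:
    conn = sqlite3.connect(":memory:")
    try:
        return conn.execute(QUERY).fetchall()
    finally:
        conn.close()


print(run_query())
```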
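  • Next, let's look at an e-commerce scenario. After browsing a product, a user may place an order, and after ordering may pay. We want to find the users who, on a given day, viewed (浏览), favorited (收藏), ordered (下单) and paid (支付) at a given shop.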
  1. First, generate some simple sample data:
with tb1 as (
    select 
        user_id,
        shop_id,
        user_behav,
        op_time,
        substr(op_time,1,10) dt
    from values('1001','A1','浏览','2021-09-01 17:03:01'),
               ('1001','A1','收藏','2021-09-01 17:04:12'),
               ('1001','A2','浏览','2021-09-01 17:02:02'),
               ('1001','A2','收藏','2021-09-01 17:03:42'),
               ('1001','A2','下单','2021-09-01 17:06:25'),
               ('1002','A1','浏览','2021-09-01 17:00:32'),
               ('1002','A1','收藏','2021-09-01 17:03:12'),
               ('1002','A1','浏览','2021-09-01 17:03:45'),
               ('1002','A1','下单','2021-09-01 17:05:41'),
               ('1002','A1','支付','2021-09-01 17:06:26'),
               ('1003','A1','浏览','2021-09-01 17:08:13'),
               ('1003','A1','浏览','2021-09-01 17:09:14')
               t(user_id,shop_id,user_behav,op_time)       
),
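  2. We only look at shop A1 and concatenate each user's behaviors with collect_list or wm_concat (a MaxCompute built-in function; in Hive, use concat_ws(',', collect_list(user_behav))). Because the match in the next step depends on the order of the behaviors, make sure they are concatenated in op_time order: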
tb2 as (
    select 
        dt,
        user_id,
        -- collect_list(user_behav)
        wm_concat(",",user_behav) behavs
    from tb1
    where shop_id = 'A1'
    group by dt,user_id
)
-- Result:
dt  user_id behavs
2021-09-01  1001    浏览,收藏
2021-09-01  1002    浏览,收藏,浏览,下单,支付
2021-09-01  1003    浏览,浏览
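The aggregation in tb2 can be sketched in plain Python. Here the events are sorted explicitly by op_time before concatenation, since an SQL string aggregation may not guarantee any particular order unless the engine offers an ordering option. The names EVENTS and behavior_paths are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Event = Tuple[str, str, str, str]  # (user_id, shop_id, user_behav, op_time)

EVENTS: List[Event] = [
    ("1001", "A1", "浏览", "2021-09-01 17:03:01"),
    ("1001", "A1", "收藏", "2021-09-01 17:04:12"),
    ("1001", "A2", "浏览", "2021-09-01 17:02:02"),
    ("1001", "A2", "收藏", "2021-09-01 17:03:42"),
    ("1001", "A2", "下单", "2021-09-01 17:06:25"),
    ("1002", "A1", "浏览", "2021-09-01 17:00:32"),
    ("1002", "A1", "收藏", "2021-09-01 17:03:12"),
    ("1002", "A1", "浏览", "2021-09-01 17:03:45"),
    ("1002", "A1", "下单", "2021-09-01 17:05:41"),
    ("1002", "A1", "支付", "2021-09-01 17:06:26"),
    ("1003", "A1", "浏览", "2021-09-01 17:08:13"),
    ("1003", "A1", "浏览", "2021-09-01 17:09:14"),
]


def behavior_paths(events: List[Event], shop_id: str) -> Dict[Tuple[str, str], str]:
    """Concatenate each user's behaviors per day for one shop, ordered by op_time."""
    paths: Dict[Tuple[str, str], List[str]] = defaultdict(list)
    # Sorting first makes the concatenation follow event time
    for user_id, shop, behav, op_time in sorted(events, key=lambda e: e[3]):
        if shop == shop_id:
            dt = op_time[:10]  # same as substr(op_time, 1, 10)
            paths[(dt, user_id)].append(behav)
    return {key: ",".join(behavs) for key, behavs in paths.items()}


print(behavior_paths(EVENTS, "A1"))
```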
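  3. Match with LIKE. Here the action right after placing the order must be payment: the "_" wildcard matches exactly one character (the comma separator), so 支付 must directly follow 下单: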
select 
    dt,
    user_id,
    behavs
from tb2
where behavs like '%浏览%收藏%下单_支付%';
-- Result:
dt  user_id behavs
2021-09-01  1002    浏览,收藏,浏览,下单,支付
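To see exactly what the LIKE pattern accepts, here is a small Python sketch that translates a LIKE pattern into a regular expression. "%" becomes ".*" and "_" becomes ".", and the whole value must match, as with LIKE. It assumes no ESCAPE clause, and the helper names like_to_regex and sql_like are hypothetical.

```python
import re


def like_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a SQL LIKE pattern (without an ESCAPE clause) into a regex.

    '%' matches any run of characters (possibly empty), '_' matches exactly
    one character, and every other character is matched literally.
    """
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.DOTALL)


def sql_like(value: str, pattern: str) -> bool:
    # LIKE must match the entire value, so use fullmatch rather than search
    return like_to_regex(pattern).fullmatch(value) is not None


PATTERN = "%浏览%收藏%下单_支付%"
print(sql_like("浏览,收藏,浏览,下单,支付", PATTERN))  # True
print(sql_like("浏览,收藏,下单,浏览,支付", PATTERN))  # False: '_' is only one character
```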

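Doing this kind of matching in offline SQL mainly comes down to aggregating and concatenating all behavior paths by dimension, then matching them with string patterns (LIKE) or, for more complex rules, regular expressions. If you have a similar requirement in real business analysis, the approach above can serve as a reference. Better approaches are welcome for discussion.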
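For the "more complex" case, a regular expression can enforce rules that LIKE cannot express cleanly. The Python sketch below (an illustration, not from the original) requires each behavior to be a whole comma-separated token. A second, hypothetical rule relaxes the order-to-payment step to allow at most one other action in between. The names FUNNEL, FUNNEL_RELAXED and matches are made up for this example.

```python
import re

# Token-aware variant of '%浏览%收藏%下单_支付%': each behavior must be a whole
# comma-separated token, and 支付 must be the token right after 下单.
FUNNEL = re.compile(r"(?:^|,)浏览,(?:[^,]+,)*?收藏,(?:[^,]+,)*?下单,支付(?:,|$)")

# Hypothetical relaxed rule: at most one other action between 下单 and 支付.
FUNNEL_RELAXED = re.compile(
    r"(?:^|,)浏览,(?:[^,]+,)*?收藏,(?:[^,]+,)*?下单,(?:[^,]+,)?支付(?:,|$)"
)


def matches(path: str, funnel: "re.Pattern[str]") -> bool:
    # search() because the funnel may start anywhere in the behavior path
    return funnel.search(path) is not None


print(matches("浏览,收藏,浏览,下单,支付", FUNNEL))          # True
print(matches("浏览,收藏,下单,浏览,支付", FUNNEL))          # False
print(matches("浏览,收藏,下单,浏览,支付", FUNNEL_RELAXED))  # True
```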

Bye-bye!
