超时流式处理 - 没有消息流入的数据异常监控

标签

PostgreSQL , 流式处理 , 无流入数据超时异常


背景

流计算有个特点,数据流式写入,流式计算。

但是有一种情况,可能无法覆盖。例如电商中的 收货超时,退款处理超时 事件的流式监控。因为数据都不会再写进来了,所以也无法触发流式计算。

这些问题如何流式预警呢?

可以用超时时间+调度的方式,当然这里面有PostgreSQL的独门秘籍:

1、CTE,语法灵活。

2、partial index,不需要检索的数据不构建索引。

3、DML returning,可以返回DML语句的结果,结合CTE实现最小交互计算。

4、multi-index bitmap scan,多个索引合并扫描,在使用OR条件时,可以结合多个字段的索引进行合并扫描。

DEMO设计

1、被监控表的结构。里面记录了订单、退款等事件的超时处理时间,超时通知次数,下一次通知时间间隔,完结状态等。

create table tbl (  
  id int8,                                         
  --                               ..... 其他字段 (比如已完结状态)  
  state int,                       -- 完结状态(1 表示已完结)  
  deadts timestamp,                -- 超时时间        
  nts interval,                    -- 超时间隔,用于更新下一次通知时间 (比如一天通知一次)       
  notify_times int default 0,      -- 通知次数  
  deadts_next timestamp            -- 下一次通知时间  
);  

2、创建partial index,也就是说,对未完结工单才需要通知用户,这些数据是业务关心的,使用partial index可以简化索引大小。提高速度。

create index idx_tbl_1 on tbl (deadts,notify_times,state) where notify_times=0 and state<>1;  
  
create index idx_tbl_2 on tbl (deadts_next,state) where deadts_next is not null and state<>1;  

3、获取需要通知的数据,并且更新通知次数以及下一次的通知时间。

with tmp1 as (  
update tbl set   
  deadts_next=now()+nts,   
  notify_times=notify_times+1   
where ctid = any (array(  
  select ctid from tbl where  
  ( deadts < now() and notify_times=0 and state<>1) 
  union all
  select ctid from tbl where
  ( deadts_next < now() and deadts_next is not null and state<>1)   
  limit 10000     -- 一次获取1万条超时数据    
))  
returning *  
)  
select * from tmp1;  

4、执行计划完美

 CTE Scan on tmp1  (cost=18163.25..18163.45 rows=10 width=48)
   CTE tmp1
     ->  Update on tbl tbl_2  (cost=18151.05..18163.25 rows=10 width=54)
           InitPlan 1 (returns $0)
             ->  Limit  (cost=0.13..18151.03 rows=10000 width=6)
                   ->  Append  (cost=0.13..764699.60 rows=421301 width=6)
                         ->  Index Scan using idx_tbl_1 on tbl  (cost=0.13..169527.13 rows=369766 width=6)
                               Index Cond: (deadts < now())
                         ->  Index Scan using idx_tbl_2 on tbl tbl_1  (cost=0.43..590959.46 rows=51535 width=6)
                               Index Cond: (deadts_next < now())
           ->  Tid Scan on tbl tbl_2  (cost=0.01..12.21 rows=10 width=54)
                 TID Cond: (ctid = ANY ($0))
(12 rows)

5、调度

《PostgreSQL 定时任务方法2》

《PostgreSQL Oracle 兼容性之 - DBMS_JOBS - Daily Maintenance - Timing Tasks(pgagent)》

当然你如果使用阿里云,可以使用阿里云的调度平台,配置调度任务。

性能指标

1、写入1亿数据,假设有100万条同时超时需要处理,耗时如何?

-- 1亿条完结  
insert into tbl select id, 1, now(), '5 min', 0, null from generate_series(1,100000000) t(id);  
  
-- 100万条超时  
insert into tbl select id, 0, now(), '5 min', 0, null from generate_series(1,1000000) t(id);  

通知性能,比如每一批通知1万条:

(小批量获取,并更新超时时间,目的是让autovacuum介入,实时回收垃圾)

with tmp1 as (  
update tbl set   
  deadts_next=now()+nts,   
  notify_times=notify_times+1   
where ctid = any (array(  
  select ctid from tbl where  
  ( deadts < now() and notify_times=0 and state<>1)   
  union all
  select ctid from tbl where
  ( deadts_next < now() and deadts_next is not null and state<>1)   
  limit 10000     -- 一次获取1万条超时数据    
))  
returning *  
)  
select * from tmp1;  
  
  
-- 计划  
  
 CTE Scan on tmp1  (cost=18163.25..18163.45 rows=10 width=48) (actual time=39.092..78.707 rows=10000 loops=1)
   Output: tmp1.id, tmp1.state, tmp1.deadts, tmp1.nts, tmp1.notify_times, tmp1.deadts_next
   Buffers: shared hit=75094 read=49 dirtied=49
   CTE tmp1
     ->  Update on public.tbl tbl_2  (cost=18151.05..18163.25 rows=10 width=54) (actual time=39.089..74.637 rows=10000 loops=1)
           Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, tbl_2.notify_times, tbl_2.deadts_next
           Buffers: shared hit=75094 read=49 dirtied=49
           InitPlan 1 (returns $0)
             ->  Limit  (cost=0.13..18151.03 rows=10000 width=6) (actual time=31.265..36.899 rows=10000 loops=1)
                   Output: tbl.ctid
                   Buffers: shared hit=11395
                   ->  Append  (cost=0.13..764699.60 rows=421301 width=6) (actual time=31.264..35.354 rows=10000 loops=1)
                         Buffers: shared hit=11395
                         ->  Index Scan using idx_tbl_1 on public.tbl  (cost=0.13..169527.13 rows=369766 width=6) (actual time=0.014..0.014 rows=0 loops=1)
                               Output: tbl.ctid
                               Index Cond: (tbl.deadts < now())
                               Buffers: shared hit=1
                         ->  Index Scan using idx_tbl_2 on public.tbl tbl_1  (cost=0.43..590959.46 rows=51535 width=6) (actual time=31.249..33.870 rows=10000 loops=1)
                               Output: tbl_1.ctid
                               Index Cond: (tbl_1.deadts_next < now())
                               Buffers: shared hit=11394
           ->  Tid Scan on public.tbl tbl_2  (cost=0.01..12.21 rows=10 width=54) (actual time=39.017..43.529 rows=10000 loops=1)
                 Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, (tbl_2.notify_times + 1), (now() + tbl_2.nts), tbl_2.ctid
                 TID Cond: (tbl_2.ctid = ANY ($0))
                 Buffers: shared hit=21395
 Planning time: 0.301 ms
 Execution time: 79.905 ms

丝般柔滑

Time: 79.905 ms  

小结

使用以上方法,可以完美的解决超时数据的监控问题。性能好。

上一篇:Linux:ln创建删除软连接


下一篇:使用Gson解析JSON数据