标签
PostgreSQL , 流式处理 , 无流入数据超时异常
背景
流计算有个特点,数据流式写入,流式计算。
但是有一种情况,可能无法覆盖。例如电商中的 收货超时,退款处理超时 事件的流式监控。因为数据都不会再写进来了,所以也无法触发流式计算。
这些问题如何流式预警呢?
可以用超时时间+调度的方式,当然这里面有PostgreSQL的独门秘籍:
1、CTE,语法灵活。
2、partial index,不需要检索的数据不构建索引。
3、DML returning,可以返回DML语句的结果,结合CTE实现最小交互计算。
4、multi-index bitmap scan,多个索引合并扫描,在使用OR条件时,可以结合多个字段的索引进行合并扫描。
DEMO设计
1、被监控表的结构。里面记录了订单、退款等事件的超时处理时间,超时通知次数,下一次通知时间间隔,完结状态等。
create table tbl (
id int8,
-- ..... 其他字段 (比如已完结状态)
state int, -- 完结状态(1 表示已完结)
deadts timestamp, -- 超时时间
nts interval, -- 超时间隔,用于更新下一次通知时间 (比如一天通知一次)
notify_times int default 0, -- 通知次数
deadts_next timestamp -- 下一次通知时间
);
2、创建partial index,也就是说,对未完结工单才需要通知用户,这些数据是业务关心的,使用partial index可以简化索引大小。提高速度。
create index idx_tbl_1 on tbl (deadts,notify_times,state) where notify_times=0 and state<>1;
create index idx_tbl_2 on tbl (deadts_next,state) where deadts_next is not null and state<>1;
3、获取需要通知的数据,并且更新通知次数以及下一次的通知时间。
with tmp1 as (
update tbl set
deadts_next=now()+nts,
notify_times=notify_times+1
where ctid = any (array(
select ctid from tbl where
( deadts < now() and notify_times=0 and state<>1)
union all
select ctid from tbl where
( deadts_next < now() and deadts_next is not null and state<>1)
limit 10000 -- 一次获取1万条超时数据
))
returning *
)
select * from tmp1;
4、执行计划完美
CTE Scan on tmp1 (cost=18163.25..18163.45 rows=10 width=48)
CTE tmp1
-> Update on tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54)
InitPlan 1 (returns $0)
-> Limit (cost=0.13..18151.03 rows=10000 width=6)
-> Append (cost=0.13..764699.60 rows=421301 width=6)
-> Index Scan using idx_tbl_1 on tbl (cost=0.13..169527.13 rows=369766 width=6)
Index Cond: (deadts < now())
-> Index Scan using idx_tbl_2 on tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6)
Index Cond: (deadts_next < now())
-> Tid Scan on tbl tbl_2 (cost=0.01..12.21 rows=10 width=54)
TID Cond: (ctid = ANY ($0))
(12 rows)
5、调度
《PostgreSQL Oracle 兼容性之 - DBMS_JOBS - Daily Maintenance - Timing Tasks(pgagent)》
当然你如果使用阿里云,可以使用阿里云的调度平台,配置调度任务。
性能指标
1、写入1亿数据,假设有100万条同时超时需要处理,耗时如何?
-- 1亿条完结
insert into tbl select id, 1, now(), '5 min', 0, null from generate_series(1,100000000) t(id);
-- 100万条超时
insert into tbl select id, 0, now(), '5 min', 0, null from generate_series(1,1000000) t(id);
通知性能,比如每一批通知1万条:
(小批量获取,并更新超时时间,目的是让autovacuum介入,实时回收垃圾)
with tmp1 as (
update tbl set
deadts_next=now()+nts,
notify_times=notify_times+1
where ctid = any (array(
select ctid from tbl where
( deadts < now() and notify_times=0 and state<>1)
union all
select ctid from tbl where
( deadts_next < now() and deadts_next is not null and state<>1)
limit 10000 -- 一次获取1万条超时数据
))
returning *
)
select * from tmp1;
-- 计划
CTE Scan on tmp1 (cost=18163.25..18163.45 rows=10 width=48) (actual time=39.092..78.707 rows=10000 loops=1)
Output: tmp1.id, tmp1.state, tmp1.deadts, tmp1.nts, tmp1.notify_times, tmp1.deadts_next
Buffers: shared hit=75094 read=49 dirtied=49
CTE tmp1
-> Update on public.tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54) (actual time=39.089..74.637 rows=10000 loops=1)
Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, tbl_2.notify_times, tbl_2.deadts_next
Buffers: shared hit=75094 read=49 dirtied=49
InitPlan 1 (returns $0)
-> Limit (cost=0.13..18151.03 rows=10000 width=6) (actual time=31.265..36.899 rows=10000 loops=1)
Output: tbl.ctid
Buffers: shared hit=11395
-> Append (cost=0.13..764699.60 rows=421301 width=6) (actual time=31.264..35.354 rows=10000 loops=1)
Buffers: shared hit=11395
-> Index Scan using idx_tbl_1 on public.tbl (cost=0.13..169527.13 rows=369766 width=6) (actual time=0.014..0.014 rows=0 loops=1)
Output: tbl.ctid
Index Cond: (tbl.deadts < now())
Buffers: shared hit=1
-> Index Scan using idx_tbl_2 on public.tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6) (actual time=31.249..33.870 rows=10000 loops=1)
Output: tbl_1.ctid
Index Cond: (tbl_1.deadts_next < now())
Buffers: shared hit=11394
-> Tid Scan on public.tbl tbl_2 (cost=0.01..12.21 rows=10 width=54) (actual time=39.017..43.529 rows=10000 loops=1)
Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, (tbl_2.notify_times + 1), (now() + tbl_2.nts), tbl_2.ctid
TID Cond: (tbl_2.ctid = ANY ($0))
Buffers: shared hit=21395
Planning time: 0.301 ms
Execution time: 79.905 ms
丝般柔滑
Time: 79.905 ms
小结
使用以上方法,可以完美的解决超时数据的监控问题。性能好。