PostgreSQL 秒杀4种方法 - 增加 批量流式加减库存 方法

标签

PostgreSQL , 秒杀 , 批量扣减 , 流处理


背景

秒杀,即对同一个商品,消减库存。

带来的数据库问题是热点行,由于数据库最细粒度的锁通常是行锁,同一行同一时刻只能被一个事务更新,其他事务如果要更新同一行,会等待行级排它锁。

PostgreSQL中有几种方法来处理秒杀:

1、暴力,直接更新

2、skip locked,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。

3、advisory lock,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。

4、把更新转成写入,批量消费,可以在内核层面实现(批量消耗),也可以在业务层面实现。

看一下几种方法的性能。

create table t(    
  id int primary key,  -- 商品ID    
  cnt int              -- 库存    
);    
    
insert into t values (1,2000000000);    

都使用100个并发连接。

1、暴力更新

100并发

transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 100    
number of threads: 100    
duration: 120 s    
number of transactions actually processed: 342042    
latency average = 35.083 ms    
latency stddev = 36.273 ms    
tps = 2849.507392 (including connections establishing)    
tps = 2849.837580 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
        35.083  update t set cnt=cnt-1 where id=1;    

2并发

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 2  
number of threads: 2  
duration: 120 s  
number of transactions actually processed: 2819491  
latency average = 0.085 ms  
latency stddev = 0.009 ms  
tps = 23495.740654 (including connections establishing)  
tps = 23496.241610 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.085  update t set cnt=cnt-1 where id=1;  

2、skip locked row

skip locked是PG提供的一种语法,可以跳过被锁的行。

update t set cnt=cnt-1 where ctid = any (array(select ctid from t where id=1 for update skip locked)) returning *;    
    
    
transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 100    
number of threads: 100    
duration: 120 s    
number of transactions actually processed: 6508322    
latency average = 1.844 ms    
latency stddev = 2.390 ms    
tps = 54226.911876 (including connections establishing)    
tps = 54233.143956 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         1.843  update t set cnt=cnt-1 where ctid = any (array(select ctid from t where id=1 for update skip locked)) returning *;    

3、advisory lock

advisory lock,更新时,锁住PK,而不是ROW本身,如果未获得锁,直接返回。与skip locked类似,但是更加高效,因为不需要SEARCH ROW。

transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 100    
number of threads: 100    
duration: 120 s    
number of transactions actually processed: 31690080    
latency average = 0.379 ms    
latency stddev = 0.671 ms    
tps = 264047.289635 (including connections establishing)    
tps = 264083.172081 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         0.379  update t set cnt=cnt-1 where id=1 and pg_try_advisory_xact_lock(1);    

4、流式批量更新

流式批量处理,将更新转换为写入,避免热点行锁,然后批量合并到库存表。

但是需要注意,这个属于异步的方法,也就是说,可能导致库存负数。不过消费足够快的话,不会有太大问题。

1、创建FEED表,存储用户扣减库存的记录。

create table stat(    
  uid int,   -- 用户ID    
  id int,    -- 商品ID    
  cnt int,   -- 购买数量    
  crt_time timestamp default now()  -- 写入时间    
);    

2、创建用户扣减库存的函数,这里面使用一个判断,当库存(也就是说,默认不关心还没有合并到最终结果的那些消费记录。)

create or replace function consume(int, int, int) returns int as $$    
  insert into stat (uid, id, cnt) select $1 as uid, $2 as id, $3 as cnt from t where id=$2 and cnt+$3>=0 returning cnt;    
$$ language sql strict;     

3、调度,比如每100毫秒调度一次,异步合并

with tmp as (    
delete from stat where ctid = any ( array (    
  select ctid from stat limit 1000000    
)) returning *    
),    
t1 as (select id, sum(cnt) as cnt from tmp group by id)    
update t set cnt=t.cnt+t1.cnt from t1 where t.id=t1.id;    

4、调度(可以使用autovacuum自动调度),垃圾回收。

vacuum stat;    

5、压测

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 100 -j 100 -T 120    
    
transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 100    
number of threads: 100    
duration: 120 s    
number of transactions actually processed: 17155235    
latency average = 0.699 ms    
latency stddev = 0.546 ms    
tps = 142929.999871 (including connections establishing)    
tps = 142949.652076 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         0.702  select consume(1,1,1);    

如果我们需要按先后顺序合并,可以加个索引

create index idx_stat_2 on stat(crt_time);    

合并SQL如下:

with tmp as (    
delete from stat where ctid = any ( array (    
  select ctid from stat order by crt_time limit 1000000    
)) returning *    
),    
t1 as (select id, sum(cnt) as cnt from tmp group by id)    
update t set cnt=t.cnt+t1.cnt from t1 where t.id=t1.id;    

性能如下:

transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 100    
number of threads: 100    
duration: 120 s    
number of transactions actually processed: 10394002    
latency average = 1.154 ms    
latency stddev = 0.951 ms    
tps = 86585.839187 (including connections establishing)    
tps = 86597.281593 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         1.155  select consume(1,1);    

消费速度与写入速度几乎一致。只有调度延迟。

如果通过内核层面来实现的话,可以避免库存负数这个问题,提高一定的性能,但是:为了不破坏原有的一致性和可靠性,同样不能避免批量提交前,会话占用数据库连接的问题。

所以是有利有弊的。

另一方面,如果我们在内部实现同一个ID最多分配给两个SERVER PROCESS执行,也能很好的解决这个问题。类似oracle的shared server mode,同时对id进行路由分配,至多给两个SHARED PROCESS,从而每个ID保证2万多的TPS。

类似的流计算案例

《超时流式处理 - 没有消息流入的数据异常监控》

《阿里云RDS PostgreSQL varbitx实践 - 流式标签 (阅后即焚流式批量计算) - 万亿级,任意标签圈人,毫秒响应》

《PostgreSQL 流式统计 - insert on conflict 实现 流式 UV(distinct), min, max, avg, sum, count ...》

《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》

《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

《PostgreSQL 异步消息实践 - Feed系统实时监测与响应(如 电商主动服务) - 分钟级到毫秒级的实现》

小结

处理能力如下

1、暴力,直接更新

2849/s (100并发)

2.35万/s (2并发)

2、skip locked,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。

5.4万/s

3、advisory lock,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。

26.4万/s

4、把更新转成写入,批量消费,可以在内核层面实现(批量消耗),也可以在业务层面实现。

14.3万/s (乱序消费)

8.6万/s (按顺序消费)

内核层面来解决热点,批量合并或者shared server process和根据ID分配(每个ID 2.35万/s的处理吞吐已经够用了,因为秒杀完后,库存为负时,就没有锁冲突问题了),是最靠谱的。

上一篇:knative serving 0.10.0 版本变更


下一篇:什么是跨域?跨域详解