标签
PostgreSQL , 秒杀 , 批量扣减 , 流处理
背景
秒杀,即对同一个商品,消减库存。
带来的数据库问题是热点行,由于数据库最细粒度的锁通常是行锁,同一行同一时刻只能被一个事务更新,其他事务如果要更新同一行,会等待行级排它锁。
PostgreSQL中有几种方法来处理秒杀:
1、暴力,直接更新
2、skip locked,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。
3、advisory lock,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。
4、把更新转成写入,批量消费,可以在内核层面实现(批量消耗),也可以在业务层面实现。
看一下几种方法的性能。
create table t(
id int primary key, -- 商品ID
cnt int -- 库存
);
insert into t values (1,2000000000);
都使用100个并发连接。
1、暴力更新
100并发
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 100
number of threads: 100
duration: 120 s
number of transactions actually processed: 342042
latency average = 35.083 ms
latency stddev = 36.273 ms
tps = 2849.507392 (including connections establishing)
tps = 2849.837580 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
35.083 update t set cnt=cnt-1 where id=1;
2并发
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 2
number of threads: 2
duration: 120 s
number of transactions actually processed: 2819491
latency average = 0.085 ms
latency stddev = 0.009 ms
tps = 23495.740654 (including connections establishing)
tps = 23496.241610 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.085 update t set cnt=cnt-1 where id=1;
2、skip locked row
skip locked是PG提供的一种语法,可以跳过被锁的行。
update t set cnt=cnt-1 where ctid = any (array(select ctid from t where id=1 for update skip locked)) returning *;
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 100
number of threads: 100
duration: 120 s
number of transactions actually processed: 6508322
latency average = 1.844 ms
latency stddev = 2.390 ms
tps = 54226.911876 (including connections establishing)
tps = 54233.143956 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
1.843 update t set cnt=cnt-1 where ctid = any (array(select ctid from t where id=1 for update skip locked)) returning *;
3、advisory lock
advisory lock,更新时,锁住PK,而不是ROW本身,如果未获得锁,直接返回。与skip locked类似,但是更加高效,因为不需要SEARCH ROW。
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 100
number of threads: 100
duration: 120 s
number of transactions actually processed: 31690080
latency average = 0.379 ms
latency stddev = 0.671 ms
tps = 264047.289635 (including connections establishing)
tps = 264083.172081 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.379 update t set cnt=cnt-1 where id=1 and pg_try_advisory_xact_lock(1);
4、流式批量更新
流式批量处理,将更新转换为写入,避免热点行锁,然后批量合并到库存表。
但是需要注意,这个属于异步的方法,也就是说,可能导致库存负数。不过消费足够快的话,不会有太大问题。
1、创建FEED表,存储用户扣减库存的记录。
create table stat(
uid int, -- 用户ID
id int, -- 商品ID
cnt int, -- 购买数量
crt_time timestamp default now() -- 写入时间
);
2、创建用户扣减库存的函数,这里面使用一个判断,当库存(也就是说,默认不关心还没有合并到最终结果的那些消费记录。)
create or replace function consume(int, int, int) returns int as $$
insert into stat (uid, id, cnt) select $1 as uid, $2 as id, $3 as cnt from t where id=$2 and cnt+$3>=0 returning cnt;
$$ language sql strict;
3、调度,比如每100毫秒调度一次,异步合并
with tmp as (
delete from stat where ctid = any ( array (
select ctid from stat limit 1000000
)) returning *
),
t1 as (select id, sum(cnt) as cnt from tmp group by id)
update t set cnt=t.cnt+t1.cnt from t1 where t.id=t1.id;
4、调度(可以使用autovacuum自动调度),垃圾回收。
vacuum stat;
5、压测
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 100 -j 100 -T 120
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 100
number of threads: 100
duration: 120 s
number of transactions actually processed: 17155235
latency average = 0.699 ms
latency stddev = 0.546 ms
tps = 142929.999871 (including connections establishing)
tps = 142949.652076 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.702 select consume(1,1,1);
如果我们需要按先后顺序合并,可以加个索引
create index idx_stat_2 on stat(crt_time);
合并SQL如下:
with tmp as (
delete from stat where ctid = any ( array (
select ctid from stat order by crt_time limit 1000000
)) returning *
),
t1 as (select id, sum(cnt) as cnt from tmp group by id)
update t set cnt=t.cnt+t1.cnt from t1 where t.id=t1.id;
性能如下:
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 100
number of threads: 100
duration: 120 s
number of transactions actually processed: 10394002
latency average = 1.154 ms
latency stddev = 0.951 ms
tps = 86585.839187 (including connections establishing)
tps = 86597.281593 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
1.155 select consume(1,1);
消费速度与写入速度几乎一致。只有调度延迟。
如果通过内核层面来实现的话,可以避免库存负数这个问题,提高一定的性能,但是:为了不破坏原有的一致性和可靠性,同样不能避免批量提交前,会话占用数据库连接的问题。
所以是有利有弊的。
另一方面,如果我们在内部实现同一个ID最多分配给两个SERVER PROCESS执行,也能很好的解决这个问题。类似oracle的shared server mode,同时对id进行路由分配,至多给两个SHARED PROCESS,从而每个ID保证2万多的TPS。
类似的流计算案例
《阿里云RDS PostgreSQL varbitx实践 - 流式标签 (阅后即焚流式批量计算) - 万亿级,任意标签圈人,毫秒响应》
《PostgreSQL 流式统计 - insert on conflict 实现 流式 UV(distinct), min, max, avg, sum, count ...》
《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》
《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》
《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》
《PostgreSQL 异步消息实践 - Feed系统实时监测与响应(如 电商主动服务) - 分钟级到毫秒级的实现》
小结
处理能力如下
1、暴力,直接更新
2849/s (100并发)
2.35万/s (2并发)
2、skip locked,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。
5.4万/s
3、advisory lock,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。
26.4万/s
4、把更新转成写入,批量消费,可以在内核层面实现(批量消耗),也可以在业务层面实现。
14.3万/s (乱序消费)
8.6万/s (按顺序消费)
内核层面来解决热点,批量合并或者shared server process和根据ID分配(每个ID 2.35万/s的处理吞吐已经够用了,因为秒杀完后,库存为负时,就没有锁冲突问题了),是最靠谱的。