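Tags
PostgreSQL, partitioned tables, online conversion
Background
How can a non-partitioned table be converted to a partitioned table online, without affecting the business?
Method 1: the pg_pathman partitioning extension
See 《PostgreSQL 9.5+ efficient partitioned tables - pg_pathman》.
Use the non-blocking migration interface: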
partition_table_concurrently(
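relation REGCLASS, -- parent table (OID)
batch_size INTEGER DEFAULT 1000, -- number of rows migrated per transaction
sleep_time FLOAT8 DEFAULT 1.0) -- seconds to sleep before retrying when the row locks cannot be acquired; the task exits after 60 retries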
postgres=# select partition_table_concurrently('part_test'::regclass,
10000,
1.0);
NOTICE: worker started, you can stop it with the following command: select stop_concurrent_part_task('part_test');
partition_table_concurrently
------------------------------
(1 row)
After the migration finishes, the parent table holds no rows; all data is in the partitions.
postgres=# select count(*) from only part_test;
count
-------
0
(1 row)
Once the data has been migrated, it is advisable to disable the parent table so that it no longer appears in execution plans.
postgres=# select set_enable_parent('part_test'::regclass, false);
set_enable_parent
-------------------
(1 row)
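The calls above can be put together into one end-to-end sequence. This is a minimal sketch, assuming that part_test already exists with a NOT NULL integer column named id (a hypothetical layout, the original does not show the table definition), that pg_pathman is listed in shared_preload_libraries, and that 4 hash partitions are wanted. create_hash_partitions and the pathman_concurrent_part_tasks view come from the pg_pathman documentation; check them against the installed version.

```sql
-- Assumption: part_test(id int not null, ...) already exists and holds data.
create extension if not exists pg_pathman;

-- Create 4 hash partitions on id, but leave the existing rows in the parent
-- for now (4th argument, partition_data, set to false).
select create_hash_partitions('part_test'::regclass, 'id', 4, false);

-- Move the rows in the background, 10000 rows per transaction.
select partition_table_concurrently('part_test'::regclass, 10000, 1.0);

-- Watch the progress of the background worker.
select * from pathman_concurrent_part_tasks;

-- Run the next two statements only after the worker has finished:
-- the count should be 0 before the parent is removed from query plans.
select count(*) from only part_test;
select set_enable_parent('part_test'::regclass, false);
```

Creating the partitions with partition_data set to false is what keeps the initial call short; the row movement then happens in small transactions inside the background worker.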
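Method 2: native partitioning
Convert a non-partitioned table to a partitioned table online through a sequence of steps: inheritance, triggers, asynchronous migration, and a table rename (the rename blocks briefly).
Key techniques:
1. Inheritance (child tables as partitions)
Transparent to select, update, delete, truncate, and drop issued against the parent.
2. Triggers
Insert: a before trigger routes the row to the matching child partition.
Update: a before trigger deletes the row from the old table and inserts the updated row into the new table.
3. Background data migration: a CTE that selects from ONLY the parent with SKIP LOCKED, deletes from ONLY the parent, and inserts into the new tables.
4. When the migration is finished (the parent table holds no rows), take a brief lock, remove the INHERIT relationships, switch to native partitioning, and swap the table names.
Example
Convert a table online to a LIST-partitioned table (pseudo-HASH partitioning).
Range partitioning works the same way.
Converting to a native HASH-partitioned table requires extracting PostgreSQL's built-in HASH partitioning algorithm.
1. Create the test table (the table to be partitioned)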
create table old (id int primary key, info text, crt_time timestamp);
2. Insert 10 million test rows
insert into old select generate_series(1,10000000) , md5(random()::text) , now();
3. Create the child partitions (this example uses LIST partitioning)
do language plpgsql $$
declare
parts int := 4;
begin
for i in 0..parts-1 loop
execute format('create table old_mid%s (like old including all) inherits (old)', i);
execute format('alter table old_mid%s add constraint ck check(abs(mod(id,%s))=%s)', i, parts, i);
end loop;
end;
$$;
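Before adding triggers it is worth confirming that every child was created and carries the expected CHECK constraint. A small catalog query along these lines can do that; it only assumes the table names used in this example.

```sql
-- List each child of "old" together with its CHECK constraint definition.
select c.relname                      as child,
       con.conname                    as constraint_name,
       pg_get_constraintdef(con.oid)  as definition
from pg_inherits i
join pg_class c        on c.oid = i.inhrelid
join pg_constraint con on con.conrelid = c.oid
                      and con.contype = 'c'   -- 'c' = CHECK constraint
where i.inhparent = 'old'::regclass
order by c.relname;
```

Each of old_mid0 through old_mid3 should appear once, with a constraint on abs(mod(id, 4)) that matches its suffix.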
4. Insert: a before trigger routes rows to the new tables
create or replace function ins_tbl() returns trigger as $$
declare
begin
case abs(mod(NEW.id,4))
when 0 then
insert into old_mid0 values (NEW.*);
when 1 then
insert into old_mid1 values (NEW.*);
when 2 then
insert into old_mid2 values (NEW.*);
when 3 then
insert into old_mid3 values (NEW.*);
else
return NEW; -- a NULL key would be written to the local parent table; the primary key is never NULL
end case;
return null;
end;
$$ language plpgsql strict;
create trigger tg1 before insert on old for each row execute procedure ins_tbl();
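The CASE ladder has to be edited whenever the number of partitions changes. As an alternative, the target table name can be computed and the insert issued dynamically. This is only a sketch: ins_tbl_dyn is a hypothetical name, it assumes the children follow the old_midN naming used above, and dynamic SQL adds some per-row overhead compared with the static version.

```sql
create or replace function ins_tbl_dyn() returns trigger as $$
begin
  -- A NULL id cannot be routed; let it fall through to the parent
  -- (the primary key rejects it there).
  if NEW.id is null then
    return NEW;
  end if;
  -- Build the child name, e.g. old_mid3, and insert the whole row into it.
  execute format('insert into %I select ($1).*',
                 'old_mid' || abs(mod(NEW.id, 4)))
    using NEW;
  return null;  -- suppress the insert into the parent
end;
$$ language plpgsql;
```

It could be attached in place of ins_tbl() with the same create trigger statement; only one of the two routing triggers should exist on old at a time.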
5. Update: a before trigger deletes the row from the old table and inserts the updated row into the new table
create or replace function upd_tbl () returns trigger as $$
declare
begin
case abs(mod(NEW.id,4))
when 0 then
insert into old_mid0 values (NEW.*);
when 1 then
insert into old_mid1 values (NEW.*);
when 2 then
insert into old_mid2 values (NEW.*);
when 3 then
insert into old_mid3 values (NEW.*);
else
return NEW; -- a NULL key would be written to the local parent table; the primary key is never NULL
end case;
delete from only old where id=OLD.id; -- remove the pre-update row from the parent; OLD.id stays correct even if the update changes id
return null;
end;
$$ language plpgsql strict;
create trigger tg2 before update on old for each row execute procedure upd_tbl();
6. The old table now looks like this
postgres=# \dt+ old
List of relations
Schema | Name | Type | Owner | Size | Description
--------+------+-------+----------+--------+-------------
public | old | table | postgres | 730 MB |
(1 row)
The inheritance relationships are as follows
postgres=# \d+ old
Table "public.old"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
id | integer | | not null | | plain | |
info | text | | | | extended | |
crt_time | timestamp without time zone | | | | plain | |
Indexes:
"old_pkey" PRIMARY KEY, btree (id)
Triggers:
tg1 BEFORE INSERT ON old FOR EACH ROW EXECUTE PROCEDURE ins_tbl()
tg2 BEFORE UPDATE ON old FOR EACH ROW EXECUTE PROCEDURE upd_tbl()
Child tables: old_mid0,
old_mid1,
old_mid2,
old_mid3
7. Verify that insert, update, delete, and select all behave as required and are transparent to the application's SQL.
postgres=# insert into old values (0,'test',now());
INSERT 0 0
postgres=# select tableoid::regclass,* from old where id=1;
tableoid | id | info | crt_time
----------+----+----------------------------------+---------------------------
old | 1 | 22be06200f2a967104872f6f173fd038 | 31-JAN-19 12:52:25.887242
(1 row)
postgres=# select tableoid::regclass,* from old where id=0;
tableoid | id | info | crt_time
----------+----+------+---------------------------
old_mid0 | 0 | test | 31-JAN-19 13:02:35.859899
(1 row)
postgres=# update old set info='abc' where id in (0,2) returning tableoid::regclass,*;
tableoid | id | info | crt_time
----------+----+------+---------------------------
old_mid0 | 0 | abc | 31-JAN-19 13:02:35.859899
(1 row)
UPDATE 1
postgres=# select tableoid::regclass,* from old where id in (0,2);
tableoid | id | info | crt_time
----------+----+------+---------------------------
old_mid0 | 0 | abc | 31-JAN-19 13:12:03.343559
old_mid2 | 2 | abc | 31-JAN-19 13:11:04.763652
(2 rows)
postgres=# delete from old where id=3;
DELETE 1
postgres=# select tableoid::regclass,* from old where id=3;
tableoid | id | info | crt_time
----------+----+------+----------
(0 rows)
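A quick way to see where rows physically live at any point is to group by tableoid. The query below is a small illustrative check using only the tables from this example.

```sql
-- Count rows per physical table: the parent "old" and each child partition.
select tableoid::regclass as stored_in,
       count(*)           as row_count
from old
group by tableoid
order by tableoid;
```

At this stage almost all rows are still reported under old; as the migration in the following steps proceeds, that count shrinks while the old_midN counts grow.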
8. Start a stress test, then migrate the original table's data in the background
create or replace function test_ins(int) returns void as $$
declare
begin
insert into old values ($1,'test',now());
exception when others then
return;
end;
$$ language plpgsql strict;
vi test.sql
\set id1 random(10000001,200000000)
\set id2 random(1,5000000)
\set id3 random(5000001,10000000)
delete from old where id=:id2;
update old set info=md5(random()::text),crt_time=now() where id=:id3;
select test_ins(:id1);
Start the stress test
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 4 -j 4 -T 1200
...
progress: 323.0 s, 12333.1 tps, lat 0.324 ms stddev 0.036
progress: 324.0 s, 11612.9 tps, lat 0.344 ms stddev 0.203
progress: 325.0 s, 12546.0 tps, lat 0.319 ms stddev 0.061
progress: 326.0 s, 12728.7 tps, lat 0.314 ms stddev 0.038
progress: 327.0 s, 12536.9 tps, lat 0.319 ms stddev 0.040
progress: 328.0 s, 12534.1 tps, lat 0.319 ms stddev 0.042
progress: 329.0 s, 12228.1 tps, lat 0.327 ms stddev 0.047
...
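9. Migrate the data online
Migrate in batches of N rows each by running the following SQL. The INSERT reports 0 rows because the before trigger reroutes every row to a child table.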
with a as (
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *
)
insert into old select * from a;
INSERT 0 0
postgres=# select count(*) from only old;
count
---------
9998998
(1 row)
postgres=# select count(*) from old;
count
----------
10000000
(1 row)
postgres=# with a as (
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *
)
insert into old select * from a;
INSERT 0 0
postgres=# select count(*) from old;
count
----------
10000000
(1 row)
postgres=# select count(*) from only old;
count
---------
9997998
(1 row)
postgres=# with a as (
delete from only old where ctid = any (array (select ctid from only old limit 100000 for update skip locked) ) returning *
)
insert into old select * from a;
INSERT 0 0
postgres=# select count(*) from only old;
count
---------
9897998
(1 row)
postgres=# select count(*) from old;
count
----------
10000000
(1 row)
Migrate 10,000 rows at a time, batch by batch.
with a as (
delete from only old where ctid = any (array (select ctid from only old limit 10000 for update skip locked) ) returning *
)
insert into old select * from a;
Keep running the statement above until the old table holds no rows and everything has been migrated to the partitions.
select count(*) from only old;
count
-------
0
(1 row)
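Rather than re-running the batch statement by hand, the loop can be wrapped in a procedure that commits after every batch. This is a sketch under two assumptions: PostgreSQL 11 or later (procedures with transaction control), and a hypothetical procedure name migrate_old. Because the trigger makes the INSERT report 0 rows, the loop checks the parent for remaining rows instead of relying on the row count.

```sql
create or replace procedure migrate_old(batch int default 10000)
language plpgsql as $$
begin
  loop
    -- Move one batch: delete unlocked rows from the parent only, then
    -- re-insert them through the parent so the trigger routes them.
    with a as (
      delete from only old
       where ctid = any (array(
               select ctid from only old
                limit batch
                  for update skip locked))
      returning *
    )
    insert into old select * from a;

    commit;  -- end the batch transaction and release its row locks

    exit when not exists (select 1 from only old);

    perform pg_sleep(0.1);  -- brief pause to leave room for business traffic
  end loop;
end;
$$;

-- Must be called outside an explicit transaction block.
call migrate_old(10000);
```

Rows that are locked by business transactions are skipped in one pass and picked up in a later one, so the procedure keeps looping until the parent is empty.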
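10. Switch to the partitioned table
Create the partitioned table as follows, using the same partitioning rule as the inheritance constraints.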
create table new (id int, info text, crt_time timestamp) partition by list (abs(mod(id,4)));
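Swap the table names. Use a lock timeout to avoid an avalanche of blocked sessions. The switch only changes metadata such as the table names, so it is very fast.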
begin;
set lock_timeout ='3s';
alter table old_mid0 no inherit old;
alter table old_mid1 no inherit old;
alter table old_mid2 no inherit old;
alter table old_mid3 no inherit old;
alter table old rename to old_tmp;
alter table new rename to old;
alter table old ATTACH PARTITION old_mid0 for values in (0);
alter table old ATTACH PARTITION old_mid1 for values in (1);
alter table old ATTACH PARTITION old_mid2 for values in (2);
alter table old ATTACH PARTITION old_mid3 for values in (3);
end;
The native partitioned table after the switch:
postgres=# \d+ old
Table "public.old"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
id | integer | | | | plain | |
info | text | | | | extended | |
crt_time | timestamp without time zone | | | | plain | |
Partition key: LIST (abs(mod(id, 4)))
Partitions: old_mid0 FOR VALUES IN (0),
old_mid1 FOR VALUES IN (1),
old_mid2 FOR VALUES IN (2),
old_mid3 FOR VALUES IN (3)
Query test
postgres=# explain select * from old where id=1;
QUERY PLAN
-------------------------------------------------------------------------------------
Append (cost=0.29..10.04 rows=4 width=44)
-> Index Scan using old_mid0_pkey on old_mid0 (cost=0.29..2.51 rows=1 width=44)
Index Cond: (id = 1)
-> Index Scan using old_mid1_pkey on old_mid1 (cost=0.29..2.51 rows=1 width=45)
Index Cond: (id = 1)
-> Index Scan using old_mid2_pkey on old_mid2 (cost=0.29..2.51 rows=1 width=44)
Index Cond: (id = 1)
-> Index Scan using old_mid3_pkey on old_mid3 (cost=0.29..2.51 rows=1 width=45)
Index Cond: (id = 1)
(9 rows)
postgres=# explain select * from old where id=? and abs(mod(id, 4)) = abs(mod(?, 4));
QUERY PLAN
-------------------------------------------------------------------------------------
Append (cost=0.29..2.52 rows=1 width=45)
-> Index Scan using old_mid1_pkey on old_mid1 (cost=0.29..2.51 rows=1 width=45)
Index Cond: (id = 1)
Filter: (mod(id, 4) = 1)
(4 rows)
Data
postgres=# select count(*) from old;
count
----------
10455894
(1 row)
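After the cutover, the renamed parent old_tmp and the two routing functions are no longer needed. A possible cleanup, sketched here on the assumption that nothing else references these objects, is:

```sql
begin;
-- old_tmp has no children any more, so this counts only its own rows.
-- Proceed with the drops only if the result is 0.
select count(*) from old_tmp;

drop table old_tmp;        -- also drops its triggers tg1 and tg2
drop function ins_tbl();
drop function upd_tbl();
commit;
```

If the count is not 0, roll back instead and move the leftover rows into the partitioned table before dropping anything.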
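Method 3: logical replication
Use logical replication to synchronize the data into a partitioned table.
The steps, in brief:
Take a snapshot (LSN position)
Full copy
Incremental sync (logical replication, decoding the WAL from the LSN position)
Swap the table names
(Details omitted.)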
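One way to realize these steps is PostgreSQL's built-in publication and subscription mechanism, where the subscription's initial table copy covers the snapshot and full copy, and streaming covers the incremental part. The following is only a sketch under several assumptions that are not in the original: the target is a separate instance running PostgreSQL 13 or later (needed to replicate into a partitioned table), the source has wal_level = logical, the target uses native hash partitioning, and the connection string, pub_old, sub_old, and old_pN names are all hypothetical.

```sql
-- On the source instance: publish the non-partitioned table.
create publication pub_old for table old;

-- On the target instance: a partitioned table with the same name and columns.
create table old (id int primary key, info text, crt_time timestamp)
  partition by hash (id);
create table old_p0 partition of old for values with (modulus 4, remainder 0);
create table old_p1 partition of old for values with (modulus 4, remainder 1);
create table old_p2 partition of old for values with (modulus 4, remainder 2);
create table old_p3 partition of old for values with (modulus 4, remainder 3);

-- Copy the existing rows, then keep applying changes from the source.
-- The connection string is a placeholder.
create subscription sub_old
  connection 'host=src_host dbname=postgres user=repl'
  publication pub_old;
```

With two instances, the final step becomes pointing the application at the target once it has caught up, rather than renaming tables inside one database.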
Other
Hash functions
postgres=# \df *.*hash*
List of functions
Schema | Name | Result data type | Argument data types | Type
------------+--------------------------+------------------+---------------------------------------+------
pg_catalog | hash_aclitem | integer | aclitem | func
pg_catalog | hash_aclitem_extended | bigint | aclitem, bigint | func
pg_catalog | hash_array | integer | anyarray | func
pg_catalog | hash_array_extended | bigint | anyarray, bigint | func
pg_catalog | hash_numeric | integer | numeric | func
pg_catalog | hash_numeric_extended | bigint | numeric, bigint | func
pg_catalog | hash_range | integer | anyrange | func
pg_catalog | hash_range_extended | bigint | anyrange, bigint | func
pg_catalog | hashbpchar | integer | character | func
pg_catalog | hashbpcharextended | bigint | character, bigint | func
pg_catalog | hashchar | integer | "char" | func
pg_catalog | hashcharextended | bigint | "char", bigint | func
pg_catalog | hashenum | integer | anyenum | func
pg_catalog | hashenumextended | bigint | anyenum, bigint | func
pg_catalog | hashfloat4 | integer | real | func
pg_catalog | hashfloat4extended | bigint | real, bigint | func
pg_catalog | hashfloat8 | integer | double precision | func
pg_catalog | hashfloat8extended | bigint | double precision, bigint | func
pg_catalog | hashhandler | index_am_handler | internal | func
pg_catalog | hashinet | integer | inet | func
pg_catalog | hashinetextended | bigint | inet, bigint | func
pg_catalog | hashint2 | integer | smallint | func
pg_catalog | hashint2extended | bigint | smallint, bigint | func
pg_catalog | hashint4 | integer | integer | func
pg_catalog | hashint4extended | bigint | integer, bigint | func
pg_catalog | hashint8 | integer | bigint | func
pg_catalog | hashint8extended | bigint | bigint, bigint | func
pg_catalog | hashmacaddr | integer | macaddr | func
pg_catalog | hashmacaddr8 | integer | macaddr8 | func
pg_catalog | hashmacaddr8extended | bigint | macaddr8, bigint | func
pg_catalog | hashmacaddrextended | bigint | macaddr, bigint | func
pg_catalog | hashname | integer | name | func
pg_catalog | hashnameextended | bigint | name, bigint | func
pg_catalog | hashoid | integer | oid | func
pg_catalog | hashoidextended | bigint | oid, bigint | func
pg_catalog | hashoidvector | integer | oidvector | func
pg_catalog | hashoidvectorextended | bigint | oidvector, bigint | func
pg_catalog | hashtext | integer | text | func
pg_catalog | hashtextextended | bigint | text, bigint | func
pg_catalog | hashvarlena | integer | internal | func
pg_catalog | hashvarlenaextended | bigint | internal, bigint | func
pg_catalog | interval_hash | integer | interval | func
pg_catalog | interval_hash_extended | bigint | interval, bigint | func
pg_catalog | jsonb_hash | integer | jsonb | func
pg_catalog | jsonb_hash_extended | bigint | jsonb, bigint | func
pg_catalog | pg_lsn_hash | integer | pg_lsn | func
pg_catalog | pg_lsn_hash_extended | bigint | pg_lsn, bigint | func
pg_catalog | satisfies_hash_partition | boolean | oid, integer, integer, VARIADIC "any" | func
pg_catalog | time_hash | integer | time without time zone | func
pg_catalog | time_hash_extended | bigint | time without time zone, bigint | func
pg_catalog | timestamp_hash | integer | timestamp without time zone | func
pg_catalog | timestamp_hash_extended | bigint | timestamp without time zone, bigint | func
pg_catalog | timetz_hash | integer | time with time zone | func
pg_catalog | timetz_hash_extended | bigint | time with time zone, bigint | func
pg_catalog | uuid_hash | integer | uuid | func
pg_catalog | uuid_hash_extended | bigint | uuid, bigint | func
(56 rows)
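Of the functions listed, satisfies_hash_partition is the one native HASH partitioning uses in its partition constraints, so it can be used to work out in advance which remainder a key would land in. The sketch below assumes a hypothetical, empty hash-partitioned table new_h that exists only to supply the partition key definition, and a modulus of 4.

```sql
-- Hypothetical helper table: provides the hash partition key (id) only.
create table new_h (id int, info text, crt_time timestamp)
  partition by hash (id);

-- For each sample id, find the remainder (0..3) that 4-way native hash
-- partitioning would assign to it.
select id,
       (select r
          from generate_series(0, 3) as r
         where satisfies_hash_partition('new_h'::regclass, 4, r, id)
       ) as hash_remainder
from generate_series(1, 8) as id;
```

The same expression could serve as the CHECK constraint on the inheritance children and as the routing rule in the triggers, so that the children can later be attached as native hash partitions.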
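Summary
Ways to convert a table to a partitioned table online:
1. Convert to pg_pathman partitioning by calling the pg_pathman UDFs directly.
2. Convert to native partitioning using inheritance and asynchronous migration. The cutover locks the table briefly.
This method does not support the insert into ... on conflict syntax, for example: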
insert into old values (1,'test',now()) on conflict(id) do update set info=excluded.info, crt_time=excluded.crt_time;
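3. Use logical replication to migrate the data incrementally into a partitioned table (the target can be a new table that uses either native partitioning or pg_pathman partitioning).
References
《PostgreSQL 9.x, 10, 11 hash partitioned tables - usage examples》
《PostgreSQL 9.5+ efficient partitioned tables - pg_pathman》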