PostgreSQL: converting a regular table to a partitioned table online

Tags

PostgreSQL, partitioned table, online conversion


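Background

How can a non-partitioned table be converted to a partitioned table online, that is, without disrupting the application workload?

Method 1: the pg_pathman partitioning extension

See "PostgreSQL 9.5+ 高效分区表实现 - pg_pathman" (efficient partitioned tables for PostgreSQL 9.5+ with pg_pathman).

Use its non-blocking migration interface: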

partition_table_concurrently(  
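  relation   REGCLASS,              -- parent table (OID / regclass)  
  batch_size INTEGER DEFAULT 1000,  -- number of rows migrated per transaction  
  sleep_time FLOAT8 DEFAULT 1.0)    -- seconds to sleep before retrying when a row lock cannot be acquired; the task exits after 60 retries  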
  
  
postgres=# select partition_table_concurrently('part_test'::regclass,  
                             10000,  
                             1.0);  
NOTICE:  worker started, you can stop it with the following command: select stop_concurrent_part_task('part_test');  
 partition_table_concurrently   
------------------------------  
   
(1 row)  

When the migration finishes, the parent table itself holds no rows; all of the data is in the partitions.

postgres=# select count(*) from only part_test;  
 count   
-------  
     0  
(1 row)  

Once the data has been migrated, it is advisable to disable the parent table so that it no longer appears in execution plans.

postgres=# select set_enable_parent('part_test'::regclass, false);  
 set_enable_parent   
-------------------  
   
(1 row)  
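
The calls above assume the table has already been declared as partitioned in pg_pathman. A minimal sketch of the whole sequence might look like the following. The table name part_demo and its columns are hypothetical, and the script assumes pg_pathman is installed and listed in shared_preload_libraries. It uses create_hash_partitions with partition_data set to false so that existing rows stay in the parent until the background worker moves them.

```sql
-- Assumption: pg_pathman is installed and preloaded.
create extension if not exists pg_pathman;

-- Hypothetical demo table; pg_pathman requires the partitioning key to be NOT NULL.
create table part_demo (id int not null, info text, crt_time timestamp);
insert into part_demo
select g, md5(random()::text), now() from generate_series(1, 100000) g;

-- Create 4 hash partitions on id, but leave the existing rows in the parent
-- (4th argument partition_data = false), so this call does not copy data.
select create_hash_partitions('part_demo'::regclass, 'id', 4, false);

-- Move the rows in the background, 10000 rows per transaction.
select partition_table_concurrently('part_demo'::regclass, 10000, 1.0);

-- Watch the background worker's progress.
select * from pathman_concurrent_part_tasks;

-- When the parent is empty, exclude it from query plans.
select count(*) from only part_demo;
select set_enable_parent('part_demo'::regclass, false);
```

The batch size trades migration speed against lock duration per transaction; smaller batches hold row locks for a shorter time.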

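Method 2: native partitioning

Use inheritance children, triggers, asynchronous migration and a table-name swap to convert a non-partitioned table to a partitioned table online. Only the name swap needs a brief blocking lock.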

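Key techniques:

1. Inheritance children (the future partitions)

Transparent to SELECT, UPDATE, DELETE, TRUNCATE and DROP issued against the parent table.

2. Triggers

INSERT: a BEFORE trigger routes the new row to the matching inheritance child.

UPDATE: a BEFORE trigger deletes the row from the old table and inserts the updated row into the matching child.

3. Background data migration: a CTE selects a batch from ONLY the parent with FOR UPDATE SKIP LOCKED, deletes it from ONLY the parent, and inserts it into the new tables.

4. When the migration is finished (the parent table is empty), take a brief lock, remove the INHERIT relationships, attach the children to a native partitioned table, and swap the table names.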

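Example

Convert a table online to a LIST-partitioned table (a pseudo-HASH scheme based on mod).

Range partitioning works in a similar way.

Converting to a native HASH-partitioned table requires reproducing PostgreSQL's built-in hash partitioning algorithm.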

1. Create the test table (the table to be partitioned)

create table old (id int primary key, info text, crt_time timestamp);  

2. Insert 10 million test rows

insert into old select generate_series(1,10000000) , md5(random()::text) , now();  

3. Create the child tables (this example uses LIST partitioning)

do language plpgsql $$    
declare    
  parts int := 4;    
begin    
  for i in 0..parts-1 loop    
    execute format('create table old_mid%s (like old including all) inherits (old)', i);    
    execute format('alter table old_mid%s add constraint ck check(abs(mod(id,%s))=%s)', i, parts, i);    
  end loop;    
end;    
$$;    
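
Before adding the triggers it can be worth confirming that every child was created and carries the expected CHECK constraint. The query below is a small sketch that reads the system catalogs pg_inherits and pg_constraint; it assumes the parent is named old, as in this example.

```sql
-- List each inheritance child of "old" with the definition of its CHECK constraints.
select c.inhrelid::regclass          as child,
       k.conname                     as constraint_name,
       pg_get_constraintdef(k.oid)   as definition
from pg_inherits c
join pg_constraint k
  on k.conrelid = c.inhrelid
 and k.contype = 'c'                 -- 'c' = CHECK constraint
where c.inhparent = 'old'::regclass
order by c.inhrelid;
```

Each of the four children should appear once, with a definition of the form abs(mod(id, 4)) = N.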

4. INSERT: a BEFORE trigger routes rows to the new tables

create or replace function ins_tbl() returns trigger as $$    
declare    
begin    
  case abs(mod(NEW.id,4))    
    when 0 then    
      insert into old_mid0 values (NEW.*);    
    when 1 then    
      insert into old_mid1 values (NEW.*);    
    when 2 then    
      insert into old_mid2 values (NEW.*);    
    when 3 then    
      insert into old_mid3 values (NEW.*);    
    else    
      return NEW;  -- if the value is NULL, write to the local parent table; the primary key is never NULL     
  end case;    
  return null;    
end;    
$$ language plpgsql strict;    
  
  
create trigger tg1 before insert on old for each row execute procedure ins_tbl();    
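
The CASE statement above hard-codes one branch per child. As an alternative sketch, not the method used in this article, the target table name can be computed and the insert issued with dynamic SQL. The function name ins_tbl_dyn and the constant parts are hypothetical; parts must match the number of old_midN children.

```sql
create or replace function ins_tbl_dyn() returns trigger as $$
declare
  parts constant int := 4;  -- assumption: number of old_midN child tables
begin
  if NEW.id is null then
    return NEW;             -- let the parent handle it (the primary key rejects NULL)
  end if;

  -- Build the child table name from the routing expression and insert the whole row.
  execute format('insert into %I select ($1).*',
                 'old_mid' || abs(mod(NEW.id, parts)))
    using NEW;

  return null;              -- suppress the insert into the parent table
end;
$$ language plpgsql;

-- To use it instead of ins_tbl(), the trigger would be created as:
-- create trigger tg1 before insert on old for each row execute procedure ins_tbl_dyn();
```

Dynamic SQL is re-planned on every call, so the static CASE version is likely to be faster under heavy insert load; the dynamic version is easier to maintain when the number of children changes.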

5. UPDATE: a BEFORE trigger deletes the row from the old table and inserts the updated row into the new table

create or replace function upd_tbl () returns trigger as $$  
declare  
begin  
  case abs(mod(NEW.id,4))    
    when 0 then    
      insert into old_mid0 values (NEW.*);    
    when 1 then    
      insert into old_mid1 values (NEW.*);    
    when 2 then    
      insert into old_mid2 values (NEW.*);    
    when 3 then    
      insert into old_mid3 values (NEW.*);    
    else    
      return NEW;  -- if the value is NULL, write to the local parent table; the primary key is never NULL     
  end case;    
  
  delete from only old where id=OLD.id;  -- match on OLD.id so the original row is removed even if the update changes id  
  return null;    
end;    
$$ language plpgsql strict;    
  
create trigger tg2 before update on old for each row execute procedure upd_tbl();    

6. The old table now looks like this

postgres=# \dt+ old  
                    List of relations  
 Schema | Name | Type  |  Owner   |  Size  | Description   
--------+------+-------+----------+--------+-------------  
 public | old  | table | postgres | 730 MB |   
(1 row)  
  
  
The inheritance relationships are as follows  
  
  
postgres=# \d+ old  
                                               Table "public.old"  
  Column  |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description   
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------  
 id       | integer                     |           | not null |         | plain    |              |   
 info     | text                        |           |          |         | extended |              |   
 crt_time | timestamp without time zone |           |          |         | plain    |              |   
Indexes:  
    "old_pkey" PRIMARY KEY, btree (id)  
Triggers:  
    tg1 BEFORE INSERT ON old FOR EACH ROW EXECUTE PROCEDURE ins_tbl()  
    tg2 BEFORE UPDATE ON old FOR EACH ROW EXECUTE PROCEDURE upd_tbl()  
Child tables: old_mid0,  
              old_mid1,  
              old_mid2,  
              old_mid3  

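7. Verify that INSERT, UPDATE, DELETE and SELECT all behave as required and stay transparent to the application's SQL. Note that a row the trigger moves out of the parent is neither counted nor returned by the statement itself, which is why the output below shows INSERT 0 0 and UPDATE 1.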

postgres=# insert into old values (0,'test',now());  
INSERT 0 0  
  
postgres=# select tableoid::regclass,* from old where id=1;  
 tableoid | id |               info               |         crt_time            
----------+----+----------------------------------+---------------------------  
 old      |  1 | 22be06200f2a967104872f6f173fd038 | 31-JAN-19 12:52:25.887242  
(1 row)  
  
postgres=# select tableoid::regclass,* from old where id=0;  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | test | 31-JAN-19 13:02:35.859899  
(1 row)  
postgres=# update old set info='abc' where id in (0,2) returning tableoid::regclass,*;  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | abc  | 31-JAN-19 13:02:35.859899  
(1 row)  
  
UPDATE 1  
  
postgres=# select tableoid::regclass,* from old where id in (0,2);  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | abc  | 31-JAN-19 13:12:03.343559  
 old_mid2 |  2 | abc  | 31-JAN-19 13:11:04.763652  
(2 rows)  
postgres=# delete from old where id=3;  
DELETE 1  
postgres=# select tableoid::regclass,* from old where id=3;  
 tableoid | id | info | crt_time   
----------+----+------+----------  
(0 rows)  

8. Start a stress test while the original table's data is migrated in the background

create or replace function test_ins(int) returns void as $$  
declare  
begin  
  insert into old values ($1,'test',now());  
  exception when others then  
  return;  
end;  
$$ language plpgsql strict;  
vi test.sql  
  
\set id1 random(10000001,200000000)  
\set id2 random(1,5000000)  
\set id3 random(5000001,10000000)  
delete from old where id=:id2;  
update old set info=md5(random()::text),crt_time=now() where id=:id3;  
select test_ins(:id1);  

Start the stress test

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 4 -j 4 -T 1200  
  
...  
  
progress: 323.0 s, 12333.1 tps, lat 0.324 ms stddev 0.036  
progress: 324.0 s, 11612.9 tps, lat 0.344 ms stddev 0.203  
progress: 325.0 s, 12546.0 tps, lat 0.319 ms stddev 0.061  
progress: 326.0 s, 12728.7 tps, lat 0.314 ms stddev 0.038  
progress: 327.0 s, 12536.9 tps, lat 0.319 ms stddev 0.040  
progress: 328.0 s, 12534.1 tps, lat 0.319 ms stddev 0.042  
progress: 329.0 s, 12228.1 tps, lat 0.327 ms stddev 0.047  
...  

9. Migrate the data online

Migrate in batches of N rows each by calling the following SQL

with a as (  
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *  
)  
insert into old select * from a;  
  
INSERT 0 0  
  
postgres=# select count(*) from only old;  
  count    
---------  
 9998998  
(1 row)  
  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  
postgres=# with a as (                     
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *  
)  
insert into old select * from a;  
INSERT 0 0  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  
  
postgres=# select count(*) from only old;  
  count    
---------  
 9997998  
(1 row)  
  
postgres=# with a as (                
delete from only old where ctid = any (array (select ctid from only old limit 100000 for update skip locked) ) returning *  
)  
insert into old select * from a;  
INSERT 0 0  
postgres=# select count(*) from only old;  
  count    
---------  
 9897998  
(1 row)  
  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  

Migrate 10,000 rows at a time, in batches.

with a as (                
delete from only old where ctid = any (array (select ctid from only old limit 10000 for update skip locked) ) returning *  
)  
insert into old select * from a;  

Keep calling the statement above until the old table itself holds no rows and everything has been moved into the partitions.

select count(*) from only old;  
  
  
 count   
-------  
     0  
(1 row)  
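
Rather than running the batch statement by hand, the loop can be wrapped in a procedure. The following is a minimal sketch, assuming PostgreSQL 11 or later (procedures with COMMIT); the name migrate_old and its parameters are hypothetical. Because the insert is rerouted by the trigger and reports zero rows, the sketch counts the deleted rows instead.

```sql
create or replace procedure migrate_old(batch_size int default 10000,
                                        pause_s float8 default 0.1)
language plpgsql as $$
declare
  moved bigint;
begin
  loop
    with a as (
      -- take one batch of unlocked rows out of the parent only
      delete from only old
      where ctid = any (array(
        select ctid from only old limit batch_size for update skip locked))
      returning *
    ), ins as (
      -- re-insert through the parent; trigger tg1 routes rows to old_midN
      insert into old select * from a
    )
    select count(*) into moved from a;

    commit;                   -- one transaction per batch
    -- Stops when a batch finds nothing to move. Rows that were locked by
    -- other sessions are skipped, so re-check the count afterwards.
    exit when moved = 0;
    perform pg_sleep(pause_s);
  end loop;
end;
$$;

call migrate_old(10000);
select count(*) from only old;
```

The procedure has to be called outside an explicit transaction block, otherwise the COMMIT inside it raises an error.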

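10. Switch to the partitioned table

Create the partitioned table as follows; its partitioning scheme matches the CHECK constraints on the inheritance children.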

create table new (id int, info text, crt_time timestamp) partition by list (abs(mod(id,4)));    

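Swap the table names. Set a lock timeout so that a blocked DDL statement cannot cause a pile-up of waiting sessions. Only table names and catalog entries change, so the switch is very fast.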

begin;  
set lock_timeout ='3s';   
alter table old_mid0 no inherit old;   
alter table old_mid1 no inherit old;   
alter table old_mid2 no inherit old;   
alter table old_mid3 no inherit old;   
alter table old rename to old_tmp;  
alter table new rename to old;  
alter table old ATTACH PARTITION old_mid0 for values in (0);    
alter table old ATTACH PARTITION old_mid1 for values in (1);    
alter table old ATTACH PARTITION old_mid2 for values in (2);    
alter table old ATTACH PARTITION old_mid3 for values in (3);    
end;  
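
After the cutover, the leftovers from the migration can be cleaned up. The sketch below is one possible follow-up and is not part of the original procedure. It assumes the old parent really is empty and that nothing else depends on old_tmp or on the two trigger functions.

```sql
-- Confirm the former parent holds no rows before dropping it.
select count(*) from old_tmp;

begin;
set local lock_timeout = '3s';

drop table old_tmp;        -- also drops triggers tg1 and tg2 defined on it
drop function ins_tbl();
drop function upd_tbl();

-- The ck constraints now duplicate the partition bounds and can be removed.
alter table old_mid0 drop constraint ck;
alter table old_mid1 drop constraint ck;
alter table old_mid2 drop constraint ck;
alter table old_mid3 drop constraint ck;
commit;
```

Keeping old_tmp around for a while costs little, so the drop can also be deferred until the new layout has been verified in production.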

The native partitioned table after the switch:

postgres=# \d+ old  
                                               Table "public.old"  
  Column  |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description   
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------  
 id       | integer                     |           |          |         | plain    |              |   
 info     | text                        |           |          |         | extended |              |   
 crt_time | timestamp without time zone |           |          |         | plain    |              |   
Partition key: LIST (abs(mod(id, 4)))  
Partitions: old_mid0 FOR VALUES IN (0),  
            old_mid1 FOR VALUES IN (1),  
            old_mid2 FOR VALUES IN (2),  
            old_mid3 FOR VALUES IN (3)  

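Query test. A predicate on id alone scans every partition; adding the partition key expression to the WHERE clause lets the planner prune down to a single partition.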

postgres=# explain select * from old where id=1;  
                                     QUERY PLAN                                        
-------------------------------------------------------------------------------------  
 Append  (cost=0.29..10.04 rows=4 width=44)  
   ->  Index Scan using old_mid0_pkey on old_mid0  (cost=0.29..2.51 rows=1 width=44)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid1_pkey on old_mid1  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid2_pkey on old_mid2  (cost=0.29..2.51 rows=1 width=44)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid3_pkey on old_mid3  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
(9 rows)  
  
  
  
postgres=# explain select * from old where id=1 and abs(mod(id, 4)) = abs(mod(1, 4));  
                                     QUERY PLAN                                        
-------------------------------------------------------------------------------------  
 Append  (cost=0.29..2.52 rows=1 width=45)  
   ->  Index Scan using old_mid1_pkey on old_mid1  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
         Filter: (mod(id, 4) = 1)  
(4 rows)  

Row count

postgres=# select count(*) from old;  
  count     
----------  
 10455894  
(1 row)  

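Method 3: logical replication

Use logical replication to synchronize the data into a partitioned table.

The steps, in outline:

Take a snapshot (note the LSN position)  
  
Full copy  
  
Incremental sync (logical replication: decode the WAL starting from that LSN)  
  
Swap the table names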
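
To give a feel for the incremental step only, the sketch below creates a logical replication slot with the contrib plugin test_decoding and peeks at the decoded changes. It assumes wal_level = logical, a free replication slot and the test_decoding module; the slot name part_mig is hypothetical. It is not a complete migration: the SQL function does not export a snapshot, so a consistent full copy aligned with the slot needs the replication protocol or a tool built on it, and applying the decoded changes to the target table is left out.

```sql
-- Create a logical slot; changes committed from this point on are retained.
select * from pg_create_logical_replication_slot('part_mig', 'test_decoding');

-- Make a change on the source table so there is something to decode.
update old set info = 'changed' where id = 1;

-- Peek at up to 100 decoded changes without consuming them.
select lsn, xid, data
from pg_logical_slot_peek_changes('part_mig', null, 100);

-- Drop the slot when finished; an unused slot makes the server retain WAL.
select pg_drop_replication_slot('part_mig');
```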

Other notes

Hash functions

postgres=# \df *.*hash*  
                                            List of functions  
   Schema   |           Name           | Result data type |          Argument data types          | Type   
------------+--------------------------+------------------+---------------------------------------+------  
 pg_catalog | hash_aclitem             | integer          | aclitem                               | func  
 pg_catalog | hash_aclitem_extended    | bigint           | aclitem, bigint                       | func  
 pg_catalog | hash_array               | integer          | anyarray                              | func  
 pg_catalog | hash_array_extended      | bigint           | anyarray, bigint                      | func  
 pg_catalog | hash_numeric             | integer          | numeric                               | func  
 pg_catalog | hash_numeric_extended    | bigint           | numeric, bigint                       | func  
 pg_catalog | hash_range               | integer          | anyrange                              | func  
 pg_catalog | hash_range_extended      | bigint           | anyrange, bigint                      | func  
 pg_catalog | hashbpchar               | integer          | character                             | func  
 pg_catalog | hashbpcharextended       | bigint           | character, bigint                     | func  
 pg_catalog | hashchar                 | integer          | "char"                                | func  
 pg_catalog | hashcharextended         | bigint           | "char", bigint                        | func  
 pg_catalog | hashenum                 | integer          | anyenum                               | func  
 pg_catalog | hashenumextended         | bigint           | anyenum, bigint                       | func  
 pg_catalog | hashfloat4               | integer          | real                                  | func  
 pg_catalog | hashfloat4extended       | bigint           | real, bigint                          | func  
 pg_catalog | hashfloat8               | integer          | double precision                      | func  
 pg_catalog | hashfloat8extended       | bigint           | double precision, bigint              | func  
 pg_catalog | hashhandler              | index_am_handler | internal                              | func  
 pg_catalog | hashinet                 | integer          | inet                                  | func  
 pg_catalog | hashinetextended         | bigint           | inet, bigint                          | func  
 pg_catalog | hashint2                 | integer          | smallint                              | func  
 pg_catalog | hashint2extended         | bigint           | smallint, bigint                      | func  
 pg_catalog | hashint4                 | integer          | integer                               | func  
 pg_catalog | hashint4extended         | bigint           | integer, bigint                       | func  
 pg_catalog | hashint8                 | integer          | bigint                                | func  
 pg_catalog | hashint8extended         | bigint           | bigint, bigint                        | func  
 pg_catalog | hashmacaddr              | integer          | macaddr                               | func  
 pg_catalog | hashmacaddr8             | integer          | macaddr8                              | func  
 pg_catalog | hashmacaddr8extended     | bigint           | macaddr8, bigint                      | func  
 pg_catalog | hashmacaddrextended      | bigint           | macaddr, bigint                       | func  
 pg_catalog | hashname                 | integer          | name                                  | func  
 pg_catalog | hashnameextended         | bigint           | name, bigint                          | func  
 pg_catalog | hashoid                  | integer          | oid                                   | func  
 pg_catalog | hashoidextended          | bigint           | oid, bigint                           | func  
 pg_catalog | hashoidvector            | integer          | oidvector                             | func  
 pg_catalog | hashoidvectorextended    | bigint           | oidvector, bigint                     | func  
 pg_catalog | hashtext                 | integer          | text                                  | func  
 pg_catalog | hashtextextended         | bigint           | text, bigint                          | func  
 pg_catalog | hashvarlena              | integer          | internal                              | func  
 pg_catalog | hashvarlenaextended      | bigint           | internal, bigint                      | func  
 pg_catalog | interval_hash            | integer          | interval                              | func  
 pg_catalog | interval_hash_extended   | bigint           | interval, bigint                      | func  
 pg_catalog | jsonb_hash               | integer          | jsonb                                 | func  
 pg_catalog | jsonb_hash_extended      | bigint           | jsonb, bigint                         | func  
 pg_catalog | pg_lsn_hash              | integer          | pg_lsn                                | func  
 pg_catalog | pg_lsn_hash_extended     | bigint           | pg_lsn, bigint                        | func  
 pg_catalog | satisfies_hash_partition | boolean          | oid, integer, integer, VARIADIC "any" | func  
 pg_catalog | time_hash                | integer          | time without time zone                | func  
 pg_catalog | time_hash_extended       | bigint           | time without time zone, bigint        | func  
 pg_catalog | timestamp_hash           | integer          | timestamp without time zone           | func  
 pg_catalog | timestamp_hash_extended  | bigint           | timestamp without time zone, bigint   | func  
 pg_catalog | timetz_hash              | integer          | time with time zone                   | func  
 pg_catalog | timetz_hash_extended     | bigint           | time with time zone, bigint           | func  
 pg_catalog | uuid_hash                | integer          | uuid                                  | func  
 pg_catalog | uuid_hash_extended       | bigint           | uuid, bigint                          | func  
(56 rows)  
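
Among the functions listed, satisfies_hash_partition is the one native HASH partitioning uses in its partition constraints. The sketch below shows how it could be used to find out which remainder a given key value maps to. It assumes PostgreSQL 11 or later; the table h and its partitions are hypothetical demo objects.

```sql
-- Hypothetical hash-partitioned table with modulus 4.
create table h (id int, info text) partition by hash (id);
create table h0 partition of h for values with (modulus 4, remainder 0);
create table h1 partition of h for values with (modulus 4, remainder 1);
create table h2 partition of h for values with (modulus 4, remainder 2);
create table h3 partition of h for values with (modulus 4, remainder 3);

-- For each id, keep the remainder r that the built-in hash routing accepts.
select i as id, r as remainder
from generate_series(1, 8) i, generate_series(0, 3) r
where satisfies_hash_partition('h'::regclass, 4, r, i)
order by i;

-- Cross-check against where the rows actually land.
insert into h select g, 'x' from generate_series(1, 8) g;
select tableoid::regclass as partition, id from h order by id;
```

The remainder reported by the first query should match the partition suffix shown by the second, which is the mapping a trigger-based router would have to reproduce for a native HASH target.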

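Summary

Ways to convert a table to a partitioned table online:

1. Convert to pg_pathman partitioning by calling pg_pathman's UDFs directly.

2. Convert to native partitioning using inheritance and asynchronous migration. The cutover takes a brief table lock.

While the triggers are in place, INSERT ... ON CONFLICT is not supported: the BEFORE INSERT trigger redirects the row to a child table and returns NULL, so the ON CONFLICT clause on the parent is never applied. For example: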

insert into old values (1,'test',now()) on conflict(id) do update set info=excluded.info, crt_time=excluded.crt_time;  

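3. Use logical replication to migrate the data incrementally into a partitioned table (the target can be a new table using either native partitioning or pg_pathman partitioning).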

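References

"PostgreSQL 9.x, 10, 11 hash分区表 用法举例" (hash-partitioned table usage examples for PostgreSQL 9.x, 10, 11)

"PostgreSQL 触发器 用法详解 1" (PostgreSQL triggers in detail, part 1)

"PostgreSQL 触发器 用法详解 2" (PostgreSQL triggers in detail, part 2)

"PostgreSQL 9.5+ 高效分区表实现 - pg_pathman" (efficient partitioned tables for PostgreSQL 9.5+ with pg_pathman)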

 
