PostgreSQL 变态并行拉取单表的方法 - 按块并行(按行号(ctid)并行) + dblink 异步调用

标签

PostgreSQL , 按块扫描 , 采样 , 并行 , 采样扫描 , 数据同步 , 数据全量同步


背景

数据同步是一个较为常见的需求,为了提高性能,并行是一个非常简单暴力的手段。但是通常只能做到不同的对象之间并行,对于单个对象,比如单表,能否并行呢?

有一种方法是使用HASH并行,例如按某个或某几个字段,按hash值取模,切分为多组数据,每个进程读取一部分,并行将单表取出。但是这种方法有一个弊端,会导致重复的扫描。例子如下:

《阿里云RDS PostgreSQL OSS 外部表 - (dblink异步调用封装)并行写提速案例》

还有一种方法与之类似,可以使用某个唯一的自增字段,通过BRIN索引或B-TREE索引分段扫描取出。这种方法的好处是避免了HEAP BLOCKS的重复扫描。但是需要有相关性很好的字段以及索引的支撑,否则会有离散扫描,性能也不见得好。

那么还有其他的方法吗?

因为PostgreSQL的数据是堆存储,所以我们可以按BLOCK扫描,每个线程扫描指定的BLOCK分段。目前PG支持指定行号返回,如果需要指定BLOCK返回需要扩展内核功能。

按行号(最好按BLOCK)集合,并行取单表数据

按行号并行扫描,思路:

1、评估表占用了多少数据块

2、评估每个数据快最多有多少条记录

3、生成每个BATCH取数据的行号集合

4、分配任务,每个任务取各自的行号集合

如果支持按BLOCK扫描,思路如下:

1、评估表占了多少个数据块

2、分配任务,每个任务扫描连续的N个数据块

按行号并行的例子

1、创建一个函数,返回每个BATCH处理的连续行号集合,输入4个参数,分别为目标表占用多少数据块,每个数据块最多包含多少条记录,开启多少个并行,返回第几个并行号的行号集合。

create or replace function split_ctid(  
  v_blocks int,         -- 多少个数据块,可通过元数据得到    
  v_rowsperblock int,   -- 每个块内多少行,必须足够大,不要取平均值,可评估得到。 一定要大于或等于包含最多记录的某个数据块。  
  v_mod int,            -- 拆分成多少个并行,必须小于或等于v_blocks 。     
  v_n int               -- 当前进程读第几个MOD, 从0开始 ,到 v_mod-1   
) returns tid[] as $$  
select array (   
  select ('('||blkid||','||generate_series(1,v_rowsperblock)||')')::tid from  
  generate_series  
  (  
    (v_blocks/v_mod)*v_n  
    ,  
    case   
      when v_n+1=v_mod then greatest(((v_blocks/v_mod)*(v_n+1))-1, v_blocks-1)  
      else ((v_blocks/v_mod)*(v_n+1))-1  
    end  
  ) t(blkid)  
);  
$$ language sql strict immutable;  

使用举例:

例如某个表有999个数据块,每个数据块最多10条记录,开启16个并行,获得第0号并行的行号集合。  
  
postgres=# select array_length(split_ctid(999,10,16,0),1);  
 array_length   
--------------  
          620  
(1 row)  
  
获得第1号并行的行号集合。  
  
postgres=# select array_length(split_ctid(999,10,16,1),1);  
 array_length   
--------------  
          620  
(1 row)  
  
获得第15号并行的行号集合。  
  
postgres=# select array_length(split_ctid(999,10,16,15),1);  
 array_length   
--------------  
          690  
(1 row)  

2、实际的例子

获得一个表占用了多少个数据块

postgres=# select relpages from pg_class where oid='public.a'::regclass;  
 relpages   
----------  
   540541  
(1 row)  

评估每个数据块内有多少记录.

postgres=# select n_live_tup+n_dead_tup from pg_stat_all_tables where relid='public.a'::regclass;  
 ?column?    
-----------  
 100000085  
(1 row)  
  
postgres=# select reltuples from pg_class where oid='public.a'::regclass;  
 reltuples   
-----------  
     1e+08  
(1 row)  

评估如下,建议乘以一个系数,表示一个BLOCK最多可能有多少条记录(注意,这个而仅仅是评估,所以有误差):

select (greatest(a.id,b.id)/relpages)*1.15 from   -- 乘以1.15的放大系数  
(select n_live_tup+n_dead_tup as id from pg_stat_all_tables where relid='public.a'::regclass) a,  
(select reltuples as id from pg_class where oid='public.a'::regclass) as b,  
(select relpages from pg_class where oid='public.a'::regclass) c;  
  
     ?column?       
------------------  
 212.750006382495  
(1 row)  

这种方法,并不能保证百分百完全一致。只是很大概率上可以做到一致。当有很多垃圾时,可能导致膨胀,计算得到的每个块内的记录数会变少,就可能导致数据缺失。

然后,我们就可以使用以上方法来并行查询单张表了。

postgres=# show enable_tidscan ;  
 enable_tidscan   
----------------  
 on  
(1 row)  
  
postgres=#  select count(*) from a where ctid = any (split_ctid(540541, 212, 64, 0));  
  count    
---------  
 1562325  
(1 row)  
  
Time: 1818.409 ms (00:01.818)  

开启64个并行,每个并行的耗时约1.8秒。

如果不使用并行,扫描1亿记录需要耗费7.2秒。

postgres=# select count(*) from a;  
   count     
-----------  
 100000000  
(1 row)  
  
Time: 7271.676 ms (00:07.272)  

我们看一下64个并行加起来的记录数是否准确

do language plpgsql $$  
declare  
  v_sum int :=0;  
  v_tmp int;  
begin  
  for i in 0..63 loop  
    select count(*) into v_tmp from a where ctid = any (split_ctid(540541, 212, 64, i));   
    v_sum := v_sum + v_tmp;  
  end loop;  
  
  raise notice 'sum: %', v_sum;  
end;  
$$;  
  
  
NOTICE:  sum: 100000000  
DO  

准确无误。

但是请特别注意:

这种方法,并不能保证百分百完全一致。只是很大概率上可以做到一致。当有很多垃圾时,可能导致膨胀,计算得到的每个块内的记录数会变少,就可能导致数据缺失。

《PostgreSQL 事务快照功能 - Parallel Export consistent data or Parallel Query use snapshot transaction feature》

并行例子

并行就不用说了,程序端发起。在数据库中使用dblink异步调用可以模拟效果。

create extension dblink;  
  
create or replace function conn(        
  name,   -- dblink名字        
  text    -- 连接串,URL        
) returns void as $$          
declare          
begin          
  perform dblink_connect($1, $2);         
  return;          
exception when others then          
  return;          
end;          
$$ language plpgsql strict;    
create or replace function get_cnt(v_blocks int, v_rowsperblk int, v_mod int) returns setof record as $$    
declare    
begin    
  for i in 0..(v_mod-1) loop           
    perform conn('link'||i,  'hostaddr=127.0.0.1 port=1923 user=postgres dbname=postgres');           
    perform 1 from dblink_get_result('link'||i) as t(cnt int8);          
    perform dblink_send_query('link'||i, format('select count(*) as cnt from a where ctid = any (split_ctid(%s, %s, %s, %s)) ', v_blocks, v_rowsperblk, v_mod, i));          
  end loop;       
  for i in 0..(v_mod-1) loop    
    return query select * from dblink_get_result('link'||i) as t(cnt int8);    
  end loop;    
end;    
$$ language plpgsql strict;   
postgres=# select sum(cnt) from get_cnt(540541, 190, 48) t (cnt int8);  
    sum      
-----------  
 100000000  
(1 row)  
  
Time: 4949.961 ms (00:04.950)  

小结

目前PostgreSQL还没有把指定某个数据块的数据记录全部取出的扫描方法,需要扩展采样接口或者AM节点,才能实现此功能。再此之前,我们可以使用按行号扫描,将一个BLOCK中的记录全部取出来,不过这个方法有一定的弊端,就是我们并不知道一个BLOCK里面到底有多少条记录,写多了浪费性能,写少了又会导致记录获取缺失。所以最好还是在内核层面实现BLOCK级别的扫描,告诉数据库你要扫描哪个BLOCK的数据,直接OFFSET到对应的BLOCK并将这个BLOCK的记录全部取出。

可以借鉴PostgreSQL采样接口来实现按块扫描。

https://www.postgresql.org/docs/devel/static/tablesample-method.html

select * from tbl where ctid between ? and ?;  从某行扫描到某行,支持跨BLOCK。  
  
select * from tbl where block_id between ? and ?;  从某个BLOCK扫描到某个BLOCK  
  
select * from tbl where ctid = any (array(?));  -- 已支持。扫描指定集合  
  
select * from tbl where block_id = any(array(?));  -- 扫描指定BLOCK集合  

本文通过块评估、行评估、以及行扫描的方法,实现了一个比较折中的并行扫描连续BLOCK的方法。

参考

《PostgreSQL VOPS 向量计算 + DBLINK异步并行 - 单实例 10亿 聚合计算跑进2秒》

《PostgreSQL 相似搜索分布式架构设计与实践 - dblink异步调用与多机并行(远程 游标+记录 UDF实例)》

《PostgreSQL dblink异步调用实现 并行hash分片JOIN - 含数据交、并、差 提速案例 - 含dblink VS pg 11 parallel hash join VS pg 11 智能分区JOIN》

《惊天性能!单RDS PostgreSQL实例 支撑 2000亿 - 实时标签透视案例 (含dblink异步并行调用)》

《阿里云RDS PostgreSQL OSS 外部表 - (dblink异步调用封装)并行写提速案例》

《PostgreSQL 事务快照功能 - Parallel Export consistent data or Parallel Query use snapshot transaction feature》

https://www.postgresql.org/docs/devel/static/tablesample-method.html

《PostgreSQL 任意列组合条件 行数估算 实践 - 采样估算》

《秒级任意维度分析1TB级大表 - 通过采样估值满足高效TOP N等统计分析需求》

《PostgreSQL Oracle 兼容性 之 - 数据采样与脱敏》

《PostgreSQL 巧妙的数据采样方法》

上一篇:01.源码阅读(Fragment--support-fragment-26.0.0-alpha1)


下一篇:使用代码将github仓库里某个issue同步到CSDN博客上