PostgreSQL Logical Replication

限制及特性

1、只支持普通表生效,不支持序列、视图、物化视图、外部表、分区表和大对象
2、只支持普通表的DML(INSERT、UPDATE、DELETE)操作,不支持truncate、DDL操作
3、需要同步的表必须设置REPLICA IDENTITY 不能为noting(默认值是default),同时表中必须包含主键,否则delete和update报错
4、一个publisher可以包含一张或多张表,一张表可以有一个或多个publishers
5、一个发布者可以有多个订阅者订阅,一个订阅者也可以同时订阅多个发布者,在同一个数据库下订阅者不能对同一个发布者的表重复订阅(避免数据冲突)
6、逻辑复制不同于流复制,不是严格的主从关系,订阅者端的普通表依然可以进行增删改操作
7、同步表的表结构需要在发布者和订阅者两边保持一致(列的顺序允许不一样,但是列对应的数据类型必须一致)
8、如果订阅者端的数据被误删,想要从发布者重新copy同步表的数据,只能以重建同步表所在的订阅者的方式来实现

环境搭建

配置数据库参数

1、修改postgresql.conf数据库参数文件(修改这些参数需要重启数据库)
   a、发布者端设置
      设置wal_level级别为logical:wal_level = logical
      设置max_wal_senders,此参数值要不小于max_replication_slots的参数值,默认值是10
      设置max_replication_slots,此参数值不少于subscriptions的个数,默认值是10
   b、订阅者端设置
      设置wal_level级别为logical:wal_level = logical
      设置max_logical_replication_workers,此参数值不少于订阅者的个数,默认是4
      设置max_worker_processes,此参数值不少于max_logical_replication_workers值+1 

2、在pg_hba.conf添加白名单(根据真实情况自行限制网段)
   host     all     repuser     0.0.0.0/0     md5

3、创建专门用于逻辑复制的超户(建议使用uuid作为密码)
   create user repuser superuser login password 'repuser1234';

创建发布者

publisher是逻辑复制的起点

--查看pub_db数据库的发布者
pub_db=# \dRp
                  List of publications
 Name | Owner | All tables | Inserts | Updates | Deletes 
------+-------+------------+---------+---------+---------
(0 rows)

--在pub_db数据库上创建名为mypub的发布者
pub_db=# CREATE PUBLICATION mypub;
CREATE PUBLICATION
pub_db=# 
pub_db=# \dRp
                    List of publications
 Name  |  Owner   | All tables | Inserts | Updates | Deletes 
-------+----------+------------+---------+---------+---------
 mypub | postgres | f          | t       | t       | t
(1 row)

--查看mypub发布的详细信息
pub_db=# \dRp+
                  Publication mypub
  Owner   | All tables | Inserts | Updates | Deletes 
----------+------------+---------+---------+---------
 postgres | f          | t       | t       | t
(1 row)

创建订阅者

subscriber是逻辑复制的下游

--查看sub_db数据库下的订阅者
sub_db=# \dRs
        List of subscriptions
 Name | Owner | Enabled | Publication 
------+-------+---------+-------------
(0 rows)

--在sub_db数据库上创建名为mysub的订阅者
sub_db=# CREATE SUBSCRIPTION mysub CONNECTION 'dbname=pub_db host=l-test1 user=repuser password=repuser1234 port=6432' PUBLICATION mypub;
NOTICE:  created replication slot "mysub" on publisher
CREATE SUBSCRIPTION
sub_db=# 
sub_db=# 
sub_db=# \dRs
          List of subscriptions
 Name  |  Owner   | Enabled | Publication 
-------+----------+---------+-------------
 mysub | postgres | t       | {mypub}
(1 row)

--查看订阅者mysub的详细信息
sub_db=# \dRs+
                                                                List of subscriptions
 Name  |  Owner   | Enabled | Publication | Synchronous commit |                                      Conninfo                                       
-------+----------+---------+-------------+--------------------+-------------------------------------------------------------------------------------
 mysub | postgres | t       | {mypub}     | off                | dbname=pub_db host=l-test1 user=repuser password=repuser1234 port=6432
(1 row)

添加需要同步的表

发布者端

--创建表
pub_db=# create table logical_tb1(id int primary key,col1 varchar(8),col2 numeric(10,2),col3 jsonb,col4 hstore,col5 timestamptz default now(),col6 int[],col7 ltree);
CREATE TABLE
pub_db=# insert into logical_tb1(id,col1) select generate_series(1,2000),'tester';
INSERT 0 2000

--添加到发布者mypub
pub_db=# alter publication mypub add table logical_tb1;
ALTER PUBLICATION

--查看发布者的详细信息
pub_db=# \dRp+ mypub
                  Publication mypub
  Owner   | All tables | Inserts | Updates | Deletes 
----------+------------+---------+---------+---------
 postgres | f          | t       | t       | t
Tables:
    "public.logical_tb1"

订阅者端

--创建相同的表
sub_db=# create table logical_tb1(id int primary key,col1 varchar(8),col2 numeric(10,2),col3 jsonb,col4 hstore,col5 timestamptz default now(),col6 int[],col7 ltree);
CREATE TABLE

--刷新一下订阅者
sub_db=# ALTER SUBSCRIPTION mysub REFRESH PUBLICATION;
ALTER SUBSCRIPTION

--查询数据是否已经同步过来
sub_db=# select count(*) from logical_tb1;
 count 
-------
  2000
(1 row)

约束条件表

对于没有任何约束条件的普通表实现逻辑同步很简单,直接将表添加到发布者即可,但是有约束条件的表如何实现逻辑同步呢?

条件约束

先给出结论:对于条件符合约束的数据在订阅端不影响同步,不符合条件约束的数据在订阅端会同步报错

--在sub_db上的同步表添加一个check 约束
sub_db=# alter table logical_tb1 add constraint col1_check check(col1 = 'test');
ALTER TABLE
sub_db=# \d logical_tb1
                     Table "public.logical_tb1"
 Column |           Type           | Collation | Nullable | Default 
--------+--------------------------+-----------+----------+---------
 id     | integer                  |           | not null | 
 col1   | character varying(8)     |           |          | 
 col2   | numeric(10,2)            |           |          | 
 col3   | json                     |           |          | 
 col4   | hstore                   |           |          | 
 col5   | timestamp with time zone |           |          | now()
 col6   | integer[]                |           |          | 
 col7   | ltree                    |           |          | 
Indexes:
    "logical_tb1_pkey" PRIMARY KEY, btree (id)
Check constraints:
    "col1_check" CHECK (col1::text = 'test'::text)

--在pub_db上的同步表插入两条测试数据
pub_db=# insert into logical_tb1 values(1,'test','999.99','{"ja":"1"}','ha=>1',now(),'{9}');
INSERT 0 1
pub_db=# insert into logical_tb1 values(2,'test2','999.99','{"ja":"1"}','ha=>1',now(),'{9}');
INSERT 0 1

--检查sub_db上同步表的数据同步情况
sub_db=# select * from logical_tb1 ;
 id | col1 |  col2  |    col3    |   col4    |             col5              | col6 | col7 
----+------+--------+------------+-----------+-------------------------------+------+------
  1 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9}  | 
(1 row)

--检查日志发现了报错:
ERROR:  new row for relation "logical_tb1" violates check constraint "col1_check"
DETAIL:  Failing row contains (2, test2, 999.99, {"ja":"1"}, "ha"=>"1", 2018-04-11 15:29:12.395614+08, {9}, null).

--删除sub_db上同步表的约束后,数据继续同步
sub_db=# alter table logical_tb1 drop CONSTRAINT col1_check;
ALTER TABLE
sub_db=# select * from logical_tb1 ;
 id | col1  |  col2  |    col3    |   col4    |             col5              | col6 | col7 
----+-------+--------+------------+-----------+-------------------------------+------+------
  1 | test  | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9}  | 
  2 | test2 | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:29:12.395614+08 | {9}  | 
(2 rows)

外键约束

先给出结论:如果发布端没有外键约束条件,而订阅端有外键约束条件,同步数据不受订阅端外键约束条件影响

--订阅者端创建非同步表
sub_db=# create table logical_tb2(id int primary key,for_id int);
CREATE TABLE

--订阅者端同步表添加列col8并添加外键约束,引用logical_tb2的主键
sub_db=# alter table logical_tb1 add column col8 int references logical_tb2(id);
ALTER TABLE

--订阅端插入初始化数据
sub_db=# insert into logical_tb2 values(1,1),(2,2),(3,3);
INSERT 0 3

sub_db=# select * from logical_tb2;
 id | for_id 
----+--------
  1 |      1
  2 |      2
  3 |      3
(3 rows)

--发布者端插入违反外键约束的数据(发布者端没有外加约束)
pub_db=# insert into logical_tb1 values(5,'test','999.99','{"ja":"1"}','ha=>1',now(),'{9}','beijing.haidian',4);
INSERT 0 1

--观察订阅者端的数据,依然能同步进来,不受外键约束的影响
sub_db=# select * from logical_tb1;
 id | col1 |  col2  |    col3    |   col4    |             col5              | col6 |      col7       | col8 
----+------+--------+------------+-----------+-------------------------------+------+-----------------+------
  1 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9}  |                 |     
  2 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:40:28.794865+08 | {9}  |                 |     
  3 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:39:15.537949+08 | {9}  | beijing.haidian |    1
  4 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:40:28.885829+08 | {9}  | beijing.haidian |    4
  5 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:51:50.167068+08 | {9}  | beijing.haidian |    4
(5 rows)
上一篇:一分钟快速搞懂Redis的慢查询分析


下一篇:Linux源码安装PostGIS