PostgreSQL 逻辑同步

简单概念:
1. 逻辑复制是基于数据对象的复制标识(通常是主键)复制数据对象及其更改的方法。
2. 物理复制是用准确的块地址以及逐字节的复制方式。
3. 逻辑比物理在数据复制和安全上有更细粒度的控制。
4. 逻辑复制使用发布(publish)和订阅(subscribe)模型(类似于sqlserver的发布订阅)。
5. 其中一个或者更多的订阅者(subscribers)订阅一个发布者(publications)节点上的一个或者更多的发布。
6. 订阅者从它们所订阅的发布者拉取数据并且可能后续重新发布这些数据以允许级联复制或者更复杂的配置。
7. 一个表的逻辑复制通常开始于对发布者服务器上的数据取得一个快照并且将快照拷贝给订阅者。一旦这项工作完成,发布者上的更改将会被实时发送给订阅者,订阅者以相同的顺序应用那些数据,这样在一个订阅中能够保证发布的事务一致性。这种数据复制的方法有时候也被称为事务性复制。
8. 订阅者数据库行为跟任何其他pg实例相同,并且可以被用作其他数据库的发布者,只需要定义它子集的发布。

简单用法:
1. 在一个数据库或者一个数据库的子集中发生更改,把增量的改变发送给订阅者。
2. 在更改到达订阅者时触发触发器。
3. 把多个数据库复制到一个数据库中。
4. 在pg不同主版本之间进行复制
5. 在不同平台(比如说linux到windows)的pg之间进行复制。
6. 将复制数据的访问给予不同的用户组。
7. 再多个数据库之间共享数据库的一个子集。

发布者(publication)

简单概念
1. 发布可以被定义在任何已经搞好物理复制的主服务器上。
2. 定义有发布的节点被称为发布者,发布是从一个表或者一组表生成的改变的集合,或者更改的集合或者是复制的集合。
3. 每个发布都只能存在于一个数据库中。
4. 发布与模式(schema)不同,并不会影响表的访问方式。
5. 如果有需要,每个表都可以被加入到多个发布。
6. 目前版本(12.2)发布只能包含表。对象必须被明确的加入到发布,除非发布是all tables创建的。
7. 发布可以选择把他们产生的更改限制为insert、update、delete以及truncate的任意组合,类似于触发器如何被特定事件类型触发的方式。
8. 默认下,所有的操作类型都将会被复制。
9. 为了能够复制update和delete操作,被发布的表必须配置一个复制标识(通常是主键),这样订阅者那一端才能知道更新或者删除哪些合适的行。
10. 默认情况下,复制标识就是主键(如果有的话)。
11. 也可以在复制标识上设置另一个唯一索引(但是有特定的额外要求)。
12. 如果表没有合适的键,那么复制标识可以设置为full,它表示整个行都成为那个键,不过这样会导致复制效率非常低,只有在没有其他方案的情况下才应该使用。
13. 如果发布者设置了full之外的复制标识,在订阅者端也必须设置一个复制标识,他应该由相同的或者少一些的列组成。
14. 如何设置复制标识参考 http://www.postgres.cn/docs/11/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY
15. 如果在复制update或者delete操作的发布中加入了没有复制标识的表,那么订阅者上后续的update或者delete操作将会导致错误
16. 不管发布订阅有没有复制标识,insert操作都能正常复制。
17. 每一个发布都可以有多个订阅者。
18. 发布者可以通过使用create publication命令 http://www.postgres.cn/docs/11/sql-createpublication.html 来创建删除,有它自己的语句。
19. 表可以使用alter publication动态的增加或移除。新增表 add table 和删除表 drop table都是事务性的,因此发布上该事务提交,该表将以正确的快照方式开始或者停止复制。

订阅者(subscription)

简单概念
1. 订阅是逻辑复制的下游端。一个订阅会定义到另一个数据库的连接,以及它想要订阅的发布集合(一个或多个)
2. 订阅者数据库行为跟其他任何pg实例相同,并且可以被用作其他数据库的发布者,只需要定义它自己的发布。
3. 如果需要,一个订阅者节点可以有多个订阅,可以在一对发布者-订阅者之间定义多个订阅,在这种情况下要确保被订阅的发布对象不会重叠。
4. 每一个订阅都将通过一个复制槽接收更改。预先存在的表数据的初始数据同步过程可能会要求额外的临时复制槽。
5. 逻辑复制订阅端可以是同步复制的后备服务器。后备名称默认是该订阅的名称。可以在订阅的连接信息中用application_name指定一个可供选择的名称。
6. 如果订阅用的用户是一个超级用户,则订阅会被pg_dump转储。否则订阅会被跳过并且写出一个警告,因为非超级用户不能从pg_subscription目录中读取所有的订阅信息。
7. 可以使用create subscription http://www.postgres.cn/docs/11/sql-createsubscription.html 增加订阅,并且使用alter subscription http://www.postgres.cn/docs/11/sql-altersubscription.html 在任何时刻停止/继续订阅,还可以使用drop subscription http://www.postgres.cn/docs/11/sql-dropsubscription.html 删除订阅。
8. 在一个订阅被删除并且重建时,同步信息会丢失,这意味着数据必须被重新同步。
9. 模式schema的定义不会被复制,并且被发布的表必须在订阅者上存在。只有常规表可以成为复制的目标。例如,不能复制视图。
10. 表的列通过名称匹配。允许在目标表中的列序不同,但是列类型必须匹配。目标表可以有被发布表没有提供的额外列。额外列将用默认值填充。

复制槽管理

每一个活跃的订阅都会从发布端上的一个复制槽接收更改,通常发布端的复制槽是订阅端用create subscription创建订阅时自动创建的,并且使用drop subscription删除订阅时,复制槽也会自动被删除。不过有些情况下,需要单独操作订阅及其复制槽。下面就是场景:
1. 在订阅端创建一个订阅的时候,发布端上的复制槽已经存在。在这种情况下,可以使用create_slot = false选项创建并关联到已有的复制槽上。
2. 创建一个订阅时,发布端机器网络不通信,或者发布端机器处于一种不明的状态。在这种情况下,可以使用connect = false选项创建订阅,加上此参数发布端将根本不会被联系。pg_dump也是使用的这种方式。这样在订阅可以被激活之前,你必须手工在发布端上创建相关的复制槽。
3. 在删除一个订阅时,复制槽应该被保留。因为当订阅者数据库正在被迁移到一台不同的服务器上的时候,并且将在新机器上被激活,这样保留复制槽的行为就很有用。在这种情况下可以尝试删除该订阅之前,使用alter subscription将复制槽解除关联。
4. 在删除一个订阅时,发布端无法连接访问,这种情况下,可以尝试删除该订阅之前,使用alter subscription将复制槽关联解除。如果远程数据库实例不再存在,那么不需要进一步操作。不过如果远程主机只是无法连接,那么复制槽应该手工到远程主机(发布端)上删除。否则它将会继续保留wal日志并且最终可能会导致磁盘被装满。

冲突

逻辑复制的行为类似于正常的DML操作,即使数据在订阅者节点本地被修改,逻辑复制也会根据收到的更改来更新数据。如果流入的数据违背了订阅节点的任何约束,复制将会停止,这种情况被称为一个冲突。
1. 在复制update或者delete操作时,缺失的数据将不会产生冲突,这类操作将被简单的跳过。
2. 冲突将会产生错误并且停止复制,必须有用户手工解决。在订阅者服务器日志中可以找到有关冲突的详细情况。
3. 通过更改订阅者上的数据(这样它就不会与到来的数据发生冲突)或者跳过与已有数据冲突的事务可以解决这种冲突。通过调用pg_replication_origin_advance() http://www.postgres.cn/docs/11/functions-admin.html#PG-REPLICATION-ORIGIN-ADVANCE 函数可以跳过该事务,函数的参数是对应于该订阅名称的node_name以及一个位置。
4. 复制源头的当前位置可以在pg_replication_origin_status http://www.postgres.cn/docs/11/view-pg-replication-origin-status.html 系统视图中看到

限制

逻辑复制自然也有一些不能复制的东西。
1. 数据库schema和ddl命令不会被复制。初始schema可以用手工pg_dump --schema-only进行拷贝。后续的schema改变需要手工保持同步。不过其实schema不用在两端保持绝对相同。当一个活跃的数据库中schema定义改变时,逻辑复制是可靠的;当schema在发布者上发生改变并且被复制的数据开始到达订阅者但却不适合订阅端的表schema时,复制将报错,直到订阅端的schema被手工进行更新。在很多情况下,可以通过先对订阅者进行schema的更新来避免这种错误。
2. 序列数据不会被复制。后台由序列支撑的serial或者标识列中的数据会被复制为表的一部分,但是序列本身在订阅者上仍将显示开始的值。如果订阅者被用作一个只读数据库,那这同城不会是什么问题。不过如果订阅者数据库预期有某种关于序列的转换或者容错啥的,那么订阅端的序列需要被更新到最后的值,要么通过从发布者拷贝当前数据到订阅端(使用pg_dump),要么从表本身就指定一个足够高的值。
3. 支持truncate命令的复制,但是在截断由外键连接在一起的表群体时必须要注意。在复制截断动作时,订阅者将截断与发布者被截断的相同的表群体,这些表被明确的指定,或者通过cascade隐含的收集而来,然后还要减去不属于该订阅的表。如果所有受影响的表都属于同一个订阅,这会正常进行。如果订阅要截断的某些表有外键链接到不属于同一订阅的表,那么在订阅者上该截断动作将会失败。
4. 大对象(large objects https://www.postgresql.org/docs/12/largeobjects.html )不会被复制。pg没有办法解决这个问题。除非把数据存储到普通表中。
5. 复制只能从基表到基表,也就是说发布端和订阅端的表必须是普通表,而不是试图、物化视图、分区根表或者外部表。如果是分区,可以一一对应的复制分区层次,但当前不能复制成一种不同的分区设置。
6. 尝试复制不是基表的表将会导致报错。

架构

1. 逻辑复制从拷贝发布者数据库上的数据库快照开始。拷贝一旦完成,发布者上的更改会在它们发生时,实时传送给订阅者。订阅者按照数据在发布者上被提交的顺序应用数据,这样任意单一订阅中的事务一致性才能得到保证。
2. 逻辑复制被构建在一种类似于物理流复制的架构上,它由walsender和apply进程实现。walsender进程开始对wal进行逻辑解码(如果你要审计逻辑解码出来的数据,将所执行的修改通过sql以流的方式传输给外部使用者,比如说输出到某个表中,你可以用逻辑解码功能,写个函数实现。https://www.postgresql.org/docs/12/logicaldecoding.html)并且载入标准逻辑解码插件。该插件把wal中读取的更改转换成逻辑复制协议(说是协议,其实就是以PG复制命令起头的消息流,这种逻辑流复制协议建立在物理流复制协议的原始积累之上 https://www.postgresql.org/docs/12/protocol-logical-replication.html ),并且根据发布说明过滤数据。然后数据会被连续的使用流复制协议传输到应用工作者(apply进程),应用工作者会把数据映射到本地表并且以正确的事务顺序应用它们接收到的更改。
3. 订阅者数据库上的应用进程总是将session_replication_role设置为replica运行,这会产生触发器和约束上一样的效果。
4. 逻辑复制应用进程当前仅会引发行触发器,不会引发语句触发器。不过初始的表同步是以类似一个copy命令的方式实现的,因此会引发insert的行触发器和语句触发器。

首次同步

1. 已有的被订阅表中的初始数据会被快照并且以一种特殊类型的应用进程(The worker进程)的并行实例进行拷贝。这种进程将创建自己的临时复制槽并且拷贝现有的数据。一旦现有的数据被拷贝完,the worker 进程 会进入到同步模式,该模式会流式传递在使用标准逻辑复制拷贝初始数据期间发生的的任意改变,这会确保表被带到一种已同步的状态。一旦同步完成,该表的复制的控制权将会被交给主应用进程(apply),其中复制会照常继续。

监控

发布端:
1. 因为逻辑复制是基于与物理流复制相似的架构,一个发布节点的监控也类似于对物理复制主节点的监控

订阅端:
1. 有关订阅的监控信息在pg_stat_subscription http://www.postgres.cn/docs/11/monitoring-stats.html#PG-STAT-SUBSCRIPTION 中可以看到,每一个订阅工作者在这个视图都有一行。一个订阅能有0个或者多个活跃的订阅工作者,取决于它的状态。
2. 通常,对于一个已启用的订阅会有单一的应用进程运行。一个被禁用的订阅或者崩溃的订阅在这个视图中不会有行存在。如果有任何表的数据同步正在进行,对正在被同步的表会有额外的工作者。

安全性

1. 用于复制连接的角色必须有replication属性(或者是一个超级用户),该角色的访问必须被配置在pg_hba.conf中,而且该用户必须有login属性。
2. 为了能够拷贝初始表数据,用于复制连接的角色必须在被发布的表上具有select权限(或者是一个超级用户)
3. 要创建发布,用户在数据库,必须有create权限
4. 要把表加入到一个发布,用户必须有该表的拥有权。
5. 要创建一个自动发布所有表的发布,用户必须是一个超级用户。
6. 要创建订阅,用户必须是一个超级用户。
7. 订阅的应用过程将在本地数据库上以超级用户的权限运行。
8. 权限检查仅在复制连接开始时被执行一次。从发布者督导每一个更改记录时不会重新检查权限,在每一个更改被应用时也不会重新检查权限。

配置设置

逻辑复制要求设置一些配置选项。
发布端:
1. 发布端上,wal_level必须被设置为logical,而max_replication_slots中设置的值必须至少是预期要连接的订阅数加上保留给表同步的连接数。猜测1个发布者,发布了10个表,有10个订阅者,则该值至少是20。
(max_replication_slots
指定服务器可以支持的最大复制槽数。默认是10。修改该参数需重启服务器。将其设置为小于当前现有复制槽数的值将阻止复制插槽服务启动。另外wal_level必须将其设置为replica或更高才能使用复制插槽。)
2. max_wal_senders应该至少被设置为max_replication_slots加上同时连接的物理副本(物理同步)的数量。在1的情况下,同时还有5个物理备库,则该值为20+5=25;
订阅端:
1. 设置max_replication_slots参数,必须至少被设置为将加入到该订阅者的订阅数。猜测是级联情况下,自己身为订阅者的同时也作为1个发布者,有10个订阅者,那该参数至少被设置为10;如果不是级联,则该参数默认10就可以,实际上可能1个也用不到,因为自己不用创建复制槽。
2. max_logical_replication_workers必须至少被设置为订阅数加上保留给表同步的连接数。猜测级联情况下至少被设置为,10订阅者+10个表=20;如果不是级联,则为1(自己就是1个订阅者)+10(自己订阅了发布者的10个表)=11;
3. 可能需要调整max_worker_processes以容纳复制工作者(the worker 进程),至少为max_logical_replicaton_workers+1 为20+1=21;如果不是级联,则为11+1=12;
无论发布和订阅:
一些pg的扩展插件和并行查询也会从max_worker_processes中获取the worker 进程,所以该参数尽量设置大一点。

实操

环境:
1个发布者(192.168.31.123)
1个订阅者(192.168.31.124),该订阅者不是123机器的物理备机。

发布端:
1. 修改wal_level从replica修改为logical修改后要重启
2. 修改max_replication_slots为20(10个订阅端,同步10个表)
3. 修改max_wal_senders为25(主服务器作为发布者的同时,又是5个物理备机的主机)
4. 修改pg_hba.conf,新增replication的一行,这里用超级用户postgres

或

5. 创建数据
例1:test01_p(有主键)
在gaodb数据库创建schema

在schema logical中创建表,其中id列设置为主键,作为复制标识。

查看该表


例2:test02(无主键)
在schema logical中创建表test02,此时test02表无复制标识

因为test02没有复制标识,所以按照上面的知识,必须给他打上full(记录表中所有行所有列的数据变化)复制标识,才可以被逻辑复制,但是复制效率比较差。

如何打复制标识
https://www.postgresql.org/docs/12/sql-altertable.html

例3:分区表measurement



在逻辑复制中分区表只能当做普通表来复制,父表算一个普通表,子表也算普通表。那普通表就需要有复制标识,这里创建一个主键索引。

给声明性分区的父表增加主键索引,所有的子表都会自动新增主键索引。所以分区表的复制标识就都有了。

6. 查看发布端复制槽

可以看到有4个复制槽,其中234是在使用的,用作物理备库。1已经没有在使用了。可是1的复制槽类型是物理复制槽,是不能用作逻辑复制用。所以要把1删掉。不删掉也可以,但我这里用到了这个复制槽的名字。
删除:

7. 创建发布
如果你把分区表的父表写到发布里是会报错的。pg中实际的数据是存放于子表中的,所以我们只复制子分区的数据,把子分区当做普通表来复制。





8. 对发布端进行pg_dump导出schema定义

-s 只转储gaodb下的所有对象定义(schema啥的,表结构啥的),而非数据。
-v 显示导出信息
此pg_dump会导出gaodb下所有schema下的所有的对象的结构,但是不导出具体的表数据。

9. 此时发布端要干的活就干完了。测试用的三个表都放在数据库gaodb下的logical schema下。
修改发布定义:
http://www.postgres.cn/docs/11/sql-alterpublication.html  比如说在发布中add新表或者是drop表
删除发布定义:
http://www.postgres.cn/docs/11/sql-droppublication.html


订阅端:
1. 修改max_replication_slots为10(未来该订阅端会作为发布端,下属10个订阅端,形成级联的场景,所以设置为10)
2. 修改max_logical_replication_workers为20(10个下级订阅者,10个上级要同步的表)
3. 修改max_worker_processes为21
注意,我这个124的订阅端,不是123的物理备库,只是把123作为逻辑复制的订阅端。所以123上没有物理备库的参数比如说:

4. 导入发布端pg_dump出来的东西
发布端的测试数据存放在gaodb下logical下
订阅端如果没有gaodb存在则需要创建,如果像本例一样,已经存在gaodb了,而且gaodb中已经有其他表存在并且在使用了。就不用创建gaodb了。
此时订阅端只有gaodb存在,logical schema不存在。

先传输备份,再开始导入

执行成功后,124的gaodb下就有了所有的schema和其对象结构。

5. 创建订阅

发布端可以创建非常多的发布,比如a数据库的a类型的表,叫做a发布;b数据库的b类型的表,叫做b发布。
那订阅端就可以选择性的订阅,多个发布端就在publication standby01后面加逗号分隔发布名。比如说 publication standby01,standby02,standby03;
6. 此时查看124的log

可以看到,每个表初次同步的时候都会启动一个the worker 进程。跟上面的描述是匹配的。

验证

基础数据
查看124的表,看看初始数据过来没,此时分区表还没有数据。


DML
123的3种类型的表进行dml操作。
发布端





订阅端


Truncate
发布端

订阅端

可以看到,订阅端是可以接收到主库正常的dml和truncate操作的。

表结构不同(有默认值)
1. 先删除订阅
2. truncate所有同步的表,或者只删除有主键索引和唯一索引的列。否则the worker进程重新给你复制过来,插入重复值,会触发主键约束,它不会跳过已存在的行。
3. 更改目标表test02的定义,打乱列的顺序,新增1列;
4. 订阅端



创建好test02表结构后,创建订阅

看日志,首次同步完成。

5. 发布端

可以看到两边表的列顺序和列数量都不一致了。
6. 验证同步
发布端插入数据

订阅端:

有了数据

表结构不同(无默认值)
1. 订阅端
删除订阅

truncate所有同步的表;



2. 发布端
插入新行

3. 订阅端
查看订阅端数据是否有了,如下图已经有了。

4. 结论
订阅端多余的行,有默认值就按默认值,无默认值就设置为null。
5. 订阅端*修改
你可以在订阅端给这列单独插入值。
订阅端


6. 验证同步
此时发布端插入新行,看同步是否正常
发布

订阅


注意
1.修改订阅定义:
http://www.postgres.cn/docs/11/sql-altersubscription.html
2.删除订阅:
http://www.postgres.cn/docs/11/sql-dropsubscription.html

3.分区表是不能在订阅端插入到不同分区表中的。

监控

发布端
通过pg_stat_replication来监控

订阅端
通过pg_stat_subscription来查看。


监控同步进度
需要通过函数 获取当前wal日志内容,存到自定义的日志表中

创建表
CREATE TABLE IF NOT EXISTS  tab_log_slot(lsn pg_lsn,xid xid,data text);

创建函数,获取发布端复制槽的数据改变
--Getting a logical decoding value by f_get_slotinfo
CREATE OR REPLACE FUNCTION f_get_slotinfo(slotname varchar)
RETURNS void AS 
$FUNCTION$
 DECLARE
  v_slot_cursor  REFCURSOR;
  v_slot_record  RECORD;
  v_sql_text   text;
 BEGIN
  OPEN v_slot_cursor FOR SELECT * FROM  pg_logical_slot_get_changes(slotname, NULL, NULL); 
  LOOP
   FETCH v_slot_cursor INTO v_slot_record;
   IF v_slot_record.lsn IS NOT NULL THEN
    v_sql_text := 'INSERT INTO tab_log_slot(data) VALUES('||chr(39)||replace(v_slot_record.data,chr(39),'')||chr(39)||');';
    EXECUTE v_sql_text;
   ELSE
    EXIT;
   END IF;
   EXIT WHEN NOT FOUND;
  END LOOP;
  CLOSE v_slot_cursor;
 END;
$FUNCTION$

LANGUAGE PLPGSQL;PostgreSQL 逻辑同步

上一篇:解决多点双向路由重发布产生的问题(路由修剪)


下一篇:MySQL常用30种SQL查询语句优化方法