DTS 及其在PG 数据库生态中的应用

一、PostgreSQL 数据传输通道关键技术

(一)什么是数据传输通道?

数据通道并不是一个很新的概念,很早以前就已经有人或者企业提出了。从数据库的视角来看,数据通道是要去解决各个数据库、数据平台还有信息系统之间的数据连通性的题,然后来支持数据高速*的流动。


数据通道建立之后,可以从TP 数据库流转到AP 数据库进行分析;或者流转到大数据平台,进行实时计算;或流转到kv 上去做的cache;或从AP 数据库再将分析结果流回到TP 数据库,以便于支撑业务对统计信息的查询。


在一条数据通道上,流动的数据类型大概有两种:

第一种,存量数据,存量数据不是实时产生的,比如说一些基础的信息数据。

第二种,增量数据, 增量数据是实时产生的数据。

DTS 及其在PG 数据库生态中的应用

从另一个维度去划分,数据通道上的数据会分成真实的数据和对数据描述的数据。拿数据库来讲,表结构或者说DDL 信息,以及这张表对应的真实数据,作为数据传输通道,它就像我们的路一样,越快越好,“快”就是数据传输通道的传输效率问题。


解决了在数据通道上的传输,传输效率之后,我们在数据通道上也应该提供一些ETL的能力,ETL 的能力可以做数据的清洗(比如非法的数据,把它清洗成合法的数据,写入到这个目标端去)。或者对一些敏感的数据进行脱敏,再放到大数据的系统里面进行分析。或者若干张表的数据,在数据通道内部进行多表合并,然后再把它放到AP 的库里面分析。


做为一条数据通道,在数据质量产生了问题或者数据的链路出现了拥塞的时候,一定要有较强的数据保障能力。


(二)PostgreSQL 内置数据类型

PG 内置的数据类型:

数据通道要解决数据的传输问题,那么就一定要有具体数据。


对于不同的DB 库来讲会有不同的类型, PG 提供了几种基础类型:数值型、字符型、二进制型、时间型、贝尔型、比特型等。


PG 除此之外还提供了增强的类型,比如地理位置、信息类型、网络型等。


系统ID 类型——tid ,tid 对PG 来讲,是PG 的内置字段。在使用上,往往会对ctid 的错误使用,通过PK 或UK 或索引字段,查出来ctid,紧接着使用ctid 对表进行操作。这个使用方式其实是不太靠谱的,因为ctid 它代表的是这一行数据所属的页以及在页内的偏移。PG 是通过copy of direct 实现的macc 机制,在查询出来之后,这行数据的ctid 可能发生变化,如果我们固化了使用ctid,就会产生数据不一致的问题。


在大部分的时候,因为我们是通过PG 的driver 实现的数据的读取,所以不需要关注各种数据类型内部的存储情况。如果要去看,比如说对Wal 的日志进行细腻度的分析,或者对一页的数据进行细腻度的分析,可能就会需要了解每一个不同的类型在PG 内部的存储的格式。

DTS 及其在PG 数据库生态中的应用

(三)PostgreSQL 分区表

在读取PG 数据的时候,数据通道里面有两种不同的数据的类型,一种是存量数据,一种是增量数据。


在PG 的数据库里面,对于存量数据的读取分区表是一个非常独特的case,分区表从PG10 开始,有两个截然不同的表现,在PG10 以前,分区表是基于PG 的CHECK-IN HERIT 语法来实现,同时需要手动的在主表上创建trigger,这种方式实现比较脆弱。


物理表和逻辑表其实没有明确的区分。


从PG10 之后,我们可以看得到PG 引入了partition by range 这样的支持,支持

分区表的语法,在此之后PG 的原数据库部分能够明确的区分出来,哪张表是主表,哪张表是物理表。


比如下图的例子,可以看得到logical table 它的relation kind 是p 它代表的是分区表,也就是说主表; physical table 1 它的relation kind 是r,代表的是物理表,就是一张正常的、普通的表。

DTS 及其在PG 数据库生态中的应用

为什么要区分主表和物理表?


因为在读取全量数据的时候,对于一张分区表,我们希望读取它物理表的数据,这样会读的快,而不去读取主表的数据。


如果不能区分物理表和主表,那就既读取了主表的数据还有物理表的数据,这个数据就会有大量的重复,相当于我们读取了两次。


(四)PostgreSQL 增量数据获取

怎样获取到增量的数据?

PG 的增量获取方式有三个大的版本:

1. 在PG 的9.3 之前,它是不支持Stream Replictaion 的,一般常用的做法都是基于trigger 的方式进行增量的获取。当然也可以进行业务的侵入,比如说使用业务的

GMT modify,要求业务有GMT modify 的字段,通过这个字段,我们可以拿出某一

个时间点之后的数据,这个也是一个比较间接的来获取增量数据的方式。


2. 从PG9.3 之后,PG 支持了叫做stream replication 这样的方式,基于此我们就可以通过这种方式来获取到增量数据。获取增量数据有两种不同的类型,一种叫做logical slot;还有一种是physical slot。在业内目前都是居于logical slot 的方式进行获取。在PG9.3 之后,在PG10 之前, logical slot 的decoding 是没有一个能够正式应用于生产环境的decoding 的,所以我们提供出一个叫做ali decoding 的基于让logical slot 使用的解码器。通过它我们可以把PG 的wal 数据,解码成string类型的逻辑数据,再给到增量获取的模块。


3. 在PG10 之后提供了PGoutput slot 的解码器,通过它可以实现ali decoding 相似的功能,在PG10 之后提供了一个叫做订阅的概念,能够从下图最右边的语句上看出来,在最右边的语句上展示了在PG10 或10 以后是怎么样去创建一个逻辑订阅的。

DTS 及其在PG 数据库生态中的应用

首先会通过叫做PG_create_logical_reputation_slot 的方式来创建出逻辑slot,然后会创建出来一个订阅,这个订阅能够指定订阅哪张表,或是订阅所有的表给逻辑slot,这里的“%s”是需要填充的上个阶段创建的逻辑slot 的名称。


对于较少的表,可以指定具体的表名字,如果表很多,就可以写成FOR ALL TABLES,订阅所有表的增量数据,随后使用start replication 开启读取订阅数据。


在这里需要注意的是PG 的REPLICA IDENTIFY 是表级别的,在这个表级它有不同的级别,比如说default 是默认的级别; using index 是PK 或者是UK;FULL 是所有的; nothing 是什么都没有。


从PG10 开始支持逻辑订阅,在逻辑订阅中有一个限制,对无主键、无唯一键的表,如果针对这张表开启逻辑订阅,要求这个表的REPLICA IDENTIFY 是负才可以。否则的话,这张表是不能够进行delete 和update 操作的,关于这点要特别注意。


(五)PostgreSQL Stream Replication

PG 的stream,application 内部的实现原理


PG backend 是PG 的一个服务,接受用户的TP 数据,形成WAL 写入到WAL 日志里面去,并且通知walsender 读取wal 的数据,walsender 根据接收到的signal 的通知之后,读取wal 的数据。walsender 根据当前slot 注册情况,如果logical_slot,那就读取一个完整的Excel record,然后通过logicaldecode 的方式进行解码,获取到解码后的数据。


如果是physical_slot,那就是通过physical_decode 进行简单的xlog 的数据读取,然后获取到待发送的数据之后,最后由Walsender 将待发送的数据发送给增量的订阅客户端。


从这儿可以看得到,无论是logical 的还是physical 的,其实数据都是wal 的日志,只不过logical 的数据它要求是一个完整的xlog 的记录,而physical 的只要是一个完整的xlog 数据块就可以了。


在这里面的关键点, logical slot 它是通过logical_decode 这样的一个框架进行解码,框架里面会调用具体的output_plugin,进行整行数据的输出与格式的转换。


Logical_decode 框架,对于每行数据是要求知道这行数据的relation 信息,要求PG 知道信息的具体状态。关于这一块如果logical slot 是以历史的点位进行数据拉取,PG 这边是没办法保证一定能够找得到当时点位所对应的表的结构信息。


所以这就是为什么PG 的logical slot 在创建的时候,只能以当前的点位进行创建,而不能以历史的点位。一旦PG 的logical slot 创建出来之后, PG 的backend 会根据各个logical slot 里历史最早的点位进行数据保存。通过这种方式能够保证所有的logical slot在需要matter 信息的时候,PG 这边都能够得到,都没有被释放掉。

DTS 及其在PG 数据库生态中的应用

(六)PostgreSQL 数据导入

如何让链路的效率更快?

PG 有两种数据的写入方式的:

第一种是batch lnsert,是把一些数据整合成一条sql,进行插入;

第二种方式就是PG copy,经过测试, PG copy 的性能要远超于batch lnsert,大概是batch lnsert 的4 倍左右,是PG 线路里面一个重要的提升传输效率的一种方式。

DTS 及其在PG 数据库生态中的应用

二、DTS PostgreSQL 数据传输通道实现

理解了PG 数据库对数据通道能够提供的基础能力后,下面以DTS 数据传输服务为例来看一下,怎么样基于这些基础能力进行整合,打造出来一条数据通道。

(一)DTS 是什么——异地多活的数据通道

DTS 是阿里云的服务,中文名称是数据传输服务,DTS 是数据通道的具体的实现。DTS 的一个重要的属性是异地多活的数据通道。


什么叫异地多活呢?

比如,解决杭州到北京之间数据库之间的数据传输;多活是杭州和北京之间的两边都支持数据的写入。DTS 在阿里巴巴集团内部已经是重要的数据数据传输通道,支持了阿里巴巴历年的双11。


看下图,在2019 年双11 的时候,整个的TP 的数据洪峰大概是54.4 万米每秒,在DTS 异地多活的数据通道的强力保障之下,我们做到了毫秒记的延迟,DTS 同时也是阿里云内部、阿里云上客户数据同步的重要的利器。


从下图可以看出,DTS 支撑了全球的数据同步。

DTS 及其在PG 数据库生态中的应用

(二)DTS 是什么——用户上云的高速公路

DTS 是云下的用户和其他云厂商的用户上阿里云的高速通道。


云下的用户可以很方便地使用dts 的服务,将自己云下的数据搬到云上的数据库中,享受不停机数据迁移的能力,并且能够基于DTS 的双向同步能力,在发现云上数据库出现问题时,能够快速将自己的业务切换到本地。


DTS 支持云下的专线连接,支持公网连接,支持VPN 连接等多种连接方式。


可以让云下的用户,很方便的享受到云上的数据库能力。比如,云上强大的IP 数据库的分析能力,大数据的计算能力。


DTS 及其在PG 数据库生态中的应用

(三)DTS 是什么——高价值数据的分发源头

DTS 支持各种TP 数据库的增量数据。


DTS 在获取到这些增量数据之后,可以把这些附加值很高的数据分发到,比如ES、Redis、Kafka、Flinkt 等这种大数据平台,进行各种数据计算,将数据的价值最大化。


DTS 及其在PG 数据库生态中的应用

(四)DTS 逻辑架构概览

DTS 是云上的分布式数据传输通道,在源端的DTS 支持各种TP 数据库、Redis、SQL 数据库,以及分布式数据库,在目标端DTS 可以把这种数据写入到TP 数据库,mango 或者说大数据平台、Kafka、订阅客户端去。


DTS 自身给用户提供了两个基础接口,用户可以通过控制台操作DTS 相应的任务,也可以通过这种open API 进行批量化的创建与管理。


DTS 是自身的分成预检查模块、结构迁移模块、全量迁移模块、日志解析模块以及数据写入模块。DTS 通过这个数据校验,提供数据质量的保证能力。


DTS 的数据都存储在DStore 中,我们可以通过DTS 的ETL 模块,对DStore 中的数据进行数据清洗、数据转换,并且最终由writer 同步到目标端,或者由数据订阅的客户端消费。


在原端的DTS 支持两种数据,一种是存量数据,包括数据还有表结构;另一种是增量日志,这一部分可能包括具体的数据以及DDL。


DTS 有三大功能:

第一个就是数据迁移,数据迁移这一块主要用来解决用户的数据库迁移;

第二个是数据同步,数据同步主要的场景是异地多活的场景;

第三个是数据订阅,数据订阅是将增量的数据交给客户端,进行大数据分析,或者用户自定义的业务。


DTS 提供Numan 运维平台,在运维平台之上可以完成任务的告警监控,可以完成用户的任务管理、任务告警、资源管理以及对源库和目标端的异常情况监控。


DTS 的分布式能力主要体现在它的调度服务。DTS 将资源池化之后,对各个用户的链路提供了HA 能力,保证了99.99%的服务能力。


DTS 及其在PG 数据库生态中的应用


(五)DTS 高效读写PostgreSQL

DTS 如何提升存量数据的传输效率?


除了使用前面提到的PG 自身的PG copy 之外,我们对全量采用了表兼并发的能力。首先会对一张PG 表进行切片,将其切分成若干个小片段,针对每个片段使用并发读取、并发写入的方式,将存量数据高效的同步到目标表。


右边是DTS 针对PG 增量数据进行的高效写入。对于增量数据,DTS 会将其拆分成原始的一个事物,对于没有冲突的事物, DTS 会采用并发写入的方式。对于有冲突的事物,DTS 会采用串行写的方式。通过这种方式在提升效率的同时,也保证了数据的最终一致性。


关于这一块的增量和全量的并发写入DTS 是有相应的专利的。


还有一部分数据就是针对一张表的某一个字段的频繁的热点的更新,这种情况之下因为都是冲突的数据,没办法进行并发写入,DTS 是采用热点合并的方式进行解决。


DTS 及其在PG 数据库生态中的应用


(六)DTS 捕获PostgreSQL 增量数据

DTS 有着非常丰富的手段获取到PG 增量数据。


第一个就是dml trigger,针对PG 9.3 以前的版本, DTS 可以通过在原库创建trigger 的方式,拉取到增量数据。针对9.3 之后的版本, GTS 是通过logical slot 的方式来获取到dml 数据。


由于PG 自身是不支持DDL 的原始语句写入到wal 中的,所以DTS 通过在PG 源库创建DDL Trigger 的方式,来捕获到DDL 的原始数据,再进行同步。


基于logical slot 的方式有一定的限制,比如说logical slot 它不能够支持以历史的点位进行数据拉取,logical slot 必须针对无PK/UK 的表,设置成IDENTITY FULL,所以现在DTS 也在公关physical slot 的增量获取技术,目前已经进入到了测试阶段,在不久的将来能够在DTS 的云上上线。

DTS 及其在PG 数据库生态中的应用

(七)DTS 解决长链路高RT问题

在异地多活的场景里面,要跨Region 同步数据,必然涉及到高RT 的问题。DTS 解决问题的基本思想就是近DB 的部署原则。以增量的数据同步举例,在增量里面,我们的DTS reader 就是拉取增量数据的模块和源库部署在同一个Region。


DTS Writer 就是增量数据写入模块和目标库部署在同一个Region,通过这两个模块近DB 的部署,最高限度的提升数据的拉取和数据的写入效率。


DTS Reader 是要把拉取到的数据投递到DTS Store,而DTS Writer 是要从DTS Store 里面拉取到同步数据,在这一块DTS Store 可以部署在源库的位置,也可以部署在目标库的位置,甚至可以部署在中间位置。


DTS Reader 到DTS Store,DTS Store 到DTS writer,数据传输通道是DTS自身经过高度优化的,通过高度优化过的一个数据通道,我们就解决了高RT的问题。


在这里面的优化点有两个:

第一个,尽可能的减少数据传输量,在这块我们主要是通过压缩,以及源端的数据重删达到的目标。

第二个,因为长链路高RT 长,但是数据的Throughpu 是大的,所以我们就尽可能的使用数据推,而不采用Ping-pong 的方式,以尽可能的降低长链路RT的问题。

DTS 及其在PG 数据库生态中的应用



(八)DTS 提供ETL 能力

DTS 的数据传输通道里面,它实现了几个ETL 能力:


第一个就是表级别的,它可以针对库、表、列进行映射。比如,下图例子里面,它是把原库的Schema 为a.religion_source 这个Relation 映射成b.relation_target 这样的一个表。在原表的c1、c2 列,通过DTS 可以映射成d1、d2 列。


第二个维度就是数据维度。举个例子里数据清洗,比如说它对c2 列null 字段的清洗成good。同时DTS 的ETL 能力也提供了数据的多表合并。


DTS 及其在PG 数据库生态中的应用


三、DTS PostgreSQL 经典案例

在了解了DTS 基于PG 的基础能力,所做的数据通道链路之后,基于这个数据通道能够解决哪些问题。


(一)不停机上云

所谓的不停机上云就是源库的业务不需要停,可以很平滑的将源库迁移到云上。我们有一个比较成功的大型应用案例:


东南亚的一个电商,我们支撑了电商大概8tb 的核心数据,然后涉及到11000 个Oracle 数据库对象的不停机上云。在这个方案里面,我们提供的是双向同步的方案。


先来说正向的方式,首先DTS 会做结构迁移1 动作,在这个阶段主要是负责把源库的库、表、列信息以及PK、UK 信息,迁移到目标库去。在这个阶段完成之后会进行全量数据的迁移,也就是存量数据的迁移。


在存量数据完成之后,我们会进行结构迁移2 阶段的这样的一个迁移动作。在这个阶段主要是去迁移原库的索引。


为什么结构迁移要分成两个阶段呢?


因为经过大量的测试,发现在数据完成之后,创建索引的效率要远高于数据完成之前创建索引,所以把结构迁移拆成了两个阶段。在数据完成之后,我们使用结构牵引二的方式创建索引。


在结构迁移整个的完成之后,我们就开始进行增量迁移。


增量迁移的起始位置是从全量迁移之前开始的,这样能够保证整个数据一致性,当增量迁移整个的数据追平之后(就是增量迁移的内容没有延迟),初步的认为这条链路达到了可以切流的阶段。为了进一步的验证数据质量,可以做全链校验的动作。


全链校验是通过拉取源库的数据,以及拉取目标库的数据进行全字段的比较,验证两边的数据是一致的。


当全链校验没有问题之后,就可以进行切流的动作。在切流之前还会搭建出来一条反向链路,所谓的反向链路,我们去启动增量数据的拉取服务,去拉取目标库增量数据,增加增量数据的写入模块,将增量拉取的数据写入到源库。


通过反向链路的建立,如果在切流的过程之中,目标库出现了一些不适配,或者业务有一些不适配,我们可以快速的回切到源库。


这个不停机上云的方案,我们在云上已经服务了很多这样的用户了,有着非常广泛的应用。


DTS 及其在PG 数据库生态中的应用


(二)异地灾备

来看下图中的例子,这个案例是基于DTS 数据传入通道来实现的异地灾备。


用户两边都是自建库,但是这两边自建库它是在两地的,通过DTS 云上的高效数据传输能力,帮助用户搭建了一条从杭州到青岛的灾备链路。


DTS 及其在PG 数据库生态中的应用

(三)数据集成

下图是一个数据集成的例子,DTS 将TP 数据库,把这样的数据拉取到DTS 的DTSStore 里面,通过DTS 的同步链路,可以写到如ADB、Kafka 这样的AP 的数据分析平台里面去。


也支持用户使用数据订阅的客户端,拉取增量的数据出来,按照用户自己的场景进行灵活的应用。

DTS 及其在PG 数据库生态中的应用

(四)大数据下游

下图是大数据下游的案例, 首先用户是在云下,通过专线上云,通过数据同步链路用户在云上的Ecs 上也有一个数据库,通过专线完成线下到云上的一个同步。


在上云之后,用户在异地又建立了一个灾备库,通过DTS 完成在灾备库同步。对于灾备库自身,用户为了尽可能的发挥灾备库的价值,又在灾备库上建立了一条订阅的链路。通过DTS 把数据同步到Kafka,然后再由用户大数据的下游,从Kafka 托举数据进行消费。同时在云上的库,用户通过DTS 建立了一条反向链路,通过这条反向链路,再回流到用户云下的数据库里面去。

DTS 及其在PG 数据库生态中的应用

四、总结

总结来说,我们以数据传输通道的视角,讲述了PG 在数据传输通道上的能力,以及以DTS 为例,说明了基于这些能力,如何构建一条数据传输通道,并且例举了数据上云、数据灾备、数据订阅等经典的使用案例,希望对大家有所帮助。

上一篇:PostgreSQL技术进阶必备《PostgreSQL实战教程》独家下载


下一篇:开放下载 |《深入MySQL实战》快速理解MySQL核心技术