实时数仓入门训练营:Hologres 数据导入/导出实践

本文整理自直播《Hologres 数据导入/导出实践-王华峰(继儒)》
视频链接:https://developer.aliyun.com/learning/course/807/detail/13891

内容简要:
一、Hologres生态介绍
二、Hologres实时读写接口介绍
三、Hologres实时读写场景介绍
四、Demo演示
五、常见问题及未来展望

Hologres生态介绍

(一)Hologres生态

实时数仓入门训练营:Hologres 数据导入/导出实践

Hologres是一款兼容PostgreSQL协议的实时交互式分析产品,也已经打通了大数据生态。以最常见的几个开源组件来说,如Apache Flink、Spark、Hive、Kafka等,Hologres都已经有了相关的Connector实现并进行了开源。

对于实时链路,用户依托Flink或者Spark,就可以将上游的比如埋点或业务数据等,以非常高的性能以及毫秒级的延迟导入Hologres。对于离线链路,Hologres也支持把外部系统的数据以非常简便的操作导入,反过来也支持再将数据备份回外部系统,比如阿里云的MaxComputer、OSS等。

当数据导入Hologres之后,因为Hologres本身兼容PostgreSQL协议,所以能使用各种现成的查询工具,无缝连接Hologres进行数据的展示、查询等。

(二)Dataworks数据集成支持输入

除了刚才提到的大数据场景之外,使用阿里云的Dataworks数据集成功能,我们还能将用户存储在传统数据库中的数据导入Hologres,实现方便高效的数据库整库实时镜像。

实时数仓入门训练营:Hologres 数据导入/导出实践

如上图所示,当下Dataworks数据集成支持将MySQL的Binlog,SQLServer的CDC,Oracle的CDC实时镜像同步至Hologres。此外,Dataworks也支持将Kafka,还有阿里云Datahub的数据同步至Hologres。

实时数仓入门训练营:Hologres 数据导入/导出实践

另外值得一提的是,Datahub这个产品自身也提供了直接将数据实时同步到Hologres的功能,这个功能叫Datahub Connector。使用这个功能用户就无需经过Flink或者其他组件,可以直接将数据导入到Hologres,对于无需ETL的数据同步是一个比较快捷的方式。

Hologres实时读写接口介绍

实时数仓入门训练营:Hologres 数据导入/导出实践

Hologres实时读写实现原理

上图为整个Hologres实时读写实现原理架构图。

从上往下看,最上游是应用端,也就是会读写Hologres的各种客户端,比如说数据集成,Apache Flink、Spark等等。这些客户端通常会使用SQL接口,将读写数据的请求发送给Hologres,这些请求会经过一个负载均衡服务器,然后这些请求就会路由分发到一个叫做Frontend的节点。一个Hologres实例通常有多个Frontend节点,这样就可以支持非常高的QPS请求。Frontend节点主要是负责SQL的Parse、优化等功能。

经过一系列的处理之后,Frontend就会将用户的SQL请求转换成一个物理执行计划,然后这些物理执行计划就会被分发到后端的一个执行节点,执行真正的物理读写请求,最终写入的数据会持久化至分布式文件系统,比如阿里的Pangu系统或者开源的HDFS。

这里要特别强调的是,正常的SQL解析,然后通过Query的优化器优化生成最优执行计划,通常这部分的链路开销是比较大的,对于高QPS的读写场景,这往往会成为一个性能的瓶颈。

所以对于一些常见的SQL场景,这里我们列了几个 SQL,如下所示。

实时数仓入门训练营:Hologres 数据导入/导出实践

Fixed Plan

比如Insert into table values (),就是简单地插入一行或者几行。还有Insert into table values () on conflict do update,就是对数据进行几行的更新。Select * from table where pk = xxx和Delete from table where pk = xxx是根据主键去进行数据的查找或者删除。

对于这些常见的SQL,Hologres的Frontend做了一定的短路优化,略去了很多不必要的优化逻辑,直接生成最优的一个执行计划,并发送给后端的执行节点,这样就能提升整体的请求吞吐。

下面我们看一下,当物理执行计划发送到后端之后是如何处理的。

实时数仓入门训练营:Hologres 数据导入/导出实践
实时数仓入门训练营:Hologres 数据导入/导出实践

Hologres后端的整体存储引擎是基于Log Structured Merge Tree(LSM)来实现的,这里LSM能够把随机写变成顺序写,大大提升了数据写入的吞吐。

写请求首先会被写到Write Ahead Log,也就是传统的WAL文件中,一旦写入成功了,我们就认为这条记录永久写入成功了。之后,我们会把WAL日志Apply到Mem Table里面,Apply完成后,数据就对外可见了,可以进行查询,这中间的延迟通常在毫秒以内。

当Mem Table写满了之后,我们会有一个异步的线程,将Mem Table刷盘持久化,整体流程是一个比较标准化的LSM实现。

这里有别于其他LSM实现的存储系统,比如HBase,Hologres后端采用了全异步的实现,基于协程省去了操作系统内核线程开销,大大提升了系统CPU的利用率,使得Hologres的读写性能非常优异。

我们再回过头来看一下上面应用端的数据写入接口,现在Flink、Spark和Dataworks读写Hologres其实都使用了一个叫做Holo-Client的SDK。

实时数仓入门训练营:Hologres 数据导入/导出实践

Holo-Client基于Jdbc实现,对读写holo最佳实践的封装,可以减轻数据集成开发工作量。
我们也对一些特定场景的SQL做了一定的优化,例如:

  • 数据写入
    1)攒批,基于jdbc reWriteBatchedInserts的实现原理;
    2)数据合并,相同主键的INSERT/DELETE在一个批次中会合并减少请求量;
    3)自动提交,支持基于批行数、批字节大小和最长提交间隔自动提交。
  • 数据点查
    1)提供异步点查接口;
    2)QPS高时自动转入攒批模式。
  • 数据Copy
    提供并发CopyIn的简易接口。
  • 异常处理
    对holo返回异常归类,正确在holo升级、扩容等场景下重试等待实例恢复。

我们非常推荐用户之后如果有读写holo的场景,就使用Holo-client这个SDK。

Hologres实时读写场景介绍

介绍完Hologres的读写接口的实现原理之后,接下来看一下基于Hologres读写接口能实现的几种常见的读写场景。

(一)实时写入场景

实时数仓入门训练营:Hologres 数据导入/导出实践

实时数仓入门训练营:Hologres 数据导入/导出实践

第一种是最简单的实时写入场景,如上所示。

这里我们使用了一个Blink SQL的实例,其实就是生成一个随机数据源,然后将数据导入至Hologres。对于实时写入的场景,Hologres支持行存和列存这两种格式,还支持根据主键进行去重,这是相较于很多其他 OLAP系统的一个非常大的亮点。

另外,Hologres的实时写入还支持整行数据更新或者数据的局部更新。对于性能而言,Hologres导入即可见,拥有非常低的延迟,通常延迟在毫秒以内。经过我们自己的测试,以TPCH PartSupp表为例,我们后端单Core能达到2万左右的RPS,而且该性能可以随着实例的资源进行线性扩展。

(二)实时宽表Merge场景

然后接下来我们介绍一下实时宽表Merge的场景,这里其实是使用了holo的整行局部更新的功能。

实时数仓入门训练营:Hologres 数据导入/导出实践

以上图为例,比如用户想将多个数据源的数据合并成一张宽表写入至Hologres。我们希望整张表最终有A|B|C|D|E|F六个列,然后有一部分数据,比如说A|B|C|D这四个列是在一个数据源里面,然后A|B|E|F是在另外一个数据源里,我们希望把这两个数据源的数据合并写入至Hologres的一张宽表。

常见的一种实现是我们会使用Flink的Join功能,就是使用两个流同时消费上述数据源,然后在Flink里面进行两个流的Join,进行数据的打宽,最后写入到Hologres里面。

但是这种场景的一个问题是Flink的Join开销通常非常大,因为它需要缓存非常多的状态,这对于整个作业的维护是一个非常大的开销。

下面我们来看一下Hologres是如何解决这个问题的。

实时数仓入门训练营:Hologres 数据导入/导出实践

上文提到Hologres自身支持整合数据的局部更新功能,如上图所示,我们可以直接用两个流来直接写Hologres,而无需进行再做Flink内的Join。一个流比如A|B|C|D可以直接写Hologres,另外一个流A|B|E|F也可以直接写Hologres。因为这两个流的数据有相同的主键,所以当两行数据用相同的主键写入到Hologres的时候,Hologres内部会进行一个Merge,最终达到数据打宽的功能,省去了用户自己去写Flink Join,以及维护这么一个复杂作业的问题。

(三)实时维表Join场景

介绍完实时写入Hologres场景之后,下面我们来看一下实时读的场景。

实时读通常分为两种,第一种就是我们常见的Flink的实时维表Join场景,维表Join就是一个点查的实现。

实时数仓入门训练营:Hologres 数据导入/导出实践

实时数仓入门训练营:Hologres 数据导入/导出实践

这里Hologres的行存表通常可以替换HBase来实现Flink的维表功能,提供非常高的吞吐以及非常低的延迟。

(四)Hologres Binlog场景

实时读的第二种场景是Hologres Binlog场景。Binlog和MySQL的Binlog是一个类似的概念,使用Hologres Binlog我们就能实时消费Hologres单表的Change log,可以对每行数据的更新进行追踪记录。

现在实时计算 Flink 版的Hologres CDC Source,能实现表的实时镜像同步,甚至使用Flink + Hologres,能够实现ODS到DWD表的实时 ETL。

Hologres的Binlog功能默认是不开启的。

实时数仓入门训练营:Hologres 数据导入/导出实践

上图是一个例子,列出了如何使用Hologres的Binlog,这里是一个建表的DDL。

可以看到我们有额外两个表的属性,一个叫做binlog.level,设置为replica,代表这张表会开启Hologres的Binlog功能,’binlog.ttl’就是代表Binlog数据的一个生命周期,下面我们使用Hologres Binlog看看能达到一个什么样的效果。

实时数仓入门训练营:Hologres 数据导入/导出实践

由于Hologres是一个强Schema的数仓,所以我们甚至能够用SQL接口来查询Hologres的Binlog。如上所示,这里我们通过提供几个隐藏列:hg_binlog_lsn,hg_binlog_event_type,hg_binlog_timestamp_us,就能查询到Hologres的Binlog。

这里hg_binlog_lsn就是代表了每条Binlog生成的LSN序列号,然后hg_binlog_event_type是代表了Binlog的消息类型,它是代表Delete还是Insert的,或者是Before Update,或者是说After Update。这里的hg_binlog_timestamp_us代表这条Binlog生成的时间。

有了这几个隐藏列之后,用户就可以非常方便地使用SQL来进行Binlog的查询,进行数据的Debug。

Demo演示

(一)实时计算 Flink 版实时读写Hologres Demo

介绍完Hologres的读写场景之后,我们通过实际操作的Demo来看一下如何使用Flink来实时读写Hologres。

实时数仓入门训练营:Hologres 数据导入/导出实践

如上图所示,首先,我们这里有两张Hologres的表,这两张表都会开启Binlog。我们假设这两者会有实时的写入,然后我们会写另外一个Flink任务去实施消费这两张表的Binlog,进行这两张表Binlog的Merge,甚至进行一定的group by计算操作,最终将这两张表的数据同步写入之后关Hologres的另一张结果表。

接下来进入演示,首先我们看一下Hologres建表的DDL,如下所示。

实时数仓入门训练营:Hologres 数据导入/导出实践


a表的建表DDL

实时数仓入门训练营:Hologres 数据导入/导出实践

b表的建表DDL

这两张表有两个相同的字段,分别叫id和create_time,之后会进行一个数据的聚合。每张表还会有一个不同的值,value_from_a是表a所特有的,value_from_b是表b所特有的。

实时数仓入门训练营:Hologres 数据导入/导出实践

结构表

最后我们会有一张结构表,这张结果表有a和b两张表共有的两个列,分别从a和b两张表得到了另外两个列a和b,我们希望将a和b的数据进行一个实时聚合,写入到Sink表里面。

我们看一下整个Flink的SQL。

实时数仓入门训练营:Hologres 数据导入/导出实践

这里首先是分别声明了两张Hologres的source表,需要实时的消费Hologres两张表的Binlog。

需要注意的是,我们这里需要开启‘binlog’=‘true’这个参数来让Flink进行消费Hologres的Binlog以及开启CDC模式。

实时数仓入门训练营:Hologres 数据导入/导出实践

结果表

然后我们来看一下结果表的声明,如上所示。

在这里需要注意的是,我们需要设置一个‘ignoreDelete’=‘false’,这样防止我们会忽略包括Delete或者beforeUpdate这种类型的数据,导致数据的不一致。

实时数仓入门训练营:Hologres 数据导入/导出实践

我们看一下整个Flink计算逻辑的SQL,如上所示。

这里的逻辑其实比较简单,其实只是将两张表的结果union起来,然后进行一个group by id和create_time进行实时的sum,写入到Hologres的结果表。

这里的作业上线之后,我们可以直接启动运行该作业。

在启动的过程中,我们可以看一下当前Hologres这几张表的状态。

实时数仓入门训练营:Hologres 数据导入/导出实践

可以看到当前Hologres这几张表都是一张空表,我们会对这几张表进行更新,然后看一下数据的同步的过程。

实时数仓入门训练营:Hologres 数据导入/导出实践

首先往a表插入一条数据,可以看到a表的数据已经实时同步到结果表中。
接下来对b表数据进行一个更新。

实时数仓入门训练营:Hologres 数据导入/导出实践

可以看到这两个流的数据已经实时更新到结果表,并进行了准确的数据聚合。

接下来我们再更新a表。

实时数仓入门训练营:Hologres 数据导入/导出实践

可以看到对于源表a的实时更新,已经正确地反映到了结果表当中,Flink非常正确地计算出了两个流的结果。

我们甚至可以看一下这张sink表的binlog数据,因为我们这张结构表也同样开启了binlog的功能,如下所示。

实时数仓入门训练营:Hologres 数据导入/导出实践

可以看到,我们拿到这张表所有的变更记录,和我们预期的效果保持了一致。

以上就是Flink实时读写Hologres的Demo。

(二)Dataworks实时同步Hologres Demo

接下来我们看一下使用Dataworks将PolarDB的数据实时同步到Hologres的Demo演示。

首先我们进入到数据集成,数据同步要进行一个数据源的添加,点击数据源添加。

实时数仓入门训练营:Hologres 数据导入/导出实践

接着新增数据源,选择Hologres,填充完所有的信息之后,我们就可以进行一个数据的添加。

实时数仓入门训练营:Hologres 数据导入/导出实践

新增数据源

接下来进行数据同步的演示。

实时数仓入门训练营:Hologres 数据导入/导出实践

如上所示,首先这里已经有了一个 PolarDB的数据库,以及预先创建好了一张user_details表,可以看到这里已经有三条查询结果记录,之后我们希望把这张表的数据同步到Hologres当中。

然后我们返回到数据集成,点击一键实时同步至Hologres,如下所示。

实时数仓入门训练营:Hologres 数据导入/导出实践

在基本配置中,数据源选择预先创建好的数据源PolarDB,之后选择需要同步的表user_details,然后点击下一步。

实时数仓入门训练营:Hologres 数据导入/导出实践

实时数仓入门训练营:Hologres 数据导入/导出实践

之后,我们会需要选择目标Hologres的数据源,添加后进行刷新,可以刷新出user_details这张表,然后可以配置这张表是否需要自动建表,还是用已经有的表,这里选择我们自动建表,然后点击下一步。

实时数仓入门训练营:Hologres 数据导入/导出实践

在DDL消息处理规则中,我们可以配置各种各样的策略处理,根据需求配置好规则后选择下一步。

实时数仓入门训练营:Hologres 数据导入/导出实践

接下来进行运行资源配置。对Dataworks数据进行实时同步,我们通常需要一个独享资源组,在这里我们已经完成了独享资源组的购买,然后选择各个同步功能所需要的资源组,完成配置并点击立即执行,等待作业的启动。

实时数仓入门训练营:Hologres 数据导入/导出实践

可以看到PolarDB的数据已经实时同步到Hologres这张结构表当中。

实时数仓入门训练营:Hologres 数据导入/导出实践

接下来可以对这张表再进行一定的更新,我们往这张user_details表里面重新插入一条1004数据,数据插入成功后可以看一下Hologres结构表。

实时数仓入门训练营:Hologres 数据导入/导出实践

从后台可以看到,1004这条数据已经实时同步至Hologres,如下所示。

实时数仓入门训练营:Hologres 数据导入/导出实践

通过上方的演示可以看到,使用Dataworks实时同步Hologres功能,我们可以非常便捷地将数据库中的数据同步到Hologres。

常见问题及未来展望

(一)实时计算 Flink 版 Hologres Connector常见问题

经过上述提到的关于Hologres应用场景以及几个Demo,接下来看一下在使用过程中通常会遇到什么问题。

Q: 作业启动失败,无法连接Hologres。
A: Hologres实例需要与Flink集群在同一Region,且使用VPC Endpoint。

Q: 实时写入结果表数据不符合预期。
A: 通常是由回撤引起,需要正确设置ignoreDelete参数。

Q: 实时写入性能慢。
A: 当前高QPS场景的列存表局部更新开销较大,建议换成整行更新或者行存写入。

Q: 维表查询性能较差,且Hologres实例CPU负载高。
A: 通常是由于使用了列存表作为维表,建议切换至行存表。

Q: 实时消费Binlog报错。
A: 通常是由于表没有开启Binlog导致,需要重建表。

(二)未来展望

接下来看一下整个Hologres在实时读写链路上的一个未来的规划和展望。

- Flink One-To-Many 维表Join

这是一个即将上线的功能,我们会在Flink实现一对多的维表Join功能,就不需要强制使用Hologres表的主键进行维表查询。
但需要注意的是,这种场景下面通常性能不会特别好,因为难以避免的查询会导致整表的扫描,使得延迟比较高,所以还是建议用户尽可能使用一对一的点查场景。

- 基于JDBC实时消费Hologres Binlog

当前Hologres Binlog实现是使用了内置接口,暂时没有对外透出。之后,我们会实现基于JDBC的接口实现让用户实时消费Hologres Binlog。

- Dataworks数据集成实时消费Hologres Binlog

当前数据集成并不支持消费Hologres数据,之后我们会支持使用Dataworks的数据集成,能够实时消费Hologres Binlog,这样就能将Hologres的数据实时镜像地同步到其他的数据库当中。

- 无连接限制的SQL读写

由于PostgreSQL的模型限制,当前Hologres整个实例的连接数有一定的限制,之后我们会提供一个无连接限制的SQL读写的功能。

上文提到Hologres的一些 connector和Holo-Client,都已经开源到Github上面,有需求的用户可以访问下方链接进行使用。

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99元试用实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包3个月及以上还有85折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

实时数仓入门训练营:Hologres 数据导入/导出实践

上一篇:Disruptor介绍与基本使用


下一篇:CarbonData集成 Presto(Trino)(3)- 性能测试篇