阿里巴巴瓴羊基于 Flink 实时计算的优化和实践

摘要:本⽂整理⾃阿里云智能集团技术专家王柳焮⽼师在 Flink Forward Asia 2023 中平台建设专场的分享。内容主要为以下四部分:

  1. 阿里巴巴瓴羊基于 Flink 实时计算的平台演进
  2. Flink 能力优化与建设
  3. 基于 Flink 的最佳实践
  4. 未来规划

1. 阿里巴巴瓴羊基于 Flink 实时计算的平台演进

1.1 关于瓴羊

首先简单介绍一下瓴羊,瓴羊是阿里云智能集团的重要业务,致力于将阿里巴巴沉淀十余年的数字化服务经验,系统化、产品化地全面对外输出给千行百业。2012 年开始,阿里提出数据建设方法论,内部形成多款标杆数据产品,如生意参谋、双十一数据大屏等。2015 年阿里巴巴启动中台战略,进行全域数据建设,强化数据能力在业务端的价值显现,让数字化能力广泛服务于各个业务。2018 年应商家日益增长的需求,阿里巴巴集团中台能力进行对外输出,推出 Dataphin、QuickBI 等产品,帮助企业通过数字化技术驱动创新和增长,在生态内外产生显著价值。2021 年瓴羊成立,成为阿里巴巴动物园中的一员。

1.2 Dataphin平台实时业务规模

而在这个过程中打造出的智能数据建设与治理 Dataphin,不仅在阿里巴巴集团内部支撑着各 BU 实时业务,同时也在云上服务于各行业推进企业数字化进程。

在集团内部,平台约承载有 15000+ 的实时计算任务,1000+ 的流批一体任务,覆盖约 50+ 个BU。

在云上,我们的客户分布在电商、金融、交通、零售、制造等行业,应用场景覆盖诸如实时 ETL、实时大屏、实时集成等等实时主要场景,以及像金融行业的特征计算以及风控场景等等。

1.3 Dataphin平台实时架构大图

下面介绍一下 Dataphin 平台的实时架构大图。

首先部署形态上平台支持多云输出,提供公共云、私有云、专有云、混合云等多种输出形态。在多云架构之上,支持提供基于不同调度系统的作业运行能力。在这之上,平台提供不同 Flink 发行版的多版本多集群管理能力,以及多数据源、计算源的元数据管理体系。

最上层,主要由四大模块组成。

● 数据集成,提供无代码化全增量一体的数据集成能力。

● 研发中心,提供编译优化、任务模版、流批一体等能力的实时研发体验。

● 运维中心,提供任务全方位的监控告警体系,保障线上任务的稳定性。

● 资产,支持表、字段、函数、血缘等数据资产的盘点与展示、标准定义与管理、质量评估及保障、分类分级与脱敏等等能力

如上支撑着整个 Dataphin 平台实时计算的全流程研发体系。

2. Flink 能力优化与建设

接下来将围绕采、管、建、用四个方面,分别讲解我们在 CDC 能力提升、元数据管理、流批一体建设、运维体系上分享平台对 Flink 能力的优化与建设。

2.1 CDC 能力提升

在实时数据采集上,平台除了支持社区 Flink CDC 的能力外,还提供了自研的 Flink Connector 去满足更多样化的数据同步场景需求。左右两列是平台目前已经支持的提供增强 CDC 去做数据同步的部分输入输出,更多的数据源还在陆续增加中,我们目前做到的四个能力分别是:

● 一个是自动化感知来源库/表变更的能力,

● 支持配置多样的规则满足不同数据源的入湖入仓场景,

● 同时平台提供无代码化的整库实时同步能力,

● 最后凭借元数据管理体系支撑这种丰富多样的源端到源端的数据流通能力。

平台期望基于 Flink 引擎的能力,提供集实时性、全增量一体、无代码化、自动化为一体的实时数据同步能力。

搭载自研 Connector 的 Flink 采集任务目前支持自动感知8种数据库的源端操作,以及提供 3 种不同的处理策略。

● 对于8种数据库的源端操作,因为对行数据的增删改操作进行感知是 CDC 最基本的能力,这里就不赘述了。除此之外,平台还支持对数据表中列的一系列操作,比如列的新增、列的删除、列的重命名、列类型变更,特别的像表的重命名、清空表等等操作,都会进行自动采集发往下游进行处理。当然更多的场景用户期望的是整库同步,平台对数据库中表的删除或者表的新增,都会形成记录进行采集。

● 对于3种不同的处理策略,同时平台支持用户针对每一种源端操作的记录,去配置不同的消息处理策略。除了正常处理外,还可以配置忽略或者出错策略。比如数据开发人员对业务库一些新增的列不需要关注的可以选择不去处理,但对于已采集的列如果被删除或者更改的下游不可接受的情况,可以终止任务采集并发出告警。

同时针对 CDC 数据发往下游入湖入仓的场景,提供一些可配的规则。

● 支持用户选择目标库已有表,或者自动建表。

● 可以配置自动建表的转换规则,比如支持基于来源表名字加上前后缀来创建目标表名。

● 平台提供表级别映射状态监控,用户可以实时查看整库同步中每一张来源表到目标表的映射状态。

● 最后支持自动在目标表添加变更记录的描述字段,包括变更发生时间、操作类型等,以便业务识别使用,同时用户还可以自行添加全局字段满足特有场景的需求。

刚才介绍的所有的 CDC 能力的提升,平台侧都可通过配置化的任务形式,无需代码,只需配置一个任务即可实现整库的数据实时同步。

如上图所示,用户可以按数据源的类型选择同步任务的输入数据源和输出数据源。在同步规则配置里,支持选用整库、圈选表、排除表的方式从来源库选择需要同步的表,平台能实时感应所配置的数据源中的元数据信息,并进行展示,用户直接勾选对应的数据表即可。

如上图1是刚才说的配置表的映射规则,在列表中会显示当前的映射状态。比如规则选择已有表时,检测到目标库并没有对应的表名时,映射状态就会显示告警,在同步任务运行前就能发现问题。

如上图2是配置源端操作处理策略的配置页面,针对每一种源端操作都可分别配置对应的 DDL 处理策略。其中行数据的增删改默认是正常处理的。

2.2元数据管理

在Flink支持这样丰富的多源数据同步的场景下,面对多达几十上百种不同的数据源,用户又该如何有效应对呢?如何有效的管理不同的数据源连接信息,做好元数据管理就显得尤为重要了。

用户常常会遇到如下情况:

● 相同的源例如 Mysql,既能 CDC 又能 JDBC,如何有效识别并按需引用?

● 相同的 DDL 在不同的作业间如何复用?

● 相同的 DDL 在不同的作业使用场景能否进行差异化配置?

● 离线物理表是否可以直接引用,并与实时 DDL 进行联动从而接入全域血缘?

于是我们基于这些问题,构建了一套元数据的管理体系,分别是 Source 层、Meta 层和 Job 层。

● Source 层按照物理数据源进行管理,与引擎解耦,管理基础的连接信息,这样离线和实时可以复用。如果有些数据源能当做计算引擎还能将其配置为离线计算源,比如 Hive、Hologres、Adbpg 等等。源的 Catalog 也会在这一层进行管理。

● 再上一层构造 Meta 层,这一层会基于数据源的构建,可以是按照 Connector 类型区分的 Flink DDL,我们将称为实时元表;也可以是来自数据源或者计算源中的物理表。还可以是基于实时元表或物理表构建的流批一体映射镜像表或者是批查询的混合元表,每一种表都各有用处后续会介绍。而在这一层的表,平台会自动将其翻译映射为对应的 Flink DDL 供上层的作业层引用。

● 最上一层的 Job 层,可以重复引用 Meta 层的表,这样除了避免反复构建 DDL,也可以使得 DDL 的变更可以被所有引用的作业所感知。同时平台提供自研的 SET 词法,可以针对不同的任务差异化配置 DDL,比如不同作业 Kafka 的 Consumer Group 是不一样的,可以通过 SET 词法单独配置。当然作业里也可以直接写原生 Flink DDL 语句,支持混合使用,灵活性很高。

下面我们来看看具体的案例

目前平台支持接入数十种数据源,其中大约 10 种数据源的 Catalog 接入到 Flink,这里以 Paimon 为例,作业通过引用对应的数据源编码ID,平台可帮用户翻译生成 Create Catalog 语句,用户可直接在作业中访问 Catalog 中的物理表。

如果要基于数据源创建实时元表,对于可以识别到表结构的数据源比如 MaxCompute,平台会自动帮助用户填充对应的字段快速创建元表。如果用户想在流批一体场景下创建逻辑镜像表,可根据字段进行智能映射,提高效率。这里以 kafka 为例,作业通过引用对应的实时元表名称,平台可帮用户翻译生成对应的 DD L语句,提交到引擎侧运行。

在作业层,还是以 Kafka 为例,对于同一个 Kafka 的 Topic,不同作业可分别 Set 不同的消费组 ID 来针对同一个实时元表进行复用,或者可以根据业务场景 Set 诸如计算列、Watermark 等参数,这些作业的差异化配置,平台都会翻译在最终提交到引擎的可执行 SQL 上,如右图所示,以此来做作业粒度的差异化配置。

用户还可直接在平台上写原生 DDL,平台也支持反向地将 DDL 创建映射为 Meta 层的实时元表。基于元表的体系还有个比较大的好处,就是数据安全,源和表可以针对敏感信息比如密码进行加密,避免 Flink DDL 在 SQL 作业层的明文展示,保护我们的数据安全。

综上这套就是目前平台正在使用的元数据管理体系。

2.3 流批一体建设

当我们把数据也采集上来,元数据也管理起来,就可以基于 Flink 进行业务上的任务开发了。那接下来就流批一体这个典型场景,分享一下在元数据管理体系之上,平台在流批一体建设中所做的一些工作。

首先,Flink 作为一个流批一体引擎,其流任务和批任务还是需要区分模式分别运行。这就造成了开发人员在 Flink 流和批任务两个作业间来回切换,开发体验割裂,容易出现变更遗漏,也就导致数据一致性和质量难以保障。从存储层面来看,流批存储系统隔离,提供的数据服务不一致,维护成本高。

针对这种现状,平台的解法是提供流批一体化的开发模式。首先面向开发人员,只需维护一套代码,由平台根据流批不同的模式翻译为对应的流批可执行 SQL。在这个场景下,对一个流批任务进行变更,流或者批的计算口径会同时变更,解决业务口径不对齐带来的数据质量风险。同时在存储层面提供面向流批一体逻辑镜像表,无论底层是统一存储或流批不同存储,面向 SQL 侧看来是一张表,这张表可对外提供一致的数据服务。

下面我们来看看流批一体的具体的案例

最上层是在上一节提到的元数据体系。通过配置流批一体混合源镜像表关联流和批对应元数据表(上节说过,元表可以是 DDL 或者 Catalog 中的物理表),随后我们可以在一个 Flink SQL 作业中对其进行引用,如下图所示,从这张混合源表中读取数据,根据条件进行聚合计算,写入下游表。

平台侧提供两种模式的开关:

● 纯实时模式,平台根据用户配置的实时参数帮助用户翻译为对应的实时可执行 SQL。

● 纯离线模式,用户可以在当前作业配置离线的调度配置。

当同时打开这两种模式时,用户就开启了“实时+离线”的流批一体模式。用户可以用 Set 语句操作混合源表,如红框所示,实现对流批不同的 Flink DDL 的 With 参数进行差异化配置。也可以设置宏变量对不同的 Where 条件进行分别配置。

依靠平台自研的引擎编译模块:

● 对实时模式对应的 Kafka 表,设置相应的消费组,配置相应的数据偏移过滤条件,最终可生成的如左图所示的可执行 SQL。可以看到 DDL 的 With 参数新增了消费组的配置项,过滤条件被替换为实时的过滤条件。

● 对离线模式对应的 MaxCompute 表,同样能设置离线特有的分区,配置相应的过滤条件,最终可生成如右图所示的可执行 SQL。可以看到 DDL 的 With 参数新增了分区的配置项,过滤条件被替换为离线的过滤条件。

值得注意的是,时间参数 stat_date、bizdate 可以在任务的启动或者调度时按需配置。综上所述就是基于平台解决方案实现的一个 Flink 流批一体场景。

2.4运维体系

在刚才的三个小节,已经介绍过实时计算的采集、管理、建设,接下来最后的关键环节就是如何去运维好 Flink 任务。整个运维体系是支持多调度模式、多集群环境、多引擎版本的。

● 首先是基于 Hadoop 的 Yarn 体系,平台目前支持诸如 CDH、CDP、EMR、TDH 等等的 Hadoop 发行版,用户只要有个 Hadoop 集群,无需自行安装任何的 Flink 组件,由平台提供对应的 Client 帮助用户与集群对接,并帮助管理引擎的多版本。

● 其次还支持像阿里云实时计算 Flink、华为云 MRS Flink 的全托管方式对接。

● 另外平台也支持 K8s 的任务调度模式,Dataphin 部署的时候可内置 K8s 集群,用户可在无 Hadoop 集群情况下享受实时计算的能力。如果用户期望对接自己的 K8s 集群,平台后续也将支持。

基于这种多调度模式、多集群环境、多引擎版本的体系,用户可按需选择适合其的 Flink 运行方式。

好了,当 Flink 任务真正运行起来之后,需要通过全面的监控告警体系来保证业务的正常。对于运行中的 Flink 任务,平台通过 Metric Reporter 和 Log Agent 分别采集任务的运行指标和日志,指标会落到时间序列数据库中进行持久化存储,日志会落到文件存储中,平台提供统一的监控看板供用户查看。

同时用户也可基于相应的指标配置对应的告警策略,告警中心匹配告警事件后通过通知服务多渠道发送消息。用户收到告警,可在监控看板上定位问题,形成闭环。

在这个体系中

● 平台为用户提供丰富的监控指标监控,包括:Checkpoint、Io、Watermark、CPU、Memory、Jvm 等等。

● 提供灵活的报警策略,包括值班表、自定义消息渠道,用户不想被频繁打扰时还支持配置发送次数和报警抑制逻辑。

● 对于异常日志,平台支持 Warn 级别以上日志的定位,方便用户直接定位到具体某个 Jobmanager 或 Taskmanager 上,减少用户的检索时间。

● 指标支持当前到数天内均可查询,一是方便用户进行查看复盘,另一方面后续平台也会尝试去基于历史数据提供 AIops 的能力帮助用户提前感知任务异常。

● 对于 Flink batch 的任务,平台提供整个离线调度链路的基线产出监控,链路上可能包括其他 Hive、MaxCompute、Hologres 等离线任务,破线同样会发出告警。

如上展示的是监控看板中对监控视图和日志采集的一些产品截图。

以上就是 Dataphin 平台对 Flink 在采、管、建、用四个方面所做的一些工作,通过这四方面对 Flink 的能力优化以及平台化的功能建设,让用户在实时计算全生命周期的研发效能和体验有较大提升。

3. 基于 Flink 的最佳实践

接下来介绍平台基于 Flink 的最佳实践。

这里挑选两个比较典型的场景来给大家做一个分享,一个是特征计算,另一个是湖仓一体。

3.1 特征计算

首先是特征计算,一开始我们做特征计算,使用的明细数据方案,那时候数据量还没那么大,明细数据直接通过 Flink 实时地写入到 Hbase 表中,离线数据从 ODPS 回流 Hbase 中,用于做特征的冷启动加速。

在这个架构中,计算任务只负责导入明细数据到视图中,聚合逻辑全部放在服务任务中。

这个架构的痛点也很明显,当处理热点数据的特征加工时,明细数据在服务端的计算压力很大,上述的方案是无法满足大数据量的计算要求的。

所以后面我们考虑利用分账的思想,将计算任务分别生成日维度、时维度、分维度的固窗数据,提前计算并保存在数据库中,查询任务利用窗口合并的方式,提供滑窗的在线计算能力。

如上图所示,因为提前计算好不同维度的窗口,用户配置任意的窗口,都可以根据天、时、分钟的固窗数据进行增量聚合来计算出最终结果。在这里我们自己对 Flink 的 State 进行管理,并新增了很多自研算子,像偏度、峰度等等。通过自研的算子 Operator 去适配这种自定义的窗口触发方式。

基于刚才的解法,我们称其为预计算方案。Flink 依赖算子拆分日、时、分维度计算并存储于 Hbase,配置不同滑窗的特征批量生成,基于 Hbase 查询及内存计算,这样在服务端聚合计算的是已经预聚合后的数据,计算压力大大减少。

再后来,我们想能不能直接利用 Flink 算出结果,并利用其批处理的能力做到流批一体的冷启动,所以后来我们提出了全计算方案。直接服务端进行划窗聚合,写入 Hbase。这样集群的存储数据量相比于预计算会更低,服务端的聚合查询性能由于是点查会更好,但同样的 Flink 任务计算端消耗的资源会相应增加。

如上图下半部分是预计算方案和全计算方案的一个对比表格,可以看出有几个区别:

一个是特征开发的灵活性,对预计算方案来说,依赖自研的状态算子按照天、时、分来做预计算,开发成本高。对于全计算方案来说,除了可依赖自研算子外还能用官方的滑动或固定窗口进行计算,灵活性更高一点。

另一点特征快上的能力来说,对于特征口径的更改,预计算方案对任意窗口的计算放在服务端,可以及时生效。而全计算方案在Flink端,需要操作任务来让配置生效,快上的能力预计算更优。

最后从计算成本来说,全计算方案相对于预计算方案,Flink 新增资源消耗会比 Hbase 的存储减少来说,成本更高。

下面分析一下基于自研状态算子和官方滑动窗口的性能对比。场景是以交易TT流为输入,Hbase 为存储输出,每5s触发过去24小时的滑动窗口计算任一卖家累计销售金额。方式是通过回拉 3 天点位进行性能压测。从端到端延迟、任务 RPS、资源消耗、下游 Hbase 写入IO、State、Checkpoint 等多个维度进行对比,这里只展示其中差异比较典型的三个,分别是回追过程中的数据时延、任务 Failover、Checkpoint 大小以及成功率。从监控图表对比可以看出,自定义状态算子相较于官方窗口,有更快的数据回追能力,因为官方窗口的滑动窗口是不共享状态的,而全计算方案针对同一份天、时、分计算状态进行聚合。导致官方窗口在任务中有过多窗口处于计算中,Checkpoint 一直无法成功打上,导致作业频繁的 Failover,无法有效输出最终的计算结果,作业延迟无法追上。也就使得自研状态算子,有更优的作业稳定性,更好的 Checkpoint 成功率。

以上就是我们在特征计算中的一些实践。

3.2 湖仓一体

另一个实践场景是基于 Flink 的湖仓一体,首先我们来看一下整体的架构体系。

结构化、半结构化、非结构化的数据一方面可通过基于 Flink 或者 DataX 的集成方案写入数据湖中,另一方面也提供 OpenAPI 的方式将数据写入湖的底层存储中。

我们以 Paimon 数据湖为例,底层的存储根据云环境和部署形态,可选择基于 HDFS 或者 OSS。按业务可分为 ods、dwd、dws 层,分层之间可通过 Flink 进行数据处理,也可通过其他离线引擎分开处理。

● 湖的上层提供诸如 Hive、Spark、Presto 等等的 Adhoc 即席查询能力,提供不同场景的取数需求。

● 湖中的表、字段、血缘经过解析都会进入到资产中,进行统一的元数据管理。

● 湖中的数据可通过数据服务对外提供服务,提供给诸如实时报表、实时监控、算法服务等等应用中。

目前的湖仓架构,基于 OSS 或 HDFS 的文件系统,有三种数据湖 Paimon、Iceberg、Hudi 的可供选择。

外层对接的数据源按类型分为6大类存储,分别是:

● 文件系统,如 Oss、Hdfs 等

● 消息队列,如 Kafka、Mq 等

● 大数据存储,如 Hive、Maxcompute、Impala 等

● 关系型数据库,如 Mysql、Pgsql、Oracle 等

● NoSQL,如 Hbase 等

● 用户还可以自定义数据源,提供自定义 Flink Connector 接入平台

对数据来说,不流通无价值,无价值不流通。在整个湖仓一体体系中,基于 Flink 与 DataX 架构的数据同步能力,我们让数据很方便地流通起来,使得数据入湖、数据出湖、湖与湖之间、仓与仓之间的数据流转变得简单高效。如此跨源支持数据流通,就可以轻松汇集和保存海量业务数据。

但再方便的入湖工具,历史数据都有迁移成本,无论是人力成本、数据验证成本等等。所以当用户觉得历史数据太重,不想完整地把仓的数据迁移到湖中的时候,我们的解法是增量数据将其入湖,历史数据不迁移,平台侧提供统一引擎服务智能识别分区。

如上图左半部分所示,将数仓里的 Hive 表和湖的 Paimon 表形成一张混合源的表,One Service 查询引擎对这张混合源的表进行查询。

如右图所示,假设这张混合源的表我们将其命名为 ods_orders,用户可以写如下语句,select * from ods_orders where dt = 某个分区,One Service 查询引擎判断混合源表的分区界限,当 dt 小于迁移时间时,继续查询原 Hive 表返回结果。当dt大于迁移时间时,查询新 Paimon 表。这样对用户的体验来说,他面向的还是一张表,用数取数的体感和一张表还是比较一致的。但是对于跨分区区间查询的场景,还是更推荐用户通过上一节讲的平台同步工具将数据完整入湖。

以上就是我们在湖仓一体中的一些实践,湖仓一体场景也是我们一直在实践优化的一块,后续也会持续在这个场景上发力探索,敬请期待。

4. 未来规划

接下来分享一下平台的未来规划。

一个是行业解决方案方面,我们会优先将湖仓一体的场景深化落地,在产品上做成完整的解决方案面向客户,让客户能在 Dataphin 平台上便捷高效地构建起自己的湖仓架构。

第二个是平台功能完备,首先会继续丰富 Flink CDC 支持的数据源,覆盖更多客户的场景。第二个是建设全域元数据中心,在前面有介绍过我们的元数据体系,后面我们会将 Dataphin 平台内,即域内;平台外的元数据体系,指域外。将域外和域内进行打通,形成全域元数据中心,以形成更全面的表、字段的血缘体系。

第三个是持续进行体验优化,首先我们期望的是对于一条实时数据从上游往下游发送的过程中,平台能帮助用户定位到其具体的流向位置,方便纠错排查。其次,在运维体系上,期望能基于一些典型的异常场景,通过智能诊断,做到预警,在可能的异常要发生前就能将告警发出通知到用户。

上一篇:K8s: 控制器之ReplicaSet对象


下一篇:【机器学习】机器学习学习笔记 - 监督学习 - 逻辑回归分类朴素贝叶斯分类支持向量机 SVM (可分类、可回归) - 04