作者 | 蔡适择(顺丰大数据平台负责人)
整理 | 赵阳(Flink 社区志愿者)
本文主要介绍顺丰在数据仓库的数据实时化、数据库 CDC、Hudi on Flink 上的实践应用及产品化经验。文章主要分为以下几部分:
● 顺丰业务介绍
● Hudi on Flink
● 产品化支持
● 后续计划
1、顺丰业务
1.1 顺丰大数据的应用
先来看一下顺丰大数据业务的全景图。
大数据平台,中间的基础部分是大数据平台,这块是顺丰结合开源组件自行搭建的。与之相关的是大数据分析与人工智能,顺丰有一个非常强的地面部队,就是线下的快递小哥以及运输车辆,需要使用 AI 以及大数据分析来辅助管理,提升整体效率。
区块链,顺丰对接了很多客户与商家,对于商家来说,首先需要确保快件是可信的能够做货物的交易与交换。这块涉及的基本上都是品牌商家,溯源与存证的业务顺丰也有涉及。
IoT,就像之前提及到的,因为顺丰地面部队较多,相应需要采集的数据也会比较多。我们的部分包裹中是有传感器的,车辆也有相关的传感器,如车辆的摄像头,以及快递小哥的手环(包含地理位置、员工的健康状态,对应做一些关怀的举动)。同时,还有一些工作场景既有叉车,也有分拣设备,这些就需要大数据平台来做一些联动,因此 IoT 的应用相对较多。
智慧供应链和智慧物流,这两块更多的是指如何用大数据的手段辅助业务做一些经营上的决策。比如我们有很多 B 端客户,对于他们来说如何在每个仓库里备货,如何协调以及互相调拨,这部分就由智慧物流来完成。
下面这块就是 IOT 实践中的一部分:
从上面可以看出物流本身的环节是非常多的,下单、小哥收件、分拣、陆运中转等整个过程,红色解释部分是指我们会做的一些 IoT 与大数据结合的应用,这里其实大部分都是基于 Flink 来完成的。
1.2 顺丰大数据技术矩阵
下面这张图是顺丰目前大数据整体的架构概览:
1、数据集成层:最下面为数据集成层,因为顺丰的历史原因,所以包含了很多数据存储引擎,如 Oracle、MySQL、MongoDB 等,并且部分引擎仍会继续支持。右下物联网设备相对较新,主要是进行包含普通文本、网络数据库、图像、音频、视频等的数据采集。
2、数据存储计算:实时这块顺丰目前用的最多的还是 Flink,Storm 没有标示出来,目前我们在做迁移。消息中间件处理目前主要使用 Kafka。然后右边存储结构的种类就相对丰富,因为不同的场景有不同的处理方式,比如数据分析需要性能比较强的 Clickhouse;数仓和离线计算这块还是比较传统,以 Hive 为主结合 Spark,目前我们是结合 Flink 与 Hudi 去实现离线实时化。
3、数据产品,我们倾向的还是首先降门槛,让内部开发与用户更容易上手。内部同学如果要掌握如此多的组件,成本是非常高的,再加上规范化会导致沟通、维护以及运维的高额成本,所以我们一定要去做一些产品化、规范化的事情。
1.3 顺丰科技数据采集组成
上图就是我们大数据整体数据采集的概览,数据采集当前包括微服务的应用,部分数据直发到 Kafka,还有些会落成日志,然后我们自己做了一个日志采集工具,类似于 Flume,更加的轻量化,达到不丢、不重、以及远程的更新、限速。另外我们也会将 Kafka 中的数据通过 Flink 放到 HDFS,以 Hudi 的形式去做。下面会详细介绍。
1.4 顺丰数据应用架构
上图是一个简单的应用架构,刚才所说的大数据平台数据我们会按需推送到 OLAP 分析引擎、数据库,这部分数据推送过去之后,到达数据服务平台。该数据服务平台主要是考虑到用户或研发对接数据库更便捷,以往在使用时,内部用户首先需要了解大数据组件的使用,而现在通过我们的数据服务产品以配置化的方式配置查询条件、聚合条件即可,最终把结果生成一个 restful 接口,业务系统可直接调用。比如研发用户需要做搜索,只需要关注入参、出参,中间的过程不需要了解,这样的话就能够最大化的把技术门槛降下来,使用时也会更高效简便。
中间部分我们是基于 Kong 做的网关,在 Kong 里面可以加很多种通用的能力,包括监控、限流、缓存等都可以在里面完成。
右边的 Graphql,是 Facebook 开源的一个组件。前端用户经常会出现需求的变更,后台接口需要相应地进行调整,这种情况就可以使用 Graphql 来支持。这里其实是有两个东西:apollo、graphql_Java,两条线,apollo 适用于前端的研发用户,用 node_js 来完成控制层的内容;graphql_Java 适用于后端的用户,主要提供一些接口。
2、Hudi on Flink
2.1 Hudi 介绍
接下来我们主要介绍 Hudi on Flink 在顺丰的应用实践。Hudi 的核心优势主要分为两部分:
● 首先,Hudi 提供了一个在 Hadoop 中更新删除的解决方案,所以它的核心在于能够增量更新,同时增量删除。增量更新的好处是国内与国际现在对隐私数据的保护要求比较高,比如在 Hive 中清理删除某一个用户的数据是比较困难的,相当于重新清洗一遍数据。使用 Hudi 可以根据主键快速抓取,并将其删除掉。
● 另外,时间漫游。之前我们有很多应用需要做准实时计算。如果要找出半个小时内的增量到底是什么,变化点在哪,必须要把一天的数据全捞出来,过滤一遍才能找出来。Hudi 提供时间漫游能力,只需要类似 SQL 的语法就能快速地把全部增量捞出来,然后后台应用使用时,就能够直接根据里面的数据做业务的更新,这是 Hudi 时间漫游里最重要的能力。
Hudi 有两种的写的方法:
● copy on write。
◎ copy on write 这种形式更多是在每次写的时候,能够重写历史中关于更新记录所在的文件,把它重写并且把增量部分再重新记录下来,相当于把历史状态也给记录下来。唯一的不足之处在于,写的时候性能会稍微弱,但是读的性能是很强的,和正常使用 Hive 没有什么区别。这个也是 Hudi 本身的优点。实时性略低,这部分取决于写的文件合并的频率。不过批量的话,写也不会影响到多少性能,所以本身也是批量的去写。比如每隔几分钟写一次,这个其实也不会产生很高的性能损耗,这就是 copy on write。
● merge on read
◎ merge on read 就是写的时候实时会把 log 以 append 方式写到 HDFS 中并写成文件,然后在读的时候将已经生成的文本,再加上增量的部分合并,做一个 merge 操作。好处在于查询的时候数据都是实时的,但是由于查询任务确实较多,相当于是说每次查的时候,都要把两部分数据取出来并做一个合并,因此也会造成损耗。
以上是 Hudi 情况的简单介绍。
2.2 Hudi on Flink 组成部分 - 数据库实时化
上图是我们将数据实时化 CDC 的过程。数据库的 CDC,基本上都是只能到库级别、库粒度。前面的 source 支撑肯定也还是库粒度,中间会经过两个过程:
● 一部分是 DML,它会有过滤,当库里面有 100 张表时,很多时候有些表是不需要的,这部分我们会直接过滤掉,过滤就主要是通过产品化来打通它。
● 另一部分是 DDl,能够实时更新 schema。比如库表字段的增加或者变更,再或者可能加了个表或者改了一个表,这部分会在实时程序中打通数据直通车,只要有任何变更,就会生成一个新的版本,然后将元数据信息记录到直通车里,同时也会包装到 binlog kafka sink 里记录,每一行会打上相应的版本号。这样的话就对于后面的使用就能够直接对应该条记录,使用非常方便,不会有出错的情况。
2.3 Hudi on Flink 组成部分 - 数仓实时化
这部分主要分享我们数仓实时化的过程,我们的目标是实现 Kafka 里的数据在当前离线数仓中也能真正用起来,包括很多做准实时计算的用户也能够真正用起来。Hudi on Flink 就是我们尝试的方案。以前 Hudi 这块也做了 Hudi on Spark 方案,是官方推荐使用的方案,其实相当于多维护一个组件,但是我们大方向上还是希望所有实时的东西都能够让 Flink 去完成,另外也希望是 Flink 的应用生态能够做得更加全面,在这部分就真正去把它落地下来,并且在生产中应用起来。
其实整个过程,比如做表数据实时化的时候,它是分为两部份,一部分数据初始化,在启动的时候,会把数据重新做批量的拉取,这个是用 Flink batch 来做的,其实社区本身也有提供这种能力。另外 Hudi 本身也具备把存量的 Hive 表 Hudi 化的能力,这是 Hudi 最新才出来的功能。这部分我们会用 Flink batch 的方式重新抽一遍,当然也有存量,对于存量的一些表,可以直接用存量表来转化,然后用 Flink batch 做初始化。
另外一部分是增量更新,增量更新是指有个 DB connect 对接 Kafka,从 Kafka 的 source 拿到数据库增量 CDC 的 binlog,然后把 binlog 进行加工,同时再利用 Flink 本身的 checkpoint 机制(Flink 本身的 checkpoint 整体频率可以控制)进行 snapshot 的过程。其中所做的内容也我们自己可以控制的,所以采用 checkpoint 的形式可以把 Hudi 所需要做的 upsert 的操作全部在 checkpoint 中更新到线上,最终形成 Hudi 里面的实时数据。
2.4 Hudi 数仓宽表方案
直接将 Kafka 数据扔到 Hudi 里相对容易,真正困难的点在于宽表。对于整个 Hudi 来说,宽表是涉及到很多维表,当很多维表或者事实表更新的时候,会由多个事实表做一个关联。但不是每个事实表都能抓到宽表的真正主键,因此 Hudi 没法做这种更新。所以如何把宽表做数据实时化是一个难题。
上图是顺丰的宽表方案。
● 第一层,对于 ODS,可以直接连接 Kafka,用 Hudi on Flink 的框架就能够完成。
● 第二层,DWD,这里也有两种办法:
一种是用 Flink SQL 先把实时的 Kafka 宽表做完,不过这种办法成本会高一点,相当于再次引入了 Kafka,整个数据链路变长,如果真正需要去用实时宽表可以小部分去推,但如果不存在纯实时数据的需求,就没有必要去做 DWD 的实时 Kafka 宽表。
另外,在没有 DWD 的实时 Kafka 宽表的情况下,如何完成上述离线层的 DWD 实时化?这里有几个步骤,首先创建一个维表的 UDF 做表关联,也是最方便的方式。其次,可以考虑直接用 join 的方式,用两个实时表来做关联,但可能存在关联不到的情况。
当然,做维表关联,就涉及到外键主键的映射。外键主键映射是为了让我们能够在另一个事实表更新时,快速找到主键在哪,即外键主键的映射 。另外主键索引,主键索引其实也是跟外键主键的映射相关。至于外键主键的映射,相当于把它建成一个新的表主键索引获取,这样增量更新 Hudi 跟原来的 ODS 层就基本上一致了,这就是宽表实时加工的过程。下图为运单的宽表举例。
3、产品化支持
上述从技术层面分析了顺丰当下业务架构的相关情况,以下将分享我们在产品化上所做的一些支持工作。
3.1 数据直通车
上图是我们的数据直通车,能够做到让用户自己在产品中操作,不需要写代码即可完成,可以实现低门槛的快速简便的应用。比如配置数据接入仅需 1 分钟左右,整个过程就是在产品上以配置化的手段就能够将数据最终落在数据库,我们的离线表、数仓、做数据分析都能够直接快速的运用起来。
另外,数据接入进来之后,需要有数据管理的能力。上图是数据管理能力测试环境的简单情况,我们需要让用户能够管理相关的数据,首先谁用它了,其次它涉及什么字段,有哪些具体的内容,同时它里面的血缘关系又是怎么样的,这个就是我们数据资产管理所具备的功能。
3.2 实时数据使用
上图是我们 binlog 的 SDK,其实像 binlog 这种 avro 的格式,对用户来说使用有一定门槛。但还是有一些编码的用户,对于这些用户我们提供具体的 SDK,所以在 SDK 里真正使用时都做到简便。左边看起来是 json,实际上是 avro 格式。右边的内容就是在 Java 上的使用情况,这个是在代码层面辅助研发快速应用的工具。
我们在平台上也做一些简化的内容,首先有一部分是关于拖拽的,拖拽是指封装一些组件,用户可以通过拖拽来快速完成其需求。这个产品上线后,很多之前没有任何实时计算的经验,甚至连离线开发的经验也没有的用户都能够做实时的数据开发。
上图为实时指标采集,产品上线之后有很多监控的需求,Flink 本身提供很多 Metric,用户也有很多 Metric,我们希望为用户提供一个高效的解决方案,把 Metric 全部采集出来,让用户能够快速应用。
这里在监控里面也做了几个工作,一个是爬虫方案,实现一个 akka 的客户端,Flink 本身是 akka 的框架,每个 jobmannager 都有 akka 的服务、接口,这样只要实现一个 akka 的客户端,就能够以 akka 的 API 形式获取具体的 Metric 情况。这部分采集完之后发到 Kafka,最终存到 TDengine 再到 Grafana,提供给用户。Grafana 也会整合到我们的实时计算平台产品里面来,在面对存量的情况时,不需要重启用户的任务,就能够直接做数据采集。
但在面对增量情况时,就需要补充一些 Metric,比如 CPU 使用率、内存的使用率等。这部分我们以 Reporter 方案来满足,Reporter 方案也是社区当前主推的方案。Reporte r 方案的原理其实是在 Flink 的 Metrics Reporter 里进行插件开发,然后发到 gateway,这个 gateway 其实就是为了避免 Kafka 客户端过多的问题,所以这里中间做一个网关,后面还是和上面的一致,这个就是 Flink 的任务监控情况。
4、后续计划
上述已经分享了我们在内部已经落地、实际应用的过程,后续我们还会做什么?
4.1 弹性计算
首先,弹性计算。目前像监控任务,用户申请的资源远远超过实际需要使用的资源,会造成严重的资源浪费,内存也一样。处理类似情况时,我们使用了 Flink 延伸的框架 Metrics monitor,结合采集的 Metrics,能够做到当整个使用率过低或过高的时候,及时调整达到资源扩缩容或者并发扩容。
4.2 Flink 替换 Hive 演进
上面提到我们存量是有非常多的 Hive 任务,包括 Spark 任务需要进行替换,但怎么去做呢?
首先我们用 Flink 来替换,由于强制或平台自动推荐都有难度,所以我们做了一些折中方案。比如埋点,当需要把数据写到 Hive 的某个表,它会经过 Hiveserver,SQL 解析之后,此时将表进行替换,执行两个路线:一个是正常的 table 这样执行会写到 Hive 里面去。另外也会埋点把写的表替换成另一个表,然后同时再以 Flink 的形式去执行一遍,不过会产生额外的资源消耗,执行大概生成两个表,需要自动计算两者是否一致。如一致测试稳定后就能以计算框架来去替换它。
大部分任务是兼容的可替换的,但也有小部分不兼容的情况,这部分可以采取人工处理,以尽量实现整个技术上的统一,这部分是后续需要完成的。
4.3 批流一体化
上图是我们做批流一体化的过程,批流一体化在元数据管理与权限管理部分都已经有一些落地。
除此之外我们结合刚刚所说替换的过程,上图就是 SQL 的兼容测试。因为这几者都做完之后,其实批流一体化可以同步去做,相当于同一个接口,加一个参数,即可实现流批处理底层引擎的快速切换,有助于整个数据开发能够保持一致,所以批流一体化也是后面需要尝试的。
上图实际上是我们一体化整个框架的最终形式。首先上面有一层 IDE 能够让所有的用户使用。然后下面各种基础功能支持,包括自动补全的 SQL 语法解析功能的支持,再往下就是一些资源管理、调度管理和知识管理,这些也是为了辅助开发而用的。再下面一层是计算引擎,要把这些计算引擎跟用户做一个大的隔离,让用户不用再关注底层技术的实现和使用,这是我们后面的要持续去做的事情。