作者:郑锴,花名铁杰,阿里巴巴高级技术专家,Apache Hadoop PMC,Apache Kerby 创立者。深耕分布式系统开发和开源大数据多年,先后专注在安全,存储和计算领域。之前在 Intel,目前转战阿里云上,致力于提供更好用更有弹性的 Hadoop/Spark 大数据平台。
Spark AI 北美峰会的第一天,坊间传闻被证实,Databrics(俗称数砖,亦称砖厂)的杀手锏 Delta 产品特性作为 Delta Lake 项目开源!会前,笔者有幸同砖厂的两位大佬李潇和连城做了个线下交流,谈到 Delta 时被告知会有相关重磅在大会上宣布,但却没想到是开源出去。为什么呢?Delta 作为 Databrics Runtime 下一代引擎被高调发布两年不到吧,按说油水滚滚地来,这么快就放出去?不过想来也是,Spark 社区最近两年总体感觉提振乏力,潜在的竞争对手 Flink 则在阿里和一众国内大厂的纷纷加持下风生水起,Spark 可能不得不防。Delta 本质上解决了大数据分析场景中常见的数据更新的问题,从开源大数据这个生态圈子来看,Hive 早已经做掉了 ACID 事务支持;Uber 发起了hudi,Netflix 搞 Iceberg,双双进入 Apache 孵化器,解决类似问题。再等下去显然对 Spark 和社区发展不太有利,感觉数砖不仅对 Spark 把控得好,对商业和开源节奏的把握也很到位。Delta Lake 开源这个事情可谓引爆全场,笔者为了这篇文章多点干货要去相关 session 听一听的,结果面对浩浩荡荡的排队,只好作罢。文章为了蹭热点赶得有点急,细节上多有谬误的话,还望拍砖指正。
商业之类的不好说,我们还是言归正传谈技术。Delta Lake 是什么?简单来说就是支持 delta 的 Data Lake(数据湖),可以说是 Data Lake 2.0 吧。之前的数据湖只支持批量插入,写入失败容易产生脏数据,导致查询分析失败;现在支持事务更新,允许一边读一边更新,甚至多个 writer 同时操作,ACID 的那种,保证一致性地读取和分析查询。最新的操作修改能查询得到吗?在大会第一天的 Expo Theater 上,Renold Xin 回答我的问题时特意强调,只要 commit 掉的 edits,当下的查询就可见。解决了这类问题可不得了,这可以说是近年来开源大数据的一个重要发展,意味着从粗放到精细,从野蛮到文明;在此之前具有类似意义的有 Parquet/ORC 这种列式存储格式的引入,和 Arrow 这种数据集内存表示格式的支持。种种迹象表明,大数据发展到现在必须精耕细作,这也对大数据平台研发的从业者有了更高的要求。Delta 涉及诸多特性包含很多亮点,最核心的是什么呢?我们来尝试探究其背后的技术本质和发展脉络,从 delta 的字面意思来看,其核心关注点在于数据更新操作和由此产生的增量数据写入。不是大数据背景的可能就问了,更新操作有什么大不了,计算机系统不就是干这个事情的吗?更新就更新呗,直接修改掉,怎么出来个增量数据?回答这些问题之前我们先扯远一点。Spark 对全量数据利用 SparkSQL 做批处理已经十分成熟,对实时流数据利用 Spark Structured Streaming 做实时计算本身也没有太大问题,现在都在讲批流一体,Spark 基于 RDD 的 micro-batch 老早就进行了批流统一,在性能延时和吞吐之间取得了很好的平衡,在大多数情况下都是没有问题的。不管是批处理还是流处理,背后的大数据存储考虑的都是一写多读,讲究吞吐量,写主要是追加写,写新的数据分区,或者在往已有的分区里面追加数据,总而言之不支持随机修改。你要想做随机修改的话,那就自己做,比如 HBase,简单来讲就是把修改记录攒起来,攒够了阈值再写出去。因此在实时数仓场景里面,实时处理的时候做表的更新(插入,删除,修改),麻烦可就来了,因为表背后的数据通常按只读的 Parquet/ORC 格式存储,没办法修改。怎么办,更新操作记录到单独的文件里面,记 edit log 吧,Delta 里面叫 transaction log,叫啥反正就是所谓 delta 数据。增量数据 delta 就是这样来的,当然是相对于全量数据或者基线数据也就是 base 数据而言。这里面有些重要问题,一个是这个过程会产生大量的 delta 小文件,需要及时做合并;处理场景可能本身还要求事务支持和多版本,满足回溯或回滚;读的话不能只读原来那些 Parquet/ORC 文件了,还得考虑吸收这些 delta 文件。这种情况下存储用强一致性的 HDFS 还好,用 S3 这种最终一致性的可就惨了,数砖几乎 all in cloud,S3 是重头戏,这也就能解释数砖为什么会在 AI 高涨的时候搞出个 Delta 这么大的动静,搞出一种新的 table format 也就是 Delta Table 来系统性的应对这些挑战,实在有点影响主旋律。Delta 的重要性毋庸置疑,借助于 Delta,在数砖 all in 的存储计算分离情景里面,无论批处理,流处理,还是机器学习深度学习,全都能搞定,而且借助于 DBIO 的缓存优化,data skipping 或者文件索引技术,性能还杠杠滴!
Delta 这类技术总结一下就是,按列式格式写 base 数据加快分析读,增量更新数据 delta 则采取行式写入支持事务和多版本,然后系统通过后台不断地进行 delta 合并。这种方式将存储格式和计算结合起来考虑,一方面支持传统的海量数据分析查询,另一方面也支持事务更新写入,读和写采取不同的优化路径,是一种非常成熟的存储架构模式。这种模式越来越多地出现在 HTAP(Hybrid transaction/analytical processing,OLAP + OLTP)系统里面。最近几年 HTAP 有点热,新出现的系统有点多,感觉大家都往这上面靠,八仙过海各有神通。比如开源大数据领域的 Hive,SparkSQL,原本只是批处理分析的,借助于类似 Delta 这种方案的加持,也就沾上了边。国内这两年比较活跃的 TiDB,微软的 SQL Server,原本是 OLTP 为主,也都想办法配备上了 Spark,支持分析负载。开源大数据领域比较早的算是 Cloudera 的 Kudu + Impala 组合,前些年经常看到他们的演示,一边做些记录更新,一边 Impala 查询马上就能得到最新结果,当时的 Hive 和 Spark 此情此景是不是很惭愧不得而知。更清奇的是消息订阅分发系统,竟也能装扮一新摇身变成 HTAP,像 Kafka 很明显在往这个方向走,Apache Pulsa 也是。前不久有幸跟 StreamNative 的创始人 Sijie 聊到细节,Pulsar的做法挺有意思,跟 Kafka 不一样的是,对于流入 Pulsar 系统的事件消息,Pulsar 都掖着藏着自己管着,不用额外的 ETL 过程转走了,对于较早的消息则通过内置服务来转储为 Parquet 这种列式文件,然后利用现有的计算引擎比如 Spark,Hive 或 Presto 做分析;对应分析 job 同时作为一个消息的消费者来拉取最新流入的消息,从而达到批流统一,满足实时性。Kudu 的做法有些类似,不过实现上更紧凑,记录是直接以行式写到内存上的 B 树,然后再异步刷到磁盘;后台再对这些增量数据进行合并,写列式存储文件。这个时候如果启动 Impala 查询,就会高效读取到这些列式存储文件,但是对于还没有合并的行式增量更新文件,则只有全部读进来自己合并得到完整结果。如果还在内存里面呢?更甚者,如果还在分布式一致性写入协议过程中呢,也就是说还没有 committed?我怀疑类似 Pulsar 和 Kudu 的这类系统可能会处理不好,毕竟分析引擎和过程是另外的,跟存储系统分开了,总不能侵入式读取写入节点的内存吧。因此从本质上来讲,这类系统因为分析时可能读取不到最新更新操作,可能并不算是最严格意义上的 HTAP。不过在开源大数据领域,在原有的海量数据分析上面支持更新操作,能够按照版本或者时间支持 snapshot isolation 一致性地读,已经是个很大的进步了,毕竟主要还是解决大数据场景问题。与此相对应,蚂蚁的 OceanBase 号称是金融级别的,已经有大量 case 用于银行金融业务,因为是一个从存储到查询都高度一体化的系统,从理论和工程上来讲可以做到分析统计更实时更精确。类似于 SQL Server,OceanBase 首先是一个 OLTP 数据库,因此无论是查询还是更新操作,无论是点查还是范围查询,都要求很高性能,因此在存储设计上跟上述大数据背景的系统有很大区别,不过也还是遵循了 Delta 背后的类似工程思路,就是读和写走不同的优化路径,数据分为基线存储和内存事务增量存储,增量存储最终合并进入基线存储。具体的区别在于,基线存储采取行式而非列式,做法上跟 HBase 有不少类似,单位存储块内的行集排好序,并支持创建 bloom filter 等索引,有利于点查和范围查询。内存事务写入上,delta 数据记录为一个操作链,并做好 B+ 树和哈希索引,同样为了快速点查和范围查询。这些装置当然主要是对 OLTP 事务操作和查询来准备的,OLAP 类的报表统计查询应该同样可以利用。有意思的是,OceanBase 的存储块比较小,十分有利于做合并,大多数块在合并过程中可以保持不变,即使合并需要重新生成块涉及的数据量也不大。大数据存储系统动则几十上百兆的块,比如 HDFS 默认 128MB,很难想象仅仅为了合并写入几条更新记录,重写这么大的数据块。扯了这么远,回到数砖即将开源的 Delta 上来,它是如何实现高效合并的?我在会上专门问过 Renold,回答说是可以参数控制,不过还是有点不太清楚,搞明白的同学麻烦分享一下,我也想知道。笔者所在的团队正在做类似的事情,因为是一个全新的存储系统,在一开始我们就把 Delta 考虑进来,单位存储块不大也不小,在合并更新写入上和吞吐上走了一个平衡。对这方面工作感兴趣的同学可以加好友交流,也欢迎加入我们,一起共创未来。
本人邮箱zhengkai.zk@alibaba-inc.com,也欢迎加入钉钉群 Apache Spark 中国技术交流群,群号 23109202;阿里云 E-MapReduce 技术交流群,群号21784001。或者扫码加入: