DataFlow编程模型与Spark Structured streaming

流式(streaming)和批量( batch):流式数据,实际上更准确的说法应该是unbounded data(processing),也就是无边界的连续的数据的处理;对应的批量计算,更准确的说法是bounded data(processing),亦即有明确边界的数据的处理。 近年来流式计算框架编程接口的标准化傻瓜化SQL化日渐有走上台面的趋势。各家计算框架都开始认真考虑相关的问题,俨然成为大家竞争的热点方向。

Dataflow模型:是谷歌在处理无边界数据的实践中,总结的一套SDK级别的解决方案,其目标是做到在非有序的,无边界的海量数据上,基于事件时间进行运算,并能根据数据自身的属性进行window操作,同时数据处理过程的正确性,延迟,代价可根据需求进行灵活的调整配置。DataFlow的底层计算引擎依托于 Millwheel 实时计算框架和FlumeJava批处理框架,在谷歌开源了相关SDK以后,发起了beam项目: http://beam.incubator.apache.org/ , 为了拉拢开源社区的同学,其底层计算引擎也可以替换适配成Spark/Flink等开源计算框架。

DataFlow模型核心


Dataflow计算模型:希望从编程模型的源头上,统一解决传统的流式和批量这两种计算语意所希望处理的问题。

和Spark通过micro batch模型来处理Streaming场景的出发点不同,Dataflow认为batch的处理模式只是streaming处理模式的一个子集。在无边界数据集的处理过程中,要及时产出数据结果,无限等待显然是不可能的,所以必然需要对要处理的数据划定一个窗口区间,从而对数据及时的进行分段处理和产出,而各种处理模式(stream,micro batch,session,batch),本质上,只是窗口的大小不同,窗口的划分方式不同而已

比如,Batch的处理模式就只是一个窗口区间涵盖了整个有边界的数据集这样的一种特例场景而已。一个设计良好的能处理无边界数据集的系统,完全能在准确性和正确性上做到和“Batch”系统一样甚至应该更好。而不是传统的认为batch框架的正确性更好,streaming框架顾及了实时性,正确性天然就做不好,必须和batch框架配合走Lambda模型来补足(Lambda模型:用一个流式+批量的拼凑方案去解决海量无限数据的实时统计问题,虽然有各种上层封装抽象,统一SDK编程接口方案的存在,企图通过一套代码,翻译执行的方式,降低在两套计算框架模型上开发和维护代码的代价,但实际效果往往并不如意,翻译执行层的存在,并不能抹平两种计算框架在模型根源上的差异,到头来真正能复用的代码逻辑并不多,简单的说就是Lambda框架本身并不解决用户真正的痛点,而只是一种没有出路的情况下的无奈之举)。

Dataflow模型里强调的两个时间概念:Event timeProcess time

  • Event time 事件时间: 就是数据真正发生的时间,比如用户浏览了一个页面,或者下了一个订单等等,这时候通常就会有一些数据会被生产出来,比如前者可能会产生一条用户的浏览日志
  • Process time: 则是这条日志数据真正到达计算框架中被处理的时间点,简单的说,就是你的程序是什么时候读到这条日志的

现实情况下,由于各种原因,数据采集,传输到达处理系统的时间可能会有长短不同的延迟,在分布式应用场景环境下,不仅是延迟数据乱序到达往往也是常态。这些问题,在有边界数据集的处理过程中往往并不存在,或者无关紧要。

DataFlow编程模型与Spark Structured streaming

基于这种无边界数据集的特性,在Dataflow模型中,数据的处理过程中需要解决的问题,被概括为以下4个方面:

  • What results are being computed. : 计算逻辑是什么
  • Where in event time they are being computed. : 计算什么时候(事件时间)的数据
  • When in processing time they are materialized. : 在什么时候(处理时间)进行计算/输出
  • How earlier results relate to later refinements. : 后续数据如何影响(修正)之前的计算结果

清晰的定义这些问题,并针对性的在模型框架层面加以解决,正是Dataflow模型区别于其它流式计算模型的核心关键所在。通常的流式计算框架往往模糊或者无法有效的区别对待数据的事件时间和处理时间,对于第4个问题,如何修正数据,也可能缺乏直接的支持。这些问题通常需要开发人员在业务代码逻辑层面,自行想办法解决,因而也就加大了这类数据处理业务的开发难度,甚至让这种业务的开发成为一个不可能完成的任务。Dataflow计算模型的目标是把上述4方面的问题,用明确的语意,清晰的拆分出来,更好的模块化,从而实现在模型层面调整局部设置,就能快速适应各种业务逻辑的开发需求。Spark 2.0开始启动的Structure Streaming API也引入了和Dataflow类似的模型思想。

理论模型:Dataflow基本上是通过构建以下三个核心功能模型来解决上面的问题:

  • 一个支持基于事件时间的窗口(window)模型,并提供简易的API接口:支持固定窗口/滑动窗口/Session(以Key为维度,基于事件时间连续性进行划分)等窗口模式
  • 一个和数据自身特性绑定的计算结果输出触发模型,并提供灵活可描述的API接口
  • 一个增量更新模型,可以将数据增量更新的能力融合进上述窗口和结果触发模型中。

窗口模型:

DataFlow编程模型与Spark Structured streaming

为了在计算框架级别实现基于事件时间的窗口模型,Dataflow系统中,将常见的流式计算框架中的[key,value]两元组tuple形式的信息数据,变换成了[key,value, event time, window ]这样的四元组模型。

Event time的引入原因显而易见,必须要有相关载体承载这个信息,否则只能基于Process time/Batch time 来划分窗口。而window窗口标识信息的引入,很重要的一个原因是要支持Session类型的窗口模型,而同时,要将流式和增量更新的支持融合进窗口的概念中,也势必需要在数据中引入这样一个显式的窗口信息(否则,通常的做法就只能是用micro batch分组数据的方式,隐式的标识数据的窗口属性),在消息的四元组数据结构基础上,Dataflow通过提供对消息进行窗口赋值窗口合并按key分组按窗口分组等原子功能操作,来实现各种窗口模型。

窗口触发模型:

多数基于Process time定义的固定窗口或滑动窗口模型,并没有特别强调窗口触发这样一个概念,因为在这类模型中,窗口的边界时间点,也就是触发计算结果输出的时间点,并不需要特别加以区分。

对于Dataflow这样的基于事件时间的模型来说,由于事件时间和处理时间之间存在非固定的延迟,而框架又需要正确的处理乱序的数据,这使得判断窗口的边界位置,进而触发计算和结果输出变得困难起来。在这一点上,Dataflow部分借用了底层Millwheel提供的Low watermark低水位这样一个概念来解决窗口边界的判断问题,当低水位对应的时间点超过设定的时间窗口边界时间点时,触发窗口的计算和结果输出。但是,低水位的概念在理论上虽然是OK的,在实际场景中,通常是一个概率模型,并不能完全保证准确的判断事件时间的延迟情况,而且有很多场合对窗口边界的判断,用户自己有自己的需求。因此,Dataflow提供了可自定义的窗口触发模型可以使用低水位做触发,也可以使用比如:定时触发,计数触发,计量触发,模式匹配触发或其它外部触发源,甚至各种触发条件的逻辑运算组合等机制来应对可能的需求。

增量更新模型:

当一个特定时间窗口被触发以后,后续晚到的数据如何处理,如何对之前触发结算的结果进行修正,Dataflow在框架层面也提供了直接的支持,基本上包括三种策略:

  • 丢弃:一旦特定窗口触发过,对应窗口的数据就丢弃,晚到的数据也丢弃
  • 累计:触发过的窗口对应的数据保留(保留时间策略也可调整),晚到的数据更新对应窗口的输出结果
  • 累计并更正:和累积模式类似,区别在于会先对上一次窗口触发的结果发送一个反相修正的信息,再输出新的结果,便于有需要的下游更正之前收到的信息

通常来说,丢弃策略实现起来最简单,既没有历史数据负担,对下游计算也不产生影响。但是前提条件是,数据乱序或者晚到的情况不严重或者不重要或者不影响最后的统计结果的精度。

累计策略,从窗口自身的角度来说,实现起来也不复杂,除了内存代价会高一些,因为要保留历史窗口的数据,但是存在的问题是有些下游运算逻辑是基于上游运算结果计算的,下游计算逻辑能否正确处理重复输出的窗口结果,正确的进行去重或者累加,往往是个问题。

累计并更正策略,就窗口自身逻辑来说,实现上会更加复杂一点,但是下游计算逻辑的编写复杂性其实才是最难的。反相修正信息,是为了给下游提供更多的信息来解决上述窗口运算结果重复输出问题,增加了下游链路去重数据的能力,但实际上,这个逻辑需要下游计算逻辑的深度配合才能实现,个人觉得,除了部分计算拓扑逻辑相对简单的程序能够正确处理好这种情况,依赖关系稍微复杂一点的计算链路,靠反相修正信息,要做到正确的累加或去重还是很困难的。

Structured Streaming


Spark2.0新增了Structured Streaming,它是基于SparkSQL构建的可扩展和容错的流式数据处理引擎,使得实时流式数据计算可以和离线计算采用相同的处理方式(DataFrame&SQL)。Structured Streaming顾名思义,它将数据源和计算结果都映射成一张”结构化”的表,在计算的时候以结构化的方式去操作数据流,大大方便和提高了数据开发的效率

Spark2.0之前,流式计算通过Spark Streaming进行:

DataFlow编程模型与Spark Structured streaming

使用Spark Streaming每次只能消费当前批次内的数据,当然可以通过window操作,消费过去一段时间(多个批次)内的数据。在数据量特别大的情况下,使用window操作并不是很好的选择,通常是借助其它如Redis、HBase等完成数据统计

Structured Streaming将数据源和计算结果都看做是无限大的表,数据源中每个批次的数据,经过计算,都添加到结果表中作为行。

DataFlow编程模型与Spark Structured streaming

关于结算结果的输出,有三种模式:

  • Complete Mode:输出最新的完整的结果表数据。
  • Append Mode:只输出结果表中本批次新增的数据,其实也就是本批次中的数据;
  • Update Mode(暂不支持):只输出结果表中被本批次修改的数据;注意,这与完全模式不同,因为此模式不输出未更改的行。

新增的structured streaming API,针对原先的streaming编程接口DStream的问题进行了改进,Dstream的问题包括:

  • 框架自身只能针对Batch time进行处理,很难处理event time,很难处理延迟,乱序的数据  (batch interval:为Streaming应用设置的批处理间隔)
  • 流式和批量处理的API还是不完全一致,两种使用场景中,程序代码还是需要一定的转换
  • 端到端的数据容错保障逻辑需要用户自己小心构建,增量更新和持久化存储等一致性问题处理难度较大

这些问题其实也就是Dataflow中明确定位需要解决的问题。通过Structured Streaming API,Spark计划支持和Dataflow类似的概念,如Event time based的窗口策略,自定义的触发逻辑,对输出(sink)模块的更新模式(追加,全量覆盖,更新)的built-in支持,更加统一的处理无边界数据和有边界数据等。

总体看来,Spark 2.x的structured streaming 模型和Dataflow有异曲同工之处,设计的目标看起来很远大,甚至给出了一份功能比较表格来证明其优越性

DataFlow编程模型与Spark Structured streaming

但上面的表格明显的是有“扬长避短”的偏向性的。比如在2.1的版本中,Structured Streaming还是Alpha版的,所支持的类Dataflow模型的功能还相对简单。2.2版本中,号称production了,不过,应该还是从稳定性的角度来说的,功能完整性方面还有一定差距。比如还不支持session window,追加模式更新只能支持无聚合操作的场景,还有各种功能还停留在设想阶段,对于join等操作还有各种各样的限制等等,这些部分和dataflow业已实现的功能还有较大的差距。

对于exactly once发送的保障,Structured Streaming要求外部数据源具备offset定位的能力,再加上snapshot等机制来实现,而dataflow是通过对消息在框架内部进行持久化来实现replay,不依赖外部数据源的能力。

另外,prefix integrity, Transactional sink等概念,实际上是对上下游读写接口的一个封装,帮用户实现了一些业务逻辑,整体上偏外围功能一点,用这些特性来和其它框架比较不一定客观,因为设计理念不太不一样。

而在Dataflow的模型设计中,用户能更加细化的定义每个环节的步骤和设置,所以没有把一些逻辑替用户实现,更多的是以模块化的方式,留给用户去自己选择,而Structured steaming则把很多事情包办了,定制的余地较小,灵活性应该会差一些,不过这也给程序的自动优化带来了一些便利

Beam


     Beam https://github.com/apache/beam 是由谷歌发起的apache 项目,基本来说就是实现dataflow编程模型的SDK项目,目标是提供一个high level的统一API编程接口,后端的执行引擎支持对接 APEX/Spark/Flink/Cloud dataflow

目前的编程语言支持Java和Python,2017年5月发布了第一个稳定版本2.0.0。

这个项目的前景如何,不太好说,单就适配各个后端的角度来说,就Spark后端来说,在spark 1.x时代,这种high level的编程模型抽象是对spark编程模型的一种add on,有一定的附加价值,但是按照spark 2.x structured streaming的发展路线来说,这一层抽象就稍微显得有些多余了。而基于Java的语法,在表达的简洁性上,相比scala也会带来一些额外的代价。

Flink


Dataflow的核心就是窗口和触发模型,而Flink在这两方面的实现,最接近Dataflow的理论原型,事件时间驱动,各种窗口模型,自定义触发和乱序/晚到数据的处理等等。

Flink的Data Streaming API通过定义window方法,和window内的数据需要使用的聚合函数比如:reduce,fold,window(前两者增量,后者全量),以及窗口触发(Trigger)和窗口内数据的淘汰(Evictor)方法,让用户可以实现对Dataflow模型中定义的场景的灵活处置,比如:需要在大数据量,大窗口尺度内实现实时连续输出结果的目的。通过allow late数据的时间范围来处理晚到数据。对于延迟数据会触发聚合结果的再次输出,这个和Dataflow的模型不同的是,Flink本身是不提供反向信息输出的,需要业务逻辑自行做必要的去重处理。对于Flink的实现,对数据的聚合和淘汰方式,给用户留下了足够灵活的选择,毕竟在工程实践中,长时间,大窗口,连续结果输出这种场景很常见,比如实时统计一天之类各个小时段的PV/UV,5秒更新一次结果。这种情况下,要避免OOM,还要正确处理晚到数据,追数据等问题,预聚合和提前触发的能力就必不可少了。

至于SQL化这条路,Flink的SQL语法解析和优化是依赖Apache Calcite实现的,而Calcite对window语法的支持才刚刚开始,所以FlinkSQL目前还不支持Streaming模型

整体感觉Flink目前在Dataflow模型思想方面实现的成熟度比Spark Structured Streaming要好

StreamCQL


华为的StreamCQL方案,是构建在Storm之上的,简单的说就是提供了一个流式SQL的编程接口,执行时,底层翻译成Storm的拓扑逻辑提交执行。整体上,StreamCQL做的好的地方是,SQL的支持比较完整,其它框架,在Stream这个场景,SQL的支持,或多或少还在开发完善中。

StreamCQL最大的问题,是它的编程模型,和Dataflow的模型还有很大的差距。

整体上来说,StreamCQL的框架逻辑,就是使用窗口来buffer一部分数据,然后当窗口结束条件满足时,释放出这批数据给下游触发一次计算流程。粗看和Dataflow没有太大的区别,但实际上,最主要的差距,是StreamCQL对窗口模型的定义,其次是触发和数据更新模型的缺失。

StreamCQL的窗口模型,支持Batch(也就是固定间隔窗口)和Slide,但是窗口的划分默认是基于处理时间Process Time的!!!而且,虽然窗口内的数据可以再细分Partition,但窗口只有一个。。。不能同时处理几个窗口,意味着无法处理数据乱序或者晚到的情况。而Slide窗口的定义,也和主流的Slide窗口定义不同,每次对下游更新离开窗口范围的数据,看起来更像一个FIFO Queue的实现。

尽管可以使用Trigger关键字,将Batch窗口的触发条件改为消息中的某一个字段或者表达式,从而通过指定事件时间字段,近似的达到基于事件时间的窗口划分。但是,实际上,因为单一的窗口机制,这样做,也只能处理事件源严格递增的场景。而现实情况中,来自不同客户端的事件,时间必然是乱序的,实时流计算的来源也主要是分布式消息队列(如kafka),进一步导致全局的无序,所以现实中,基本是不可能存在消息中事件时间严格递增的场景。

此外,由于缺乏灵活的数据更新和淘汰方式的定义,StreamCQL的主流程基本上是Buffer一堆数据,然后计算加淘汰这批数据,所以,缺乏数据预聚合的能力,这就导致窗口范围内所有的数据在窗口关闭之前,都必须保存在内存中。因此即使是事件时间严格递增或者只关心Process Time的场景,Window的范围也不能太大,否则很容易超过内存限制,造成OOM,而实际上,多数场景,只需要保留增量聚合后的结果数据就足够了。

总体来说,StreamCQL的SQL语法比较完善,但计算模型在理论和架构实现方面存在较大的不足,所以如果不加改造,在实际工程应用中很难有大的做为。

参考学习资料:


上一篇:从零开始学 Web 之 Vue.js(四)Vue的Ajax请求和跨域


下一篇:Android Loader详解四:回调及完整例子