上一篇文章介绍了流式处理的基本概念,并提出了窗口的概念的由来——即为了将无限数据切分为有限来处理。
这篇文章主要通过以下四个问题逐渐勾勒出流式处理的计算框架:
- What: 要计算什么结果?——转换(transform)
- Where: 在哪个位置计算结果?——窗口(window)
- When: 在处理时间的哪个时刻计算窗口的结果?——水位线(watermark) + 触发器(trigger)
- How: 如何修正之前计算出的结果?——丢弃、累积、累积&撤回
其中水位线和触发器的概念的产生最让我觉得很自然,是很容易便想到要引入用来方便处理流式问题的方法论。下面就此说一说。
在上篇文章中我们引入了窗口的概念,是为了化无限为有限。现在我们已经有了窗口了,下面需要知道何时把窗口计算结果发往下游,即我们要确定窗口所包含的所有数据都已经到达。
举例来说,有个在线视频 APP,用户可以使用离线观看模式。如果有人把他们的移动设备带到野外,系统根本没有办法知道他们何时会回到有网络连接的地带,然后开始上传他们在没有网络连接时观看视频的数据。
从这个例子来说,在历史数据到达之前我们是无法判断数据是否绝对已经到达的,但是我们仍然需要一些建立一些语义帮助我们来做出判断,否则永远触发不了计算。
这里说的其实就是窗口的完整性的问题,也就很自然地引入了水位线的概念。水位线的到达意味着小于水位线的数据全部到达。
尽管在传统批处理系统中不存在水位线的概念,但是在语义上我们仍然可以引入它。批处理的水位线刚开始时一直停留不动。直到系统收到了所有数据并开始处理,水位线近似平行于事件发生时间轴开始平移,然后一直延伸到无穷远处。这也是批流统一的一个理论基础。
然而水位线是对于数据完整性的一个猜测,即启发式估计,仅仅靠它还是不够的。
因为水位线对数据处理的准确性有两个主要的影响
\1. 水位线可能设置的过早,因此在水位标记达到后仍然有记录到达。
\2. 水位线可能设置的过晚,而这会带来更高的延迟。
由于上述的原因,我们认为光使用水位线是不够的。
从Lambda架构中我们获得了规避完整性问题的启发:它不是尽快地提供完全准确的答案,而是说,它先是尽快通过流式处理管道提供一个最佳的低延迟估计,同时承诺最终会通过批处理管道提供正确的和一致的答案(当然前提条件是批处理作业启动时,需要的数据应该已经全部到达了;如果数据后期发生了变化,那么批处理要重新执行以获得准确答案)。如果我们要在一个单一的数据处理管道里做到同样的事情(不管采用哪种执行引擎),那么我们需要一种对任一窗口能够提供多种答案(或者可以叫做窗格,译者注:对窗口这个比喻的引申)的方式。我们把这种功能叫做“触发器”。这种“触发器”可以选择在何时触发指定窗口的输出结果。
这又很自然地引入了触发器的概念,这里的启发居然是来自 Lambda 架构的思想,而且确实能够对比,真是让人叫绝。对 Lambda 架构细节不清楚想具体了解的可以看看《大数据系统构建》这本书。
从这里可以看出来——
触发器对窗口模型是互补的,各自从不同的时间维度上影响系统的行为:
\1. 窗口决定哪些事件发生时间段的数据被分到一组;
\2. 触发器决定在什么时候进行操作。
对于我们为了追求低延迟而提前计算出来的结果,在后期延迟数据的到来,我们可能由于触发器再次出发计算,那么对于结果该如何修正呢?文中也给出了三种基本模式:丢弃、累积、累积&撤回。
最后小结一下:
这种流处理模式带来了巨大的灵活性,我们需要做的是在处理数据的各种要素之间取得平衡,如正确性,延迟和成本。
我们目前只了解了一种窗口类型:基于事件时间的固定窗口。 从Streaming 101中我们提到了多种窗口,其中有两个是今天要详细阐述的:基于处理时间的固定窗口,基于事件时间的会话窗口。
When/Where: 基于处理时间的窗口
处理时间窗口重要的原因有两个:
- 对于某些使用情况,例如使用情况监控(例如,Web服务流量的 QPS),希望在收到数据流时分析数据,处理时间窗口绝对是适当的方法。
- 对于事件发生时间很重要的(例如,分析用户行为趋势,计费,评分等)的场景,处理时间窗口绝对是错误的选择,要能够清晰的区分哪些场景合适。
因此,值得深入了解处理时间窗口和事件时间窗口之间的差异,特别是考虑到当今大多数流处理系统中广泛使用了处理时间窗口。
当使用类似于本文中提到的模型时,作为一等公民的窗口是严格基于事件时间的,有两种方法可以用来实现处理时间窗口:
- **触发器:**忽略事件时间(即,使用跨越所有事件时间的全局窗口),并使用触发器在处理时间轴中提供该窗口的快照。
- **进入时间:**将进入时间作为数据到达系统的事件时间,并从此开始使用正常的事件时间窗口。 目前 Spark Streaming 就是这么做的。
请注意,这两种方法或多或少等同,但在在多处理步骤Pipeline的情况下略有不同:在触发器版本中,每个处理步骤都使用处理时间切分窗口,步骤之间相互独立,因此例如窗口X中的数据为 一个阶段可能会在下一阶段的窗口 X-1 或 X+1 中; 在进入时间版本中,一旦将数据归于窗口X中,由于不同的处理步骤时间使用水位线同步处理进度(Dataflow的做法),在整个处理过程中都会一直属于窗口X。对微批来说( Spark Streaming的做法),微批的边界或其他因素,是在引擎级别协调处理。
正如一直强调的,处理时间窗口的最大缺点是,当输入的顺序发生变化时,窗口的内容会发生变化。 为了更具体地说明这一点,我们来看这三种用例:
- 事件时间窗口
- 使用触发器的处理时间窗口
- 使用进入时间的处理时间窗口
我们将每个窗口应用到两个不同的输入数据集(总共有6个变体)。 两个输入数据包含完全相同的事件(即相同的值,发生在相同的事件时间),但顺序不同。 第1个数据集跟我们之前例子中的顺序一致,颜色为白色;第二个数据集调整了事件的处理顺序,如下图12所示,为紫色。
图12. 改变了处理时间,其他不变
译者注:原文此处配图有误,和图17 正好错位了
基于事件时间的窗口
为了建立一个基线,我们首先将基于事件时间的使用启发式Watermark的固定窗口处理两个顺序不同的数据集。 我们将重用清单5 / 图7 中的提前/延迟处理的代码,从而得到如下结果。 左边实际上是我们以前看到的; 右边是第二个数据集的结果。 这里要注意的一点是:尽管输出的整体形状不同(由于处理时间不同),四个窗口的最终结果保持不变:14,22,3和12:
图13. 处理时间顺序不同的事件时间窗口
使用触发器的处理时间窗口
现在我们来比较上述两种处理时间方法。 首先,将尝试触发器方法。使用处理时间窗口达到效果,需要考虑以下三个方面:
- 窗口: 使用全局事件时间窗口,本质上是以事件窗格模拟处理时间窗口。.
- 触发: 根据处理时间窗口的期望大小,在处理时间维度上周期性触发。
- 累积: 使用丢弃模式来保持窗格彼此独立,从而让每个窗格都像一个独立的处理时间窗口。
相应的代码看起来像清单9; 请注意,全局窗口是默认的,因此没有具体的覆盖策略:
PCollection<KV<String, Integer>> scores = input .apply(Window.triggering( Repeatedly(AtPeriod(Duration.standardMinutes(2)))) .discardingFiredPanes()) .apply(Sum.integersPerKey());
清单9. 在全局事件窗口上使用重发触发器、丢弃模式,模拟处理时间窗口
当流处理引擎上输入两个不同顺序的数据集的时候,结果如下图14所示。关于此图有两点有点意思:
- 由于我们基于事件时间的窗格模拟处理时间窗口,所以在处理时间轴中勾画了“窗口”,这意味着窗口宽度是在Y轴上度量而不是X轴。
- 由于处理时间窗口对输入数据的顺序敏感,在两个数据集中,每个窗口包含的数据都是不同的,即时事件发生的时间相同。 在左边我们得到12,21,18,而在右边我们得到7,36,4。
图14. 处理时间顺序不同的处理时间窗口
使用进入时间的处理时间窗口
最后,我们来看看通过将输入数据的事件时间映射为入口时间来实现的处理时间窗口。在代码方面,这里有四个方面值得一提:
- **时移:**当数据到达时,数据的事件时间被入口时间(数据到达时的处理时间)覆盖。请注意,我们目前在Dataflow中没有标准API来处理时间,尽管我们接下来会可能会使用伪代码I / O源中的虚构方法来代表下面的代码。对于Google Cloud Pub / Sub,只需在发布消息时将消息的timestampLabel字段留空;对于其他来源,需要查阅源代码文档。
- 窗口: 返回使用标准的固定事件时间窗口。
- 触发: 由于入口时间提供了计算理想Watermark的能力,所以可以使用默认触发器,在这种情况下,当Watermark通过窗口的末尾时,触发器会隐式触发一次。
- **累积模式:**由于我们每个窗口只有一个输出,所以累积模式是无关紧要的。
实际的代码可能是这样:
PCollection<String> raw = IO.read().withIngressTimeAsTimestamp(); PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn()); PCollection<KV<String, Integer>> scores = input .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))) .apply(Sum.integersPerKey());
清单10. 显式设置默认触发器
在流式引擎上的执行将如下面的图15所示。当数据到达时,它们的事件时间被覆盖为它们的进入时间(即到达时的处理时间),导致在理想Watermark线上的向右水平移位。该图中有趣的注释:
- 与其他处理时间窗口示例一样,当输入的顺序变化时,即使输入的值和事件时间保持不变,我们也会得到不同的结果。
- 与其他示例不同,窗口在事件时间维度上(因此沿X轴)重新划分了。尽管如此,这些窗口并不是原生的事件时间窗口;而是我们将处理时间简单地映射到事件时间上,擦除每个输入的原始记录,并用新的记录代替它,而事件的时间是表示Pipeline首次收到到数据的时间。
- 尽管如此,由于使用了Watermark,触发器仍然在与之前的处理时间示例完全相同的时间触发。此外,所产生的输出值与该示例相同,如左侧的12,21,18,右侧的7,36,4。
- 由于使用入口时间,所以理想的Watermark是可能的,所以实际的Watermark与理想的Watermark相匹配,斜率为1,向右上方延伸。
图15. 使用入口时间的处理时间窗口,处理两个内容一样但顺序不同的数据集
虽然看到不同的方法可以实现处理时间窗口很有趣,但是这里的大部分内容是自从第一篇文章以来一直提到的:事件时间窗口与顺序无关,至少在极限情况下如此(实际 在处理过程中的窗格可能会不同,直到输入完成),而处理时间窗口不是。 如果关心事件实际发生的时间,必须使用事件时间窗口,否则计算结果是无意义的。
Where**: 会话窗口**
现在来看一下我最喜欢的特性之一:动态的、数据驱动的窗口,称为会话窗口。
会话是一种特殊类型的窗口,它捕获数据中的一段活动,在不活动一段时间后窗口中止。 它们在数据分析中特别有用,因为可以让我们看到某一个特定用户在一段时间内的行为。 这可以让我们分析会话内的活动的相关性,基于会话的长度来推断用户的参与水平等。
从窗口的角度来看,会话窗口在两个方面很有趣:
- 它们是数据驱动窗口的示例:窗口的位置和大小是输入数据本身来决定,而不是在时间内基于某些预定义模式,如固定和滑动窗口。
- 它们也是不对齐窗口的示例,即窗口并不将数据一视同仁,而是将数据的特定子集(例如,每个用户)进行切分。 这与对齐的窗口(如固定和滑动窗口)形成对比,这些窗口通常对数据一视同仁,进行切分。
对于一些用例,可以提前在一个会话中的数据中标记一个共同标识符(例如,在线的的视频播放器,定时发出心跳包,心跳包内容是服务质量信息,对于任何给定的一次观看,分配一个会话ID,所有的心跳信息中都添加这个会话ID)。在这种情况下,会话更容易构建(按照会话ID区分会话),本质上是按键分组的一种形式。
然而,在更一般的情况下(即,实际会话提前并不知道),会话只能从从数据中构建出来。当处理无序数据时,这变得特别棘手。
提供一般会话支持的关键是,根据定义,完整的会话窗口是一组较小的重叠窗口的组合,每个窗口包含单个记录,每个记录中的每个记录与下一个记录的间隔不超过预先定义的间隔。因此,即使会话中的数据乱序了,也可以简单地通过将各个数据的重叠窗口合并在一起来构建最终会话。
图16. 未合并的原始会话窗口和合并之后的会话窗口
下面来看一个代码示例,以清单8中的代码为基础,修改为使用会话窗口:
PCollection<KV<String, Integer>> scores = input .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))) .triggering( AtWatermark() .withEarlyFirings(AtPeriod(Duration.standardMinutes(1))) .withLateFirings(AtCount(1))) .accumulatingAndRetractingFiredPanes()) .apply(Sum.integersPerKey());
清单11. 基于会话窗口,提前和延迟触发,使用累加和撤销模式
在流处理引擎上执行如下所示:
图17. 基于会话窗口,提前和延迟触发,使用累加和撤销模式
上图中的具体过程如下:
- 当遇到具有值为5的第一个记录时,它被放置在从该记录的事件时间开始的单个原始会话窗口中,窗口宽度为会话窗口的超时时长,例如超时时长为1分钟,会话窗口宽度为1分钟。在后边遇到的任何窗口与该窗口重叠的都应该隶属于同一个会话,并且合并到此窗口中。
- 第二个到达记录是7,它类似地放在自己的原始会话窗口中,因为它不与5的窗口重叠。
- 同时,Watermark已经过第一个窗口的末尾,所以在12:06之前,包含值5的窗口被物化为准时的窗口。此后不久,当处理时间正好为12:06的时候,第二个窗口也被物化为具有值7的推测结果。
- 我们接下来观察一系列的记录,3,4和3,这3个会话窗口相互重叠。因此,它们都被合并在一起,并且在12:07的时候提前触发,发出一个值为10的单个窗口。
- 当8到达不久之后,它与具有值7的会话和与值10的会话重叠。所有这三个因此被合并在一起,形成具有值25的新的组合会话。当Watermark然后通过这个会话的末尾时,它物化了一个包含值25的新会话以及之前发布的两个窗口的撤消,但后来被并入它:7和10。
- 当9到达延迟到达时,类似的舞蹈发生在9号晚上,与值为5的原始会话,和值为25的会话变成了一个更大的值为39的一个较大的会话。值39和窗口25、5的撤销被立即延迟触发。
就这么简单地将流处理模型分解为不同的、可组合的部分,这还真是了不起啊。至此,你可以将注意力放在业务逻辑上了,而非那些数据形式的细节。
在您离开之前,我想快速回顾一遍,防止您忘了。首先,我提到了几个重要的概念:
- 事件时间与处理时间:事件发生时间和被数据处理系统处理的时间之间的重要区别。
- 窗口:通常使用的方法是通过在时间边界(通过处理时间或事件时间)对其进行切分来管理无限数据,尽管我们将数据流模型中的窗口定义缩小仅表示事件时间内)。
- 水位线:事件时间进度的概念,为在无限数据上运行的乱序处理系统提供了估计窗口数据完整性的手段。
- 触发器:用于精确指定在合适计算窗口结果的机制,对于特定用例是有意义的。
- 累积:在单个窗口被多次触发计算的情况下,随着触发持续的修正窗口结果。
其次,我们用来构建我们探索的四个问题:
- What 要计算出什么结果?= 转换
- Where 事件在哪里结果计算? = 窗口
- When 在处理时间维度上什么时候计算窗口结果? = 水位线 + 触发器
- How 如何不断的修正计算结果?= 累积
第三,最后一点,这种流处理模式所带来的灵活性(最终,需要做的是在处理数据的各种要素之间取得平衡,如正确性,延迟和成本),回顾一下,通过少量的代码修改,对相同的数据集处理而得到的输出的变化如下:
传统批处理 清单 1 / 图 2
固定窗口的批处理 清单 2 / 图 4
固定窗口的流式处理及水位线 清单 2 / 图 6
提前/延迟的丢弃模式 清单 7 / 图 9
提前/延迟的累积模式 清单 4 &amp; 5 / 图 7
提前/延迟的撤回模式 清单 8 / 图 10
处理时间窗口(触发器) 清单 9 / 图 14
处理时间窗口(进入时间)清单 10 / 图 15
会话窗口 清单 11 / 图 17