作者 | Phil Fried
译者 | 弯月 责编 | 王晓曼
出品 | CSDN(ID:CSDNnews)
时至 2021 年,估计很多人听到 MapReduce 都不为所动了。毕竟,MapReduce 问世已经十多年了,虽然在当时引起了轰动,但那已经是过去的事情了。
但是 MapReduce 非常重要,而且即使是现在,它也能提供很大的价值。如果能给这个久经沙场的框架添加新功能,就能克服许多痛点,而且还能获益匪浅。
MapReduce的过去与现状
MapReduce 出现于 2004 年发表的一片论文,描述了 Google 采用的一种利用巨大的集群来实现大规模分析的方法。到 2007 年, 风靡一时的 MapReduce 引擎 Apache Hadoop 开始了它的传奇之旅。对于许多人而言,MapReduce 就是 Hadoop。然而,Hadoop 只不过是一个实现而已。还有好多系统,如 CouchBase、MongoDB 都采用了 MapReduce 作为查询引擎。
Hadoop 在当时掀起了轩然大波,但现在人们已经见怪不怪了。这不仅因为 MapReduce 本身已经过时,而且当时试图解决的问题,现在却是人们尽量回避的问题。如果你想在大型集群上进行所有处理,并且数据已经位于集群中,那么 Hadoop 是一个很好的选择。当然,集群的数据输入和输出是个大问题,特别是系统没有专门为输入输出设计的情况下。
但是,传统的 MapReduce 所面临的真正问题在于,它是一个“批处理”处理框架,而且很可能并不是你想要的框架。使用 MapReduce 的基本前提是,必须先将“所有”的数据放入 Hadoop 集群,然后才能运行作业,针对所有数据执行分布式计算,并给出结果。
但是并没有所谓的“所有”数据。新的数据不断在生产,旧的数据也在不断更新。“所有数据”只不过是“到目前为止已知的所有数据”。所以,Hadoop 能给出的最佳结果也不过是在作业运行时已知的事实上得出的,而这段延迟可能非常重要。
当出现新数据或数据被更新时,你需要首先与集群中的已知数据进行协调,然后重新运行作业,获得更新后的结果。数据越多,执行所需的时间就越长,所以随着历史数据不断增长,作业也会变得越来越慢。
但这并不意味着我们应该放弃 MapReduce,只不过我们需要在批处理的基础之上再引入别的处理。
MapReduce在处理实时数据时的优势
为了最大限度地利用数据,处理结果应该尽可能及时,用户也期待能获得即时的信息。也许 Hadoop 并不是解决数据问题的唯一方案(本来也不是),但当初 MapReduce 拥有的优势,如今仍然能让它在实时数据处理中占据一席之地。
- 这种编程模型非常便于水平扩展。此外,添加节点非常容易,不需要改变数据的存储方式。这意味着它能够处理不断大量增加的数据。
- 它的错误处理模型非常简单且健壮,这对于大规模运营非常重要。如果无法轻易地从某个节点的错误中恢复,那么即使能在 100个节点上并行执行,也毫无用处。毕竟,节点越多,出错的可能性就越大。
- 归约函数是纯函数,因此可以在任何时候、任何地方运行,这就提供了相当大的灵活性。归约可以采用饥渴方式或懒惰方式,可以在同一节点上,也可以在不同节点上。只要能用同样的相对顺序提供同样的输入,就能获得同样的结果。
在需要对实体的长时间更新进行归约时,归约函数的灵活性就能发挥优势。假设有一个温度传感器,能产生读数,你希望计算最低温度、最高温度和平均温度。任何 MapReduce 实现都可以轻松完成。
下面是其工作原理。
假设输入的记录如下所示:
{“sensorId”:“A”, “timestamp”:“2021-01-05T09:57:00Z”, “tempC”: 10.7}
映射函数如下伪代码:
function map(reading) {
emit(reading.sensorId, {“timestamp”:reading.timestamp,
“min”:reading.tempC, “max”:reading.tempC, “current”:tempC, “total”:
reading.tempC, “count”:1})}
为什么 min、max 和 total 直接采用了输入的 tempC 呢?这是因为这些值会在与其他值进行归约时被更新。归约函数如下所示:
function reduce(left, right) {
let result = {
"min": math.min(left.min,right.min),
"max": math.max(left.max,right.max),
"total": left.total+ right.total,
"count": left.count+ right.count
}
if (right.timestamp.isAfter(left.timestamp)) {
result.current = right.current
result.timestamp = right.timestamp
} else {
result.current = left.current
result.timestamp = left.timestamp
}
return result}
假设你有一系列的传感器读数:
{“sensorId”:“A”, “timestamp”:“2021-01-05T09:57:00Z”, “tempC”: 10.7}
{“sensorId”:“A”, “timestamp”:“2021-01-05T09:58:00Z”, “tempC”: 10.8}
{“sensorId”:“A”, “timestamp”:“2021-01-05T09:59:00Z”, “tempC”: 10.8}
{“sensorId”:“A”, “timestamp”:“2021-01-05T10:00:00Z”, “tempC”: 11.0}
对映射后的结果进行归约的方式有很多,比如:
reduce(mapped[0],reduce(mapped[1], reduce(mapped[2], mapped[3]))).
或者这样做:
reduce(reduce(mapped[0],mapped[1]), reduce(mapped[2], mapped[3]))
不论哪种方法,都能获得同样的结果。将传感器的所有读数传递给归约函数之后,最后的输出就是最终结果。
归约通常是增量进行的,即使在 Hadoop 中也是如此。假设计算通过多个进程并行执行。每个进程对它负责的部分值进行归约。每个进程的结果都会与其他进程的结果再次进行归约运算,直到产生最终结果。
有趣的是,归约本质上是增量的。在 Hadoop 这样的批处理系统中,归约的这个性质被用来增量地聚合来自多个进程的并行计算结果。但如果将输入数据解释为某个实体在很长时间内的更新,那么归约就更有用了。
在上述示例中,实体就是传感器,每个实体由唯一的 id 确定。每个读数可以认为是传感器在一段时间内的一次更新。你也可以让每个传感器自己进行一些归约,以减少网络调用(或避免由于网络连接中断而造成数据丢失)。或者,数据也可以在边缘节点上归约,只将聚合后的数据发送到数据仓库。更通用、更现实的场景则是删除和低价值的旧数据,只保留归约后的值。归约后的值可以在新数据出现后继续使用。所以,作业可以采用如下形式:reduce(prevResults, reduce(newData…))。
为什么采用 MapReduce?
MapReduce可以持久保存并重用中间数据。但为什么要这样做呢?
简单来说,这样可以删除原始数据。对于像购买数据等,这可能并没有什么意义,因为你希望这类数据能长期保留。但这种值得长期保存的数据毕竟是例外,不是常态。
与五年前或十年前相比,如今的各个组织拥有太多的数据,但绝大部分都不是像购买记录这种高价值的数据。例如,网站的点击数据也许在短期内非常有用的,但有必要保留几年前的每次点击吗?尽管存储越来越便宜,但并不是免费的。这种数据只在非常短的时间内有用,而且很快就失去了价值。也许短期内你需要保留元数据,这样就可以分析某个特定的用户交互案例,但长期来看,你需要保存的是每个用户的聚合结果。
另一个原因是,数据流水线变得越来越复杂。这加剧了存储的问题。如果数据流水线需要从多个不同的数据源拉取数据,那么保存每个数据源的原始数据就不太现实了。还有 GDPR 之类的法令,限制原始数据的保存与访问。你只能保存匿名数据的聚合结果,因此流水线中至少要多一个步骤。
想象一下,一个作业负责将原始数据处理成中间聚合形式,每当源数据更新时都执行这个作业(例如每天一次),来处理新的数据,并将其添加到中间结果。然后,另一个作业消费该中间结果,产生更复杂的聚合。这种形式实际上非常适合 MapReduce 程序。Hadoop 很久以前就支持这种作业组合形式,即一个作业的输出作为另一个作业的输入使用。
但是,采用其他方式会遇到很多限制。举一个例子,中间结果的聚合只能在特定时间进行,比如每天进行。所以,每天都要拿出所有数据,输出每日的结果,然后将每日结果更新到每月、每季度甚至每年的报告中。这样做有很大的缺陷。首先,这会加剧调整与更正过程的复杂度。其次,更新最终报告的粒度也会被限制。如果每天聚合一次,就不可能看到半天的结果。
持续 MapReduce
如果能够在实时数据处理中使用 MapReduce,那就太好了。其实我们正在构建一款基于MapReduce 的下一代数据平台!基本的想法就是改变“作业”的概念,将每个MapReduce 程序看作一个持续运行、不断更新的程序。
持续 MapReduce 程序不像 Hadoop 作业那样产生输出后就退出,而是会持续地在新数据到达时实时更新其结果。这就意味着,读取最新的数据非常快,且代价非常低。而且,更新和添加数据并不需要重新处理所有的历史数据,持续 MapReduce 的计算也可以随意组合。你可以将一个计算结果作为输入送进另一个计算,这样即使是最复杂的聚合,也可以实时计算。
持续 MapReduce 的工作原理与常规的 MapReduce 基本相同,只不过它不会退出。主要的区别在于添加了一个键提取阶段,还有操作的顺序略微不一样,以方便持续执行。在常规的 MapReduce 中,映射函数会发送随机生成的键值对。因此框架需要将其打乱并发送到合适的进程,从而拥有同样键的所有值都会被送到同一个进程。在持续 MapReduce 中,键和值被提取到单独的函数中,所以更像是针对键的 MapReduce。在读取每条记录时,使用键函数将键提取出来,然后,记录被打乱并发送到适合该键的进程。然后,记录经过变换(映射)发送给某个成员,再针对变换的结果与之前同一个键的结果进行归约。
上述温度传感器的示例可以用持续 MapReduce 改写如下:
1.读取源记录:{“sensorId”: “A”,“timestamp”: “2021-01-05T09:57:00Z”, “tempC”:10.7}
2.调用key({“sensorId”: “A”,“timestamp”: “2021-01-05T09:57:00Z”, “tempC”:10.7})获得"A"
3.将记录送至"A"对应的进程
4.调用map({“sensorId”: “A”,“timestamp”: “2021-01-05T09:57:00Z”, “tempC”:10.7})并获得{“sensorId”: “A”,“timestamp”: reading.timestamp, “min”: reading.tempC,“max”: reading.tempC, “current”: tempC, “total”:reading.tempC, “count”: 1}
5.调用reduce,传入上述键“A”的规约结果与映射函数的结果。
6.每当新数据产生时,重复上述步骤。
此时你肯定会好奇,这些归约的结果是如何保存的。毕竟,如果作业持续运行、永远不退出,我们肯定不希望将数据一直保留在内存中。但问题是,键值的集合正是计算的输出,相当于 Hadoop 作业中的输出结果集合。
持续 MapReduce 的 Flow 实现实际上支持两种不同的方式来存储并获取这些结果。如果结果只是中间结果,而且刚刚被送到另一个持续 MapReduce 计算中,那么我们会将其保存到基于 RocksDB 的分区且有副本的存储中。这些数据与 MapReduce 进程共享。
但你可以将结果“材料化”到支持键值语义的外部系统中。通常,这意味着归约结果可以保存到数据库中(如PostgreSQL),或数据仓库中(如Snowflake),并在新的源数据出现时持续更新。更新时也会查询之前的结果,以便进行归约。
再次说回温度传感器的例子,这次将其材料化到 Postgres 表中。在执行映射函数后,我们会查询 Postgres 表,查找该键对应的前一个结果。如果该结果存在,则将其作为归约函数的左输入。不论如何,输出结果都会通过 update 或 inser 语句保存到 Postgres 数据库中,这个步骤会针对源数据中的每个记录持续进行。Postgres 表永远保存最新的结果,你也不用等待作业运行结束。
我们在努力构建 Flow,它可以帮你实现基于持续 MapReduce 的实时数据流水线,这样你就不需要自行管理映射、归约和键函数了。Flow 的界面还在构思中,但 Flow 变换运行时的基础依然是持续 MapReduce。
参考链接:
https://www.estuary.dev/blog/why-mapreduce-is-making-a-comeback