为什么 MapReduce 再次流行起来了?

作者 | Phil Fried

译者 | 弯月 责编 | 王晓曼

出品 | CSDN(ID:CSDNnews)

时至 2021 年,估计很多人听到 MapReduce 都不为所动了。毕竟,MapReduce 问世已经十多年了,虽然在当时引起了轰动,但那已经是过去的事情了。

但是 MapReduce 非常重要,而且即使是现在,它也能提供很大的价值。如果能给这个久经沙场的框架添加新功能,就能克服许多痛点,而且还能获益匪浅。

为什么 MapReduce 再次流行起来了?
为什么 MapReduce 再次流行起来了?

MapReduce的过去与现状

为什么 MapReduce 再次流行起来了?

MapReduce 出现于 2004 年发表的一片论文,描述了 Google 采用的一种利用巨大的集群来实现大规模分析的方法。到 2007 年, 风靡一时的 MapReduce 引擎 Apache Hadoop 开始了它的传奇之旅。对于许多人而言,MapReduce 就是 Hadoop。然而,Hadoop 只不过是一个实现而已。还有好多系统,如 CouchBase、MongoDB 都采用了 MapReduce 作为查询引擎。

Hadoop 在当时掀起了轩然大波,但现在人们已经见怪不怪了。这不仅因为 MapReduce 本身已经过时,而且当时试图解决的问题,现在却是人们尽量回避的问题。如果你想在大型集群上进行所有处理,并且数据已经位于集群中,那么 Hadoop 是一个很好的选择。当然,集群的数据输入和输出是个大问题,特别是系统没有专门为输入输出设计的情况下。

但是,传统的 MapReduce 所面临的真正问题在于,它是一个“批处理”处理框架,而且很可能并不是你想要的框架。使用 MapReduce 的基本前提是,必须先将“所有”的数据放入 Hadoop 集群,然后才能运行作业,针对所有数据执行分布式计算,并给出结果。

但是并没有所谓的“所有”数据。新的数据不断在生产,旧的数据也在不断更新。“所有数据”只不过是“到目前为止已知的所有数据”。所以,Hadoop 能给出的最佳结果也不过是在作业运行时已知的事实上得出的,而这段延迟可能非常重要。

当出现新数据或数据被更新时,你需要首先与集群中的已知数据进行协调,然后重新运行作业,获得更新后的结果。数据越多,执行所需的时间就越长,所以随着历史数据不断增长,作业也会变得越来越慢。

但这并不意味着我们应该放弃 MapReduce,只不过我们需要在批处理的基础之上再引入别的处理。

为什么 MapReduce 再次流行起来了?

MapReduce在处理实时数据时的优势

为了最大限度地利用数据,处理结果应该尽可能及时,用户也期待能获得即时的信息。也许 Hadoop 并不是解决数据问题的唯一方案(本来也不是),但当初 MapReduce 拥有的优势,如今仍然能让它在实时数据处理中占据一席之地。

  • 这种编程模型非常便于水平扩展。此外,添加节点非常容易,不需要改变数据的存储方式。这意味着它能够处理不断大量增加的数据。
  • 它的错误处理模型非常简单且健壮,这对于大规模运营非常重要。如果无法轻易地从某个节点的错误中恢复,那么即使能在 100个节点上并行执行,也毫无用处。毕竟,节点越多,出错的可能性就越大。
  • 归约函数是纯函数,因此可以在任何时候、任何地方运行,这就提供了相当大的灵活性。归约可以采用饥渴方式或懒惰方式,可以在同一节点上,也可以在不同节点上。只要能用同样的相对顺序提供同样的输入,就能获得同样的结果。

在需要对实体的长时间更新进行归约时,归约函数的灵活性就能发挥优势。假设有一个温度传感器,能产生读数,你希望计算最低温度、最高温度和平均温度。任何 MapReduce 实现都可以轻松完成。

下面是其工作原理。

假设输入的记录如下所示:

{“sensorId”:“A”, “timestamp”:“2021-01-05T09:57:00Z”, “tempC”: 10.7}

映射函数如下伪代码:

function map(reading) {

emit(reading.sensorId, {“timestamp”:reading.timestamp,
“min”:reading.tempC, “max”:reading.tempC, “current”:tempC, “total”:
reading.tempC, “count”:1})

}

为什么 min、max 和 total 直接采用了输入的 tempC 呢?这是因为这些值会在与其他值进行归约时被更新。归约函数如下所示:

  function reduce(left, right) {
    let result = {

        "min": math.min(left.min,right.min),

        "max": math.max(left.max,right.max),

        "total": left.total+ right.total,

        "count": left.count+ right.count

    }

    if (right.timestamp.isAfter(left.timestamp)) {

        result.current = right.current

        result.timestamp = right.timestamp

    } else {

        result.current = left.current

        result.timestamp = left.timestamp

    }

    return result}

假设你有一系列的传感器读数:

{“sensorId”:“A”, “timestamp”:“2021-01-05T09:57:00Z”, “tempC”: 10.7}

{“sensorId”:“A”, “timestamp”:“2021-01-05T09:58:00Z”, “tempC”: 10.8}

{“sensorId”:“A”, “timestamp”:“2021-01-05T09:59:00Z”, “tempC”: 10.8}

{“sensorId”:“A”, “timestamp”:“2021-01-05T10:00:00Z”, “tempC”: 11.0}

对映射后的结果进行归约的方式有很多,比如:

reduce(mapped[0],reduce(mapped[1], reduce(mapped[2], mapped[3]))).

或者这样做:

reduce(reduce(mapped[0],mapped[1]), reduce(mapped[2], mapped[3]))

不论哪种方法,都能获得同样的结果。将传感器的所有读数传递给归约函数之后,最后的输出就是最终结果。

归约通常是增量进行的,即使在 Hadoop 中也是如此。假设计算通过多个进程并行执行。每个进程对它负责的部分值进行归约。每个进程的结果都会与其他进程的结果再次进行归约运算,直到产生最终结果。

有趣的是,归约本质上是增量的。在 Hadoop 这样的批处理系统中,归约的这个性质被用来增量地聚合来自多个进程的并行计算结果。但如果将输入数据解释为某个实体在很长时间内的更新,那么归约就更有用了。

在上述示例中,实体就是传感器,每个实体由唯一的 id 确定。每个读数可以认为是传感器在一段时间内的一次更新。你也可以让每个传感器自己进行一些归约,以减少网络调用(或避免由于网络连接中断而造成数据丢失)。或者,数据也可以在边缘节点上归约,只将聚合后的数据发送到数据仓库。更通用、更现实的场景则是删除和低价值的旧数据,只保留归约后的值。归约后的值可以在新数据出现后继续使用。所以,作业可以采用如下形式:reduce(prevResults, reduce(newData…))。

为什么 MapReduce 再次流行起来了?

为什么采用 MapReduce?

MapReduce可以持久保存并重用中间数据。但为什么要这样做呢?

简单来说,这样可以删除原始数据。对于像购买数据等,这可能并没有什么意义,因为你希望这类数据能长期保留。但这种值得长期保存的数据毕竟是例外,不是常态。

与五年前或十年前相比,如今的各个组织拥有太多的数据,但绝大部分都不是像购买记录这种高价值的数据。例如,网站的点击数据也许在短期内非常有用的,但有必要保留几年前的每次点击吗?尽管存储越来越便宜,但并不是免费的。这种数据只在非常短的时间内有用,而且很快就失去了价值。也许短期内你需要保留元数据,这样就可以分析某个特定的用户交互案例,但长期来看,你需要保存的是每个用户的聚合结果。

另一个原因是,数据流水线变得越来越复杂。这加剧了存储的问题。如果数据流水线需要从多个不同的数据源拉取数据,那么保存每个数据源的原始数据就不太现实了。还有 GDPR 之类的法令,限制原始数据的保存与访问。你只能保存匿名数据的聚合结果,因此流水线中至少要多一个步骤。

想象一下,一个作业负责将原始数据处理成中间聚合形式,每当源数据更新时都执行这个作业(例如每天一次),来处理新的数据,并将其添加到中间结果。然后,另一个作业消费该中间结果,产生更复杂的聚合。这种形式实际上非常适合 MapReduce 程序。Hadoop 很久以前就支持这种作业组合形式,即一个作业的输出作为另一个作业的输入使用。

但是,采用其他方式会遇到很多限制。举一个例子,中间结果的聚合只能在特定时间进行,比如每天进行。所以,每天都要拿出所有数据,输出每日的结果,然后将每日结果更新到每月、每季度甚至每年的报告中。这样做有很大的缺陷。首先,这会加剧调整与更正过程的复杂度。其次,更新最终报告的粒度也会被限制。如果每天聚合一次,就不可能看到半天的结果。

为什么 MapReduce 再次流行起来了?

持续 MapReduce

如果能够在实时数据处理中使用 MapReduce,那就太好了。其实我们正在构建一款基于MapReduce 的下一代数据平台!基本的想法就是改变“作业”的概念,将每个MapReduce 程序看作一个持续运行、不断更新的程序。

持续 MapReduce 程序不像 Hadoop 作业那样产生输出后就退出,而是会持续地在新数据到达时实时更新其结果。这就意味着,读取最新的数据非常快,且代价非常低。而且,更新和添加数据并不需要重新处理所有的历史数据,持续 MapReduce 的计算也可以随意组合。你可以将一个计算结果作为输入送进另一个计算,这样即使是最复杂的聚合,也可以实时计算。

持续 MapReduce 的工作原理与常规的 MapReduce 基本相同,只不过它不会退出。主要的区别在于添加了一个键提取阶段,还有操作的顺序略微不一样,以方便持续执行。在常规的 MapReduce 中,映射函数会发送随机生成的键值对。因此框架需要将其打乱并发送到合适的进程,从而拥有同样键的所有值都会被送到同一个进程。在持续 MapReduce 中,键和值被提取到单独的函数中,所以更像是针对键的 MapReduce。在读取每条记录时,使用键函数将键提取出来,然后,记录被打乱并发送到适合该键的进程。然后,记录经过变换(映射)发送给某个成员,再针对变换的结果与之前同一个键的结果进行归约。

上述温度传感器的示例可以用持续 MapReduce 改写如下:

1.读取源记录:{“sensorId”: “A”,“timestamp”: “2021-01-05T09:57:00Z”, “tempC”:10.7}

2.调用key({“sensorId”: “A”,“timestamp”: “2021-01-05T09:57:00Z”, “tempC”:10.7})获得"A"

3.将记录送至"A"对应的进程

4.调用map({“sensorId”: “A”,“timestamp”: “2021-01-05T09:57:00Z”, “tempC”:10.7})并获得{“sensorId”: “A”,“timestamp”: reading.timestamp, “min”: reading.tempC,“max”: reading.tempC, “current”: tempC, “total”:reading.tempC, “count”: 1}

5.调用reduce,传入上述键“A”的规约结果与映射函数的结果。

6.每当新数据产生时,重复上述步骤。

此时你肯定会好奇,这些归约的结果是如何保存的。毕竟,如果作业持续运行、永远不退出,我们肯定不希望将数据一直保留在内存中。但问题是,键值的集合正是计算的输出,相当于 Hadoop 作业中的输出结果集合。

持续 MapReduce 的 Flow 实现实际上支持两种不同的方式来存储并获取这些结果。如果结果只是中间结果,而且刚刚被送到另一个持续 MapReduce 计算中,那么我们会将其保存到基于 RocksDB 的分区且有副本的存储中。这些数据与 MapReduce 进程共享。

但你可以将结果“材料化”到支持键值语义的外部系统中。通常,这意味着归约结果可以保存到数据库中(如PostgreSQL),或数据仓库中(如Snowflake),并在新的源数据出现时持续更新。更新时也会查询之前的结果,以便进行归约。

再次说回温度传感器的例子,这次将其材料化到 Postgres 表中。在执行映射函数后,我们会查询 Postgres 表,查找该键对应的前一个结果。如果该结果存在,则将其作为归约函数的左输入。不论如何,输出结果都会通过 update 或 inser 语句保存到 Postgres 数据库中,这个步骤会针对源数据中的每个记录持续进行。Postgres 表永远保存最新的结果,你也不用等待作业运行结束。

我们在努力构建 Flow,它可以帮你实现基于持续 MapReduce 的实时数据流水线,这样你就不需要自行管理映射、归约和键函数了。Flow 的界面还在构思中,但 Flow 变换运行时的基础依然是持续 MapReduce。

参考链接:

https://www.estuary.dev/blog/why-mapreduce-is-making-a-comeback


上一篇:Spark(一)Spark介绍


下一篇:第十四章 MapReduce概述