摘要:
本文提出了分布式内存抽象的概念——弹性分布式数据集(RDD,Resilient Distributed Datasets)。它同意开发者在大型集群上运行基于内存的计算。RDD适用于两种应用,而现有的数据流系统对这两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域非经常见。二是交互式数据挖掘工具。这两种情况下。将数据保存在内存中可以极大地提高性能。为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD在共享状态的时候是基于粗粒度的转换而不是细粒度的更新(换句话说就是,RDD仅仅能通过其它RDD上的批量操作来创建)。
虽然如此,RDD仍然足以用于非常多类型的计算。包含专用的迭代编程模型(如Pregel)等。我们已经将RDD应用到Spark系统之中,并对改性进行了相关的评估。
介绍:
像MapReduce和Dryad这种集群计算框架已经被广泛地应用到了大规模的数据分析中。这些系统提供了高级的运算符(算子),使得用户可以轻松地写出并行计算的程序而不用考虑任务分配问题以及错误容忍的细节。
虽然眼下的框架都提供使用集群计算资源的高级抽象,可是使用分布式内存却没有相应的类似的高级抽象。
假设不能非常好地使用内存。那么,当一些应用的数据结果须要多次复用的情况出现的时候,运行效率就会受到影响。这些应用一般包含:
(1)迭代式机器学习和图应用中经常使用的迭代算法(每一步对数据运行相似的函数),比方pagerank、K-means聚合、逻辑回归等算法。
(2)交互式数据挖掘工具(用户重复查询一个数据子集)。
当前的框架採取的方法是将须要复用的数据存储到存储系统中(HDFS等),然后在每次查询时又一次载入,那么,在数据复制、磁盘I/O、序列化的过程中会产生较大的开销,而这些开销会支配整个任务运行的时间。
针对这个问题。我们提出了RDD,它能够为大部分的应用提供高效的数据复用。RDD是一个具有容错机制,并行数据架构,能够让用户将中间结果保存在内存中。并通过控制RDD的数据分区进而优化数据部署,还提供丰富的算子来操纵RDDs。
就错误容忍而言,如今的集群系统採取的方式通常是提供细粒度的更新操作:数据备份或者是通过log处理进行数据恢复。对于数据密集型的任务来说。由于要进行大量的数据拷贝,network的带宽压力非常大(network的带宽小于RAM(内存cache)的带宽),且文件系统负载也会加大。
相比而言,RDD提供了一种粗粒度的转换方式来解决容物容忍的问题。RDD通过Lineage(包括了怎样从其它RDD衍生所必需的相关信息),从而不须要检查点操作就能够重构丢失的数据分区。
我们通过微基准和用户应用程序来评估RDD。实验表明。在处理迭代式应用上Spark比Hadoop快高达20多倍,计算数据分析类报表的性能提高了40多倍,同一时候可以在5-7秒的延时内交互式扫描1TB数据集。
此外。我们还在Spark之上实现了Pregel和HaLoop编程模型(包含其位置优化策略),以库的形式实现(分别使用了100和200行Scala代码)。
RDDs
RDD是仅仅读的、分区记录的集合。RDD仅仅能通过稳定物理存储中的数据集和其它已有的RDD上的确定性操作来创建。这些确定性操作称之为转换,如map、filter、groupBy、join等。
RDD始终不须要物化。RDD含有怎样从其它RDD衍生(即计算)出本RDD的相关信息(即Lineage信息)。据此能够从物理存储的数据计算出对应的RDD分区。
最后要说明的是,用户能够控制的RDDs的其它两个方面,持久化和划分:
(1)用户能够指定须要复用的RDDs并选择一个基于内存的存储策略。
用户能够请求将RDD缓存,这样执行时将已经计算好的RDD分区存储起来,以加速后期的重用。相关策略:缓存的RDD默认存储在内存中,但假设内存不够,能够写到磁盘上;用户也能够使用其它的存储策略。比方只将RDD存储在磁盘等等;用户还能够在每一个RDD上面指定优先级来确定最先spill到磁盘的RDD。
(2)用户能够就RDD每条记录里面的key进行划分。
这有助于部署的优化。比如,我们能够将两个数据集用相同的方式进行 hash-partitioned以便于之后的join操作(事先经过key进行了划分)。
- Spark Programming Interface
在Spark中,RDD被表示为对象,通过这些对象上的方法(或函数)的调用在其上进行转换操作。
程序猿一開始通过在存储层上的数据
“转换”(map、filter等)定义一个或者多个RDDs。即transformations算子操作过程。之后。程序猿就能够在
“动作”(actions)中使用RDD了。
动作是向应用程序返回值。或向存储系统导出数据的那些操作,比如,count(返回RDD中的元素个数)。collect(返回元素本身),save(将RDD输出到存储系统)。
在Spark中,仅仅有在RDD上第一次运行actions 操作时,才会真正開始计算RDD(即延迟计算)。
【补充】每一个运算(如flatMap、map)事实上返回的都是一个RDD对象。每一个RDD对象都有一个Parent,通过这个Parent,实际上我们把一个个RDD对象串联了起来!能够觉得最后形成了一个RDD对象的队列。直到最后须要计算时(调用了action算子,后调用runjob函数)才開始逐一调用各个RDD对象的compute方法。完毕实际的运算。
Example: Console Log Mining(控制台的日志挖掘)
本部分我们通过一个详细演示样例来阐述RDD。
假定有一个大型站点出错,操作员想要检查Hadoop文件系统(HDFS)中的日志文件(TB级大小)来找出原因。通过使用Spark。操作员仅仅需将日志中的错误信息装载到一组节点的内存中,然后运行交互式查询就可以。
首先,须要在Spark解释器中输入例如以下Scala命令:
<strong>lines </strong>= spark.textFile("hdfs://...") errors = lines.filter(_.startsWith("ERROR"))
errors.cache()
第1行从HDFS文件定义了一个RDD(即一个文本行集合),第2行获得一个过滤后的RDD,第3行请求将errors缓存起来。注意在Scala语法中filter的參数是一个闭包。
errors.count()
用户还能够在RDD上运行很多其它的转换操作。并使用转换结果,如
<span style="font-size:18px;"> // Count errors mentioning MySQL
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
.map(_.split('\t')(3))
.collect()</span>
使用errors的第一个action执行以后。Spark会把errors的分区缓存在内存中(action之后才会将数据载入到内存)。极大地加快了兴许计算速度。注意,最初的RDD
lines由于错误信息可能仅仅占原数据集的非常小一部分(小到足以放入内存)。
最后。为了说明模型的容错性,下图给出了上述查询过程的的Lineage图。在RDD lines上运行filter操作,得到errors。然后再filter、map后得到新的RDD。在这个RDD上运行collect操作。
Spark调度器以流水线的方式运行后两个转换,向拥有errors分区缓存的节点发送一组任务。此外。假设某个errors分区丢失,Spark仅仅在对应的lines分区上运行filter操作(“血统”信息记录操作过程)来重建该errors分区。
- Advantages of the RDD Model(分布式共享内存)
为了进一步理解RDD作为一种分布式的内存抽象的优势,第一个优势见下表。表1列出了RDD与分布式共享内存(DSM。Distributed Shared Memory)的对照。
在DSM系统中。应用能够向全局地址空间的任何位置进行读写操作。(注意这里的DSM,不仅指传统的共享内存系统,还包含那些通过分布式哈希表或分布式文件系统进行细粒度的数据共享的系统,比方Piccolo)DSM是一种通用的抽象,但这样的通用性同一时候也使得在集群上实现有效的容错性更加困难。
RDD与DSM主要差别在于,RDD仅仅能通过粗粒度的批量转换来创建(即“写”)。然而,DSM能够对随意内存位置读写。也就是说,RDD限制应用仅仅能运行批量写操作,这样有利于实现有效的容错。特别地,RDD没有设置检查点的开销。由于能够使用Lineage来恢复RDD。
并且。失效时仅仅须要又一次计算丢失的那些RDD分区。能够在不同节点上并行运行,而不须要回滚整个程序。
【注意】通过内存中的任务备份,RDD还能够处理落后任务(即执行非常慢的节点),这点与MapReduce类似。而DSM则难以实现备份任务。由于任务及其副本都须要读写同一个内存位置。
与DSM相比,RDD模型有两个优点。
第一,对于RDD中的批量操作,执行时将依据数据存放的位置来调度任务,从而提高性能。第二。对于基于扫描的操作。假设内存不足以缓存整个RDD。就进行部分缓存。
把内存放不下的分区存储到磁盘上。此时性能与现有的数据并行系统差点儿相同。
最后看一下读操作的粒度。
RDD上的非常多动作(如count和collect)都是批量读操作,即扫描整个数据集,能够将任务分配到距离数据近期的节点上。同一时候。RDD也支持细粒度操作,即在哈希或范围分区的RDD上运行keyword查找。
- Applications Not Suitable for RDDs
RDD能够非常好的支持
数据并行的批量迭代分析应用,且拥有非常好的容错机制。
RDD不太适合那些异步更新共享状态的应用。比如并行web爬行器。因此,我们的目标是为大多数批量分析型应用提供有效的编程模型。此外类型的应用交给专门的系统来处理。
Spark Programming Interface
Spark用Scala语言实现了RDD的API。Scala是一种基于JVM的静态类型、函数式、面向对象的语言。
我们选择Scala是由于它简洁(特别适合交互式使用)、有效(由于是静态类型)。
可是,RDD抽象并不局限于函数式语言。
为了使用Spark,开发人员须要编写一个driver程序,连接到集群以执行Worker,例如以下图所看到的。Driver定义了一个或多个RDD,并调用RDD上的动作。
Worker是长时间执行的进程。将RDD分区以Java对象的形式缓存在内存中。
再看看上面对log文件信息进行操作的样例,用户运行RDD操作时会提供參数,比方map会传递一个闭包(closure。函数式编程中的概念)。
Scala将闭包表示为Java对象。这些对象在传递的时候被序列化,通过网络传输到其它节点上进行装载。Scala将闭包内的变量保存为Java对象的字段。比如。var
x = 5; rdd.map(_ + x) 这段代码将RDD中的每一个元素加5。
RDD本身是静态类型对象。由參数指定其元素类型。比如,RDD[int]是一个整型RDD。
只是。我们举的样例差点儿都省略了这个类型參数。由于Scala支持类型判断。
尽管在概念上使用Scala实现RDD非常easy,但还是要处理一些Scala闭包对象的反射问题,我们也须要解决如何通过Scala解释器来使用Spark还须要很多其它工作。
无论如何,我们都不须要改动Scala编译器。
下表列出了Spark中基本的RDD transformations和actions操作。每一个操作都给出了标识,当中方括号表示类型參数。例如以下图:
【注意】有些操作仅仅对键值对可用,比方join。另外。函数名与Scala及其它函数式语言中的API匹配。比如map是一对一的映射,而flatMap是将每一个输入映射为一个或多个输出(与MapReduce中的map类似)。
除了上述这些操作以外。用户还能够请求将RDD缓存起来。并且。用户还能够通过Partitioner类获取RDD的分区顺序,然后将还有一个RDD依照相同的方式分区。还有些操作会自己主动产生一个哈希或范围分区的RDD。像groupByKey,reduceByKey和sort等。
如今我们讨论一些迭代应用: logistic regression(逻辑归约)和PageRank.。
Logistic Regression(逻辑规约)
非常多机器学习算法都具有迭代特性,执行迭代优化方法来优化某个目标函数,比如梯度下降方法。
假设这些算法的工作集可以放入内存,将极大地加速程序执行。
比如以下的程序是逻辑回归(一种常见的分类算法)的实现,该程序寻找一个最佳切割两组点(即垃圾邮件和非垃圾邮件)的超平面w。
算法採用梯度下降的方法:開始时w为随机值,在每一次迭代的过程中,对w的函数求和。然后朝着优化的方向移动w。
<span style="font-size:18px;">val points = spark.textFile(...)
.map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
val gradient = points.map{ p =>
p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
}.reduce((a,b) => a+b)
w -= gradient
}</span>
上述程序首先定义一个名为points的缓存RDD。这是在文本文件上运行map转换之后得到的,即将每一个文本行解析为一个Point对象。然后在points上重复运行map和reduce操作。每次迭代时通过对当前w的函数进行求和来计算梯度。
之后的小节我们将看到这样的在内存中缓存points的方式,比每次迭代都从磁盘文件装载数据并进行解析要快20X。
已经在Spark中实现的迭代式机器学习算法还有:kmeans(像逻辑回归一样每次迭代时运行一对map和reduce操作)。期望最大化算法(EM,两个不同的map/reduce步骤交替运行)。交替最小二乘矩阵分解和协同过滤算法。
该算法通过合计链接到自身页面的page对其的贡献,进而迭代式地更新自身的排名。每一次的迭代中。每个文件(page)都将r/m的贡献发送给他的邻居。当中r是它的rank,n是它邻居页面的的数目,之后。它会将自身的排名更新为α/N + (1 − α)∑ci,当中∑ci是是当前页面获取到的贡献的总和,N是文件(page)的总数。在Spark中。我们能够这样实现PageRank:
<span style="font-size:18px;">// Load graph as an RDD of (URL, outlinks) pairs
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs
// with the contributions sent by each page
val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size))
}
// Sum contributions by URL and get new ranks
ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum)
}</span>
该程序相应的“血统”图例如以下:
在上图中,每一次的迭代。我们都会在之前的迭代过程以及静态的links数据集的基础上创建新的ranks数据集。这个图的一个有趣的功能是它的规模会随着迭代的次数增长。因此。在一个有非常多次迭代的job中,对ranks进行复制备份进而降低错误恢复的次数也是非常必要的。然而。须要注意的是links数据集不须要进行复制备份。由于links的恢复能够通过“血统”来高效地运行(对文件进行map操作)。links的数据集会比ranks大非常多,由于非常多文件都有非常多link可是。一个文件仅有一个rank。
最后一点,我们能够通过控制RDD的划分来优化PageRank处理过程中的交互。假设我们指明了links的划分(hash-partition)。我们能够用同样的方法划分ranks的数据集而且能够确定links和ranks之间的join操作不须要交互(由于每个URL的link和它的link列表都在同一个机器(由于hash-partition))。我们也能够写一个自己定义的Partitioner类来聚集相互链接的pages(比方,通过域名划分)。上述的每种优化过程都能够在定义links的时候通过调用partitionBy来进行:
<span style="font-size:18px;"> links = spark.textFile(...).map(...)
.partitionBy(myPartFunc).persist()</span>
通过上述的调用,links和ranks之间的join操作将会自己主动地获取到来自其它page对当前页面的贡献并又一次计算当前页面的rank值。
Representing RDDs
提供RDD作为抽象的一个挑战是针对通过大范围的转换进行跟踪的“血统”的表示方式的选择。
理想的话,一个运行RDDs相关操作的系统应该尽可能多的提供转换算子,而且同意用户随意组合这些算子。关于上述的挑战,一个简单的基于图的RDDs表示法有助于目标的完毕。
在Spark中已经開始使用。
概括来说,每一个RDD须要提供包括四种类型信息的接口:
(1)一组RDD分区(partition。即数据集的原子组成部分);
(2)对父RDD的一组依赖,这些依赖描写叙述了RDD的Lineage;
(3)一个函数,即在父RDD上运行何种计算;
(4)元数据。描写叙述分区模式和数据存放的位置。比如。一个表示HDFS文件的RDD包括:各个数据块的一个分区。并知道各个数据块放在哪些节点上。并且,这个RDD上的map操作结果也具有相同的分区,map函数是在父数据上运行的。下表总结了RDD的内部接口。
watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">
设计接口的一个关键问题就是,怎样表示RDD之间的依赖。我们发现RDD之间的依赖关系能够分为两类,即:(1)窄依赖(narrow dependencies):子RDD的每一个分区至多依赖一个父分区(即与数据规模无关)。(2)宽依赖(wide
dependencies):子RDD的多个分区能够依赖一个父RDD分区。比如。map产生窄依赖,而join则是宽依赖(除非父RDD被哈希分区)。还有一个样例见下图:
区分这两种依赖非常实用。
首先,窄依赖同意在一个集群节点上以流水线的方式(pipeline)计算全部父分区。比如。逐个元素地运行map、然后filter操作;而宽依赖则须要获取全部父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。第二,窄依赖可以更有效地进行失效节点的恢复,即仅仅需又一次计算丢失RDD分区的父分区,并且不同节点之间可以并行计算。而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的祖先产生冗余计算开销。
【补充】
第一,窄依赖能够在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD相应的某块数据;宽依赖则要等到父RDD全部数据都计算完毕,而且父RDD的计算结果进行hash并传到相应节点上之后才干进行子RDD的计算。
第二。数据丢失时,对于窄依赖仅仅须要又一次计算丢失的那一块数据来恢复;对于宽依赖则要将祖先RDD中的全部数据块全部又一次计算来恢复(在宽依赖情况下,丢失一个子RDD分区须要又一次计算的每一个父RDD的每一个分区的全部数据并非都给丢失的子RDD分区用的(groupbykey()),会有一部分数据相当于相应的是未丢失的子RDD分区中须要的数据。这样就会产生冗余计算开销,这也是宽依赖开销更大的原因)。所以在长“血统”链特别是有宽依赖的时候,须要在适当的时机设置数据检查点(Tachyon中已经通过Edge算法实现,见Tachyon论文翻译)。
也是这两个特性要求对于不同依赖关系要採取不同的任务调度机制和容错恢复机制。
通过RDD接口,Spark仅仅须要不超过20行代码便能够实现大多数转换。
以下给出一些RDD的实现实例:
HDFS文件:眼下为止我们给的样例中输入RDD都是HDFS文件,对这些RDD能够运行:partitions操作返回各个数据块的一个分区(每一个Partition对象中保存数据块的偏移)。preferredLocations操作返回数据块所在的节点列表。iterator操作对数据块进行读取。
map:不论什么RDD上都能够运行map操作,返回一个MappedRDD对象,该对象与父RDD具有同样的分区以及偏好位置。该操作传递一个函数參数给map,对其父RDD上的记录依照iterator的方式运行这个函数。
union:在两个RDD上运行union操作,返回两个父RDD分区的并集。
通过对应父RDD上的窄依赖关系计算每一个子RDD分区(注意union操作不会过滤反复值,相当于SQL中的UNION ALL)。
sample:抽样与映射类似。可是sample操作中,RDD须要存储一个随机数产生器的种子(随机数的起始值),这样每一个分区可以确定哪些父RDD记录被抽样。
join:对两个RDD运行join操作可能产生窄依赖(假设这两个RDD拥有同样的哈希分区或范围分区)。可能是宽依赖,也可能两种依赖都有(比方一个父RDD有分区,而还有一父RDD没有)。
【补充】join过程见下图(基本过程是:先对须要连接的RDD进行cogroup函数操作。
cogroup之后形成的新RDD。对每一个key下的元素进行笛卡尔积操作)。
watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">
Implementation
我们已经用scala代码实现了Spark系统。
系统执行在Mesos集群管理之上,且能够与Hadoop、MPI以及其它应用进行资源共享(兼容)。每个Spark程序作为一个独立的Mesos应用执行,且都有自己的driver(master)和workers。应用之间的资源共享由Mesos来处理。
Spark能够通过Hadoop已经存在的插件接口读取来自不论什么Hadoop的输入资源。
总的来说。我们的调度器跟Dryad类似,但我们还考虑了哪些RDD分区是缓存在内存中的。调度器依据目标RDD的Lineage图创建一个由stage构成的有向无环图(DAG)例如以下图所看到的。每一个stage内部尽可能多地包括一组具有窄依赖关系的流水线转换(transformations)。stage边界的划分有两种情况:一是宽依赖上的Shuffle操作;二是已计算的分区。它能够缩短父RDD的计算过程。在stage内须要启动一组任务用于计算缺失的分区,直到目标RDD计算完毕。
图解:Spark如何划分任务阶段(stage)的样例。实线方框表示RDD,实心矩形表示分区(黑色表示该分区被缓存)。要在RDD G上运行一个动作(action)。调度器依据宽依赖创建一组stage。并在每一个stage内部将具有窄依赖的转换流水线化(pipeline)。
本例不用再执行stage 1,由于B已经存在于缓存中了,所以仅仅须要执行2和3。
因为延迟调度的机制。调度器依据数据存放的位置将任务分配给机器(数据的本地性)。假设某个任务须要处理一个已经缓存好的分区,则直接将任务分配给拥有这个分区的节点。否则,假设须要处理的分区位于多个可能的位置(比如。由HDFS的数据存放位置决定),则将任务分配给这一组节点。
对于宽依赖(比如须要Shuffle的依赖),眼下的实现方式是,在拥有父分区的节点上将计算的中间结果物化(持久化),通过复制备份简化容错处理,这跟MapReduce中物化map输出非常像。
假设某个任务失效,仅仅要stage中的父RDD分区可用。则仅仅需在还有一个节点上又一次执行这个任务就可以。
假设某些stage不可用(比如。Shuffle时某个map输出丢失),则须要又一次提交这个stage中的全部任务来并行计算丢失的分区。
最后,尽管Spark中全部的计算都是由于action算子的触发才開始的。
- Interpreter Integration(解释器的集成)
像Ruby和Python一样,Scala也有一个交互式shell。
基于内存的数据能够实现低延迟,我们希望同意用户从解释器交互式地执行Spark,从而在大数据集上实现大规模并行数据挖掘。
Scala解释器通常依据用户输入的代码行。来对类进行编译,接着装载到JVM中,然后调用类中的一个函数进行处理。
这个类是一个包括输入行上的变量或函数的单例对象,且会用一个初始化函数执行这行代码。
比如,假设用户输入代码var x =
5,接着又输入println(x),则解释器会定义一个包括x的Line1类。并将第2行编译为println(Line1.getInstance().x)。
在Spark中我们对解释器做了两点修改:
(1)类传输:解释器可以支持基于HTTP传输类字节码。这样worker节点就能获取输入每行代码相应的类的字节码。
(2)
改进的代码生成逻辑:通常在每行代码行上创建的单态对象通过相应类上的静态方法被訪问。也就是说,假设要序列化一个闭包,它会引用前面代码行中变量。比方上面的样例Line1.x,Java不会依据对象关系传输包括x的Line1实例。
所以worker节点不会收到x。所以,我们将这样的代码生成逻辑改为直接引用各个行对象的实例。
下图说明了解释器怎样将用户输入的一组代码行解释为Java对象。
watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">
Spark解释器便于跟踪处理大量对象关系引用,而且便利了HDFS数据集的研究。
我们计划以Spark解释器交互式地执行高级数据分析语言。比方类似SQL。
(1)内存存储反序列化的java对象;
(2)内存存储序列化的数据。
(3)磁盘存储方式。
第一种选择能够提供最快的性能,由于java虚拟机能够本地化获取RDD。另外一种选择能够让用户选择一个相对于空间受限的java对象图(UML图?)而言更加具有内存效率的方法(无需组织java对象,仅仅是使用纯粹的数据对象),可是要以减少性能为代价。第三种方法对于过大而不能保存在RAM中的RDD是实用的。可是重计算会有消耗。
为了管理有限的内存。我们在RDDs的级别使用LRU回收策略。当一个新的RDD分区被计算可是又没有足够的空间来存储。我们回收一个近期最少訪问的RDD的分区,除非这个RDD与包括那个将要被计算的新分区的RDD是一个RDD。在这样的情况下。我们将旧的分区保存在内存中进而阻止同样RDD中的partiton不停地换进换出。这是非常重要的由于大多数的操作会在整个RDD上执行tasks,因此内存中的分区在将来被使用是非常可能的。
我们发现这个默认的策略至今为止在我们的应用中都工作的非常好。我们也为用户提供了更深一步的控制策略:为每个RDD提供持久化存储。
最后要说的是。如今集群上的每个Spark实例都有自己独立的内存空间。将来,我们计划探究通过标准的内存管理器管理Spark实例来共享RDDs。
- Support for Checkpointing
虽然RDD中的Lineage信息能够用来故障恢复,但对于那些Lineage链较长的RDD来说,这样的恢复可能非常耗时。比如上述的Pregel任务。每次迭代的顶点状态和消息都跟前一次迭代有关,所以Lineage链非常长。所以,在RDD上运行检查点设置就非常有效。
一般来说,Lineage链较长、宽依赖的RDD须要採用检查点机制。这样的情况下,集群中一个节点故障(导致f父RDD所在的磁盘数据丢失)可能导致每一个父RDD的数据块丢失。因此须要所有又一次计算。相反,假设在窄依赖之上使用检查点的操作是没有价值的。假设一个节点失败了,能够通过“血统”进行又一次计算。这样带来的开销也不过复制RDD的几分之中的一个而已。
Spark当前已经提供了checkpoint的接口,可是那些数据须要进行checkpoint操作的选择权留给了用户。然而我们也研究了怎样运行自己主动的检查点设置,由于我们的调度器知道每个数据集的大小以及它第一次计算所占用的时间,所以选择一个更优的RDD数据集设置检查点是能够最小化系统数据恢复的计算时间。
值得注意的是。由于RDD是仅仅读的,所以不须要不论什么一致性维护(比如写复制策略,分布式快照或者程序暂停等)带来的开销,也不会影响到后台运行检查点设置操作。
Evaluation
我们在Amazon EC2[1]上进行了一系列实验来评估Spark及RDD的性能,并与Hadoop及其它应用程序的基准进行了对照。
总的说来。结果例如以下:
(1)对于迭代式机器学习以及图相关的应用。Spark比Hadoop快20多倍。
这样的加速比是由于:Spark中数据以Java对象的形式存储在内存中。进而避免了过多的I/O操作以及反序列化操作。
(2)用户编写的应用程序运行结果非常好。特别地,Spark分析报表比Hadoop快40多倍。
(3)假设节点发生失效。通过重建那些丢失的RDD分区,Spark可以实现高速恢复。
(4)Spark可以在5-7s延时范围内,交互式地查询1TB大小的数据集。
我们的基准測试首先从一个执行在Hadoop上的具有迭代特征的机器学习应用和PageRank開始。然后评估在Spark中当工作集不能适应缓存时系统容错恢复能力,最后讨论用户应用程序和交互式数据挖掘的结果。
除非特殊说明,我们的实验使用m1.xlarge EC2 节点,4核15GB内存。使用HDFS作为持久存储。块大小为256M。在每一个作业执行执行时,为了保证磁盘IO开销开销的測试更加精确,我们清理了集群中每一个节点的操作系统缓存。
- Iterative Machine Learning Applications
我们实现了2个迭代式机器学习(ML)应用。Logistic回归和K-means算法,在例如以下系统之间进行性能对照:
(1)Hadoop:Hadoop 0.20.0稳定版。
(2)HadoopBinMem:在首轮迭代中通过将输入数据转换成为开销较低的二进制格式来降低兴许迭代过程中文本解析的开销。并将其保存在基于内存的HDFS实例中。
(3)Spark:基于RDD的系统(在首轮迭代中缓存Java对象以降低兴许迭代过程中解析、反序列化的开销)
我们使用同一数据集在同样条件下执行Logistic回归和K-means算法:在25-100台机器。执行10次迭代处理100G输入数据集。两个作业的关键差别在于每轮迭代的单个字节上的计算量不同。
K-means的迭代时间取决于更新聚类坐标(计算过程)耗时,Logistic回归是非计算密集型的。可是在I/O操作和解析过程中很耗时。
因为典型的机器学习算法须要数10轮迭代才干够收敛,我们分别统计了首轮迭代和兴许迭代计算的耗时,并从中发现,通过RDD进行数据共享极大地加快了兴许迭代的速度。
首轮迭代。在首轮迭代过程中,三个系统都是从HDFS中读取文本数据作为输入。如图7所看到的。实验中Spark略快于Hadoop,主要是由于Hadoop中的master节点和worker节点之间基于心跳协议来发送信号带来了开销。HadoopBinMem是最慢的,由于它通过一个额外的MapReduce作业将数据转换成二进制格式会带来开销,而且HadoopBinMem必须通过网络将二进制文件在内存中进行备份操作,这也会带来开销。
兴许迭代。
图7显示了兴许迭代的平均耗时,图8对照了不同大小集群条件下耗时情况。从图8(a)我们发现,在100个节点上执行Logistic回归程序。Spark比Hadoop、HadoopBinMem分别快25.3、20.7倍。
从图8(b)能够看到,虽然是更加典型的计算密集型的K-means应用,Spark仍旧比Hadoop、HadoopBinMem分别快1.9、3.2倍,这是由于K-means程序的计算开销是基本的(用很多其它的节点有助于提高计算速度的倍数)。
【对于图7而言,须要注意】兴许迭代中,Hadoop仍然须要从HDFS读取文本数据作为输入,所以从兴许迭代中Hadoop的迭代时间并没有明显的改善。
可是,使用预先转换的SequenceFile文件(Hadoop内建的二进制文件格式)的 HadoopBinMem,在兴许迭代中节省了解析的代价。可是仍然会带来其它的开销,如从HDFS读SequenceFile文件并转换成Java对象等。由于Spark是直接读取缓存于RDD中的Java对象的。随着集群尺寸的线性增长,迭代时间大幅下降。性能得到提升。
watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">
理解速度提升。我们很惊奇地发现,Spark胜过基于内存存储二进制数据的Hadoop(HadoopBinMem)高达20倍,Hadoop执行慢是因为例如以下几个原因:
(1)Hadoop软件栈限定的最小开销;
(2)HDFS提供数据的开销。
(3)将二进制记录转换成内存Java对象过程中的解序列化的开销。
我们依次研究了上述的每个因素:
为了估測1,我们执行空的Hadoop作业,只执行作业的初始化、启动任务、清理工作就至少耗时25秒。
对于2。我们发现为了服务每个HDFS数据块,HDFS进行了多次内存复制以及计算校验和操作。必定会带来对应的开销。
为了估測3,我们在单个节点上执行了微基准程序,在输入的不同形式的256M数据上计算Logistic回归。特别地,对于来自基于内存的HDFS和本地内存两种情况下的文件。我们对照直接处理输入的文本和处理转换为二进制后的文件的性能。
首先。基于内存的HDFS文件和基于内存的本地文件的读取过程,前者多耗时2秒。其次。直接的文本和二进制格式文件的解析过程,前者多耗时7秒(Spark RDD除外)。最后,即使是处理基于本地内存的文件,预解析的二进制文件转换为内存中的Java对象,也要耗时3秒。然而,通过直接将RDD缓存为内存中的Java对象,就会避免上述的开销,终于仅仅须要耗时3秒。
通过使用存储在HDFS上的54G大小的Wikipedia导出的数据,我们比較了Spark与Hadoop在PageRank计算方面的性能。PageRank算法通过10轮迭代处理了大约400万文章的链接图数据。图10显示了在30个节点上。基础的Spark处理速度是Hadoop的2.4倍。改进(对RDD进行Hash分区,来让迭代过程变得连贯)后速度提升到7.4倍,这些结果数据是随着节点扩展到60个而同步放大的。
watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">
我们也通过在Spark上实现Pregel来执行PageRank算法进行评估,在后面会提到。迭代次数和图10类似,可是耗时要多4秒。由于Pregel会在每一次的迭代过程中执行额外的操作来让节点“投票”决定是否结束当前job。
【补充】Pregel:Google分布式图计算框架,主要用于图遍历、最短路径、PageRank算法等。
基于K-means算法应用程序,我们评估了在单点故障(SPOF)时使用Lneage信息创建RDD分区的开销。图11显示了。K-means应用程序执行在75个节点的集群中进行了10轮迭代,我们在正常操作和进行第6轮迭代開始时一个节点发生问题的情况下对耗时进行了对照。
没有不论什么失败的情况下。每轮迭代启动了400个任务处理100G数据。
第5轮迭代结束时。交互时间大约为58秒。第6轮迭代时,一个节点被Kill掉,该节点上的任务都被终止,存储在其上的RDD的分区也丢失了。
Spark调度器调度这些任务在其它节点上又一次并行执行:又一次读取一直的输入数据并基于Lineage信息重建RDD。这使得迭代计算耗时添加到80秒。一旦丢失的RDD分区被重建,平均迭代时间又回落到58秒。
须要注意的是,基于检查点的错误恢复机制(纯粹的检查点操作。并没有“血统”机制),恢复过程须要运行非常多次的迭代操作。
并且,系统须要通过网络将应用中多达100GB的数据进行复制。由于复制,Spark也许须要花费两倍的内存来完毕备份工作。
相反。我们样例中的RDD的“血统”图的存储只须要10KB大小的空间(存储在磁盘)。
- Behavior with Insufficient Memory(内存不足)
到如今为止,我们能保证集群中的每一个节点都有足够的内存去缓存迭代过程中使用的RDD。那么问题来了,假设没有足够的内存来缓存一个作业的工作集,Spark又是怎样执行的呢?在实验中。我们通过在每一个节点上限制缓存RDD所须要的内存资源来对Spark进行配置,在不同的缓存配置条件下执行Logistic回归。结果如图12。我们能够看出,随着内存的减小,性能平缓地下降。
- User Applications Built with Spark
In-Memory Analytics
视频分发公司Conviva使用Spark极大地加快了一些数据分析报告的处理速度,之前是在Hadoop上执行的。比如,一个报告的处理相当于一系列的Hive[1]的查询处理过程。过程中对关于用户各种各样的统计数据进行了计算,这些查询作用在同样的数据子集上(数据满足用户提供的条件(过滤器)),可是在不同分组的字段上执行聚合操作(聚合函数)(SUM、AVG、COUNT
DISTINCT等)须要使用单独的MapReduce作业。
通过运行Spark中的查询操作将共享的数据子集载入到一个RDD里面。该公司能够将报告的处理速度提升40倍。
在Hadoop集群上处理200G压缩数据生成的报告耗时20小时。而使用Spark基于96G内存的2个节点只需耗时30分钟就可以完毕,主要是由于Spark只将符合用户要求的信息的行列数据存储到了一个RDD里面,而不须要存储整个解压缩的文件。
Traffic Modeling(城市交通建模)
在Berkeley的Mobile Millennium项目中。研究人员基于一系列分散的汽车GPS监測数据,使用并行化学习算法来推算公路交通拥堵状况。数据来自10000个互联的公路线路网,还有600000个由汽车GPS装置採集到的点到点的旅行时间的样本数据,这些数据记录了汽车在两个地点之间行驶的时间(每一条路线的行驶时间可能跨多个公路线路网)。
使用一个交通模型。可以预计跨多个公路网行驶的耗时,由此,系统可以估算交通的拥堵状况。
研究人员使用Spark实现了一个可迭代的EM(最大期望)算法,算法迭代地反复运行map、reduceByKey操作。
应用从20个节点扩展到80个节点(每一个节点4核),如图13(a)所看到的。
Twitter Spam(垃圾邮件)分类。Berkeley的Monarch项目使用Spark识别Twitter消息上的Spam链接。他们在Spark上实现了一个Logistic回归分类器。并使用分布式的reduceByKey操作对梯度向量并行求和。
图13(b)显示了基于50G数据子集的分类器的训练结果,整个数据集包含250000的URL、至少10^7个与网络相关的特征/维度以及每一个URL相应页面的内容属性。随着节点的添加,这并不像交通应用程序那样近似线性,主要是由于每轮迭代的固定通信代价较高。
watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">
- Interactive Data Mining(交互式数据挖掘)
为了展示Spark交互式查询大数据集的能力。我们在100个m2.4xlarge EC2实例(8核68G内存)上使用Spark分析两年间的1TB的Wikipedia页面浏览日志数据。在整个输入数据集上简单地查询例如以下内容以获取页面浏览总数:
(1)所有页面;
(2)页面的标题能精确匹配给定的关键词。
(3)页面的标题能部分匹配给定的关键词。
图14 显示了分别在整个、1/2、1/10的数据集上查询的响应时间,甚至1TB数据在Spark上查询仅耗时5-7秒,这比直接操作磁盘数据快几个数量级,比如,从磁盘上查询1TB数据耗时170秒。这表明了RDD缓存使得Spark成为一个交互式数据挖掘的强大工具。
Discussion
- Expressing Existing Programming Models
RDD能够高效地实现眼下已经被提议独立的集群编程模型。所谓高效。就是RDDs不只能够实现跟这些模型一样的输出。还能够优化这些框架的性能。比方将特定的数据保存在内存。将RDD进行分区进而实现节点交互的优化。以及进行高效的数据恢复。这些能够使用RDDs进行实现的模型包含:
MapReduce
该模型能够使用Spark里面的flatMap和groupByKey算子来实现。或者是在有combiner的前提下用reduceByKey来实现。
DryadLINQ
该系统相对于MapReduce会提供更加广泛的算子,这些算子基本上都是Spark中的的总体计算转换算子,比方(map, groupByKey, join等
SQL
类似于DryadLINQ。SQL在记录集上运行数据并行查询的操作。
Pregel
Google的Pregel是专门用于图处理应用的模型,这个模型看起来跟其它面向数据集的程序模型非常不一样。在Pregel(基于Bulk Synchronous
Parallel。总体同步并行计算模型)中。程序由一系列超步(Superstep)协调迭代执行在每一个超步中,各个顶点执行用户函数,并更新对应的顶点状态。改变图的拓扑结构。然后向下一个超步的顶点集发送消息。
这样的模型可以描写叙述非常多图算法,包含最短路径,双边匹配和PageRank等。
使用RDDs来完毕上述模型的主要原因是Pregel对全部定点的每一次迭代中使用了相似的用户程序。因此,我们能够将一个RDD中每一次迭代的定点状态信息进行存储。然后用该用户函数运行总体转换(flatmap)并将相关信息存储在新生成的RDD里面。我们能够将这个新生成的RDD和定点的状态进行join操作来进行定点的信息交换更新操作。
另一点非常重要,RDDs能够同意我们像Pregel一样将定点状态信息存储在内存中,这样就能够通过控制RDD的分区来优化集群节点之间的交互了。也能够支持部分的错误回复。
Iterative MapReduce
近期的好几个系统,包含Hadoop和Twister都提供了交互式的MapReduce模型。在该模型中。用户能够给系统一系列的job来迭代运行。此系统能够通过迭代连续地进行数据划分,且Twister能够将数据保存在内存其中。
Batched Stream Processing
研究人员近期提议了好几个增量的处理系统,这些系统都能够周期性地更新数据。比如,一个每15分钟对广告点击统计进行更新的应用,该应用能够将之前15分钟的状态和最新的log数据进行合并从而得到最新的状态。
这些系统运行和Dryad类似的总体操作。可是会在分布式文件系统中存储状态。将中间状态放入RDDs中能够加速处理过程。
Explaining the Expressivity of RDDs
为什么RDDs能够实现各种程序模型?由于附加在RDD上面的限制差点儿不会影响到大多数的并行应用。
特别地,尽管RDDs只只能通过总体的转换操作来创建,可是非常多的并行程序自然地会对记录採用同样的操作(适合总体操作的模式)。
相似地,RDDs的不可变性也不是什么障碍。由于我们能够创建多个RDD来来代表同样数据集的不同版本号。
其实,眼下使用文件系统的非常多的MapReduce应用都不支持文件的更新操作。
- Leveraging RDDs for Debugging
当我们最開始设计RDDs进行广义地(“血统”)针对错误容忍的重计算的时候。这个属性也有助于debug。
特别地,通过记录一个job期间创建的“血统”信息,我们之后能够又一次计算这些RDDs,也能够让用户交互式地查询这些信息。进而能够在一个单进程的debugger过程中通过又一次计算相应的RDD分区来又一次执行来自一个job的不论什么的task(有针对性地获取“血统”信息。并有针对性地对单个partition进行计算来针对性地重执行单个task)。不像传统分布式系统那样通过又一次执行app来进行bug调试的调试者。必须在较多的nodes间搞清楚事件的执行顺序。相对而言,RDDs的debug过程没有添加不论什么负载,由于仅仅有RDD的“血统”图须要被记录,我们眼下也正基于上述的观点开发一个Spark调试器。
Related Work
- Cluster Programming Models
眼下的集群模型分为好几类:
(1)
数据流模型。
像MapReduce、Dryad、Ciel那样拥有一系列处理数据的算子,可是是使用稳定的存储系统来进行数据的共享的。
相对于那些稳定的存储系统,RDDs代表一种更加高效的数据共享抽象,由于RDDs避免了数据复制、I/O以及序列化的开销。
(2)
拥有高级程序接口的数据流系统。比方DryadLINQ、FlumeJava等,这些系统会提供相关的API使得用户能够通过map和join算子进行并行的数据获取。
然而,在这些系统中,并行的数据获取对象能够是磁盘中的文件或者是用于查询过程的临时的数据集。且系统会通过同样的算子来进行流水线式的数据处理(比方一系列的map操作),可是同样的算子的处理过程中,数据共享的效率太低。
为了更加便利,我们在并行获取数据的过程中以正常的Spark API为基础。可是在口后背后会提供RDDs作为存储抽象。
(3)
为特定的须要数据共享的应用提供高级接口的系统。比如,Pregel支持迭代的图应用。而Twister和HaLoop是处理迭代式的MapReduce的。
然而,这些系统会为他们支持的计算模块提供数据共享。不会为用户提供普遍的数据共享的抽象。比如。用户无法使用Pregel或者Twister来将一个数据集载入到内存。并决定哪个数据查询的操作能够在上面运行。
可是RDDs提供了一个分布式存储抽象而且能够支持哪些专门的系统无法支持的应用,比方交互式数据挖掘。
(4)
暴露了可分享的可变的状态的一些系统。比如。Piccolo让用户能够执行一些并行函数来读取或者更新一个分布式hash表的元素。
DSM(Distributed
shared memory )以及key-value存储(比方RAMCloud提供了相似的模型)也是类似的。
RDDs与上述系统有两个方面的不同。第一。RDD基于算子map、sort和join等提供了高级编程接口,然而,Piccolo和DSM只只能读取以及更新hash表中的元素。第二,Piccolo和DSM系统通过检查点以及回滚操作来运行数据恢复,其开销要比RDDs的基于“血统”的错误容忍策略大非常多。
Caching Systems
Nectar可以通过识别带有程序分析的子表达式,跨DryadLINQ作业重用中间结果。这样的能力将会增加到基于RDD的系统。
可是Nectar并没有提供In-Memory缓存(使用一般的分布式文件系统)。也不可以让用户显式地控制应该缓存那个数据集,以及怎样对其进行分区。Ciel和FlumeJava相同可以记住任务结果,但不能提供In-Memory缓存并显式控制它的缓存方式。
Ananthanarayanan系统通过利用数据存取的暂时性和空间本地性。进而添加了一个基于内存缓存的分布式文件系统。这样的方法提供了针对文件系统更快的数据存取速度,可是和共享中间结果的RDDs比起来效率还是不行,由于该系统还是须要应用将stage操作之间的结果写入文件系统进而共享数据。
Lineage
在科学计算和数据库领域。表示数据的Lineage和原始信息一直以来都是一个研究课题。对于一些应用。如须要解释结果以及同意被又一次生成、工作流中发现了bug或者数据集丢失须要又一次计算数据。RDD提供了一个并行的编程模型。在这个模型中使用的“血统”机制来表示是很easy的,因此它能够被用于容错。
基于“血统”的恢复机制事实上与MapReduce以及Dryad的恢复机制是类似的,也会追踪tasks构成的DAG图的依赖关系。
然而在这些系统中,“血统”信息会在一个job结束之后丢失,最后还是须要採用在存储系统进行复制备份的方法来共享计算的数据。相反,RDDs用"血统"机制高效地将数据缓存至内存而不须要复制和I/O的开销。
Relational Databases
从概念上看,RDD类似于数据库中的视图,RDD的持久化过程类似于视图的物化。然而,像DSM系统一样,数据库同意典型地读写全部记录,通过操作日志和数据的日志来实现容错。还须要花费额外的开销来维护一致性。RDD编程模型粗粒度的转换模型能够避免这些开销。
Conclusion
我们提出的RDD是一个高效的,通用的,具有错误容忍的应用于共享数据的集群应用的抽象。RDD广泛支持并行应用,还包含已经提出来的非常多专门的程序模型处理的迭代计算以及那些专门的程序模型不支持的新应用。
不像已经存在的通过数据复制进行容错处理的集群存储抽象。RDD基于粗粒度的转换操作提供了一些列的API,这些API可以基于“血统”机制进行高效率的数据恢复。
使用RDD的Spark系统实现处理迭代式作业的速度超过Hadoop大约20倍,并且还可以交互式查询数百GB的数据。