什么是Spark?可能你很多年前就使用过Spark,反正当年我四六级单词都是用的星火系列,没错,星火系列的洋名就是Spark。
当然这里说的Spark指的是Apache Spark,Apache Spark™is a fast and general engine for large-scale data processing: 一种快速通用可扩展的数据分析引擎。如果想要搞清楚Spark是什么,那么我们需要知道它解决了什么问题,还有是怎么解决这些问题的。
Spark解决了什么问题?
在这里不得不提大数据,大数据有两个根本性的问题,一个是数据很大,如何存储?另外一个是数据很大,如何分析?毕竟分析大数据是为了改善产品的用户体验,从而获取更多的价值。
对于第一个问题,开源社区给出的方案就是HDFS,一个非常优秀的分布式存储系统。
对于第二个问题,在Hadoop之 后,开源社区推出了许多值得关注的大数据分析平台。这些平台范围广阔,从简单的基于脚本的产品到与Hadoop 类似的生产环境。Bashreduce在 Bash环境中的多个机器上执行 MapReduce 类型的操作,可以直接引用强大的Linux命令。GraphLab 也是一种MapReduce 抽象实现,侧重于机器学习算法的并行实现。还有Twitter 的 Storm(通过收购 BackType 获得)。Storm 被定义为 “实时处理的 Hadoop”,它主要侧重于流处理和持续计算。
Spark就是解决第二个问题的佼佼者。Why Spark?
Why Spark?
现在有很多值得关注的大数据分析平台,那么为什么要选择Spark呢?
速度
与Hadoop的MapReduce相比,Spark基于内存的运算比MR要快100倍;而基于硬盘的运算也要快10倍!
(From the Project HomePage)
易用
Spark支持Java,Python和Scala。而且支持交互式的Python和Scala的shell,这意味这你可以非常方便的在这些shell中使用Spark集群来验证你的解决问题的方法,而不是像以前一样,打包。。。这对于原型开发非常重要!
Hadoop的WorldCount的Mapper和Reducer加起来要20多行吧。Spark仅需要:
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
甚至可以将它们放到一行。
通用性
Spark提供了All in One的解决方案!
(From the Project HomePage)
· Shark SQ:应用于即席查询(Ad-hocquery)
· Spark Streaming:应用于流式计算
· MLlib:应用于机器学习
· GraphX: 应用于图处理
Spark All In One的解决方案非常具有吸引力,毕竟任何公司都想要Unified的平台去处理遇到的问题,可以减少开发和维护的人力成本和部署平台的物力成本。
当然还有,作为All in One的解决方案,Spark并没有以牺牲性能为代价。相反,在性能方面,Spark还有很大的优势。
和Hadoop的集成
Spark可以使用YARN作为它的集群管理器,并且可以处理HDFS的数据。这对于已经部署Hadoop集群的用户特别重要,毕竟不需要做任何的数据迁移就可以使用Spark的强大处理能力。Spark可以读取HDFS,HBase, Cassandra等一切Hadoop的数据。
当然了对于没有部署并且没有计划部署Hadoop集群的用户来说,Spark仍然是一个非常好的解决方法,它还支持standalone, EC2 和 Mesos。你只要保证集群的节点可以访问共享的内容,比如通过NFS你就可以非常容易的使用Spark!
How Spark?
Spark是如何做到呢?或者说Spark的内核是如何实现的?
架构综述
(From the Project HomePage)
先说解释一下上图的术语:
Driver Program: 运行main函数并且新建SparkContext的程序。
SparkContext:Spark程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor。
Application: 基于Spark的用户程序,包含了driver程序和集群上的executor
Cluster Manager: 集群的资源管理器(例如: Standalone,Mesos,Yarn)
Worker Node: 集群中任何可以运行应用代码的节点
Executor: 是在一个worker node上为某应用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的executors
Task: 被送到某个executor上的工作单元
了解了各个术语的含义后,我们看一下一个用户程序是如何从提交到最终到集群上执行的:
1. SparkContext连接到ClusterManager,并且向ClusterManager申请executors。
2. SparkContext向executors发送application code。
3. SparkContext向executors发送tasks,executor会执行被分配的tasks。
运行时的状态如下图:
(From Paper Resilient Distributed Datasets: A Fault-Tolerant Abstractionfor In-Memory Cluster Computing)
Spark为什么这么快?
首先看一下为什么MapReduce那么慢。速度可能是MapReduce最被人们诟病的地方。传统的MapReduce框架慢在哪里。
基于内存的计算式Spark速度很快的原因之一。Spark的运算模型也是它出色性能的重要保障。Spark的关键运算组件如下图。
什么是RDD
RDD是Spark的基石,也是Spark的灵魂。说Spark不得不提RDD,那么RDD(Resilient Distributed Dataset,弹性分布式数据集)是什么呢?当然了,论文Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing是了解RDD必不可少的,它从学术,实现给出了什么是RDD。下面是从RDD的实现源码的注释中说明了RDD的特性。
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
接着把对应的实现接口的源码贴一下,以方便去源码中查找RDD的核心框架:
- 分区 protected def getPartitions: Array[Partition]
- 依赖 protected def getDependencies: Seq[Dependency[_]] = deps
- 函数 def compute(split: Partition, context: TaskContext): Iterator[T]
- 最佳位置(可选) protected def getPreferredLocations(split: Partition): Seq[String] = Nil
- 分区策略(可选) @transient val partitioner: Option[Partitioner] = None
RDD的操作
RDD支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;而动作(actions)在数据集上运行计算后,返回一个值给驱动程序。 例如,map就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果。另一方面,reduce是一种动作,通过一些函数将所有的元素叠加起来,并将最终结果返回给Driver程序。(不过还有一个并行的reduceByKey,能返回一个分布式数据集)
Spark中的所有转换都是惰性的,也就是说,他们并不会直接计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这个设计让Spark更加有效率的运行。例如,我们可以实现:通过map创建的一个新数据集,并在reduce中使用,最终只返回reduce的结果给driver,而不是整个大的新数据集。
默认情况下,每一个转换过的RDD都会在你在它之上执行一个动作时被重新计算。不过,你也可以使用persist(或者cache)方法,持久化一个RDD在内存中。在这种情况下,Spark将会在集群中,保存相关元素,下次你查询这个RDD时,它将能更快速访问。在磁盘上持久化数据集,或在集群间复制数据集也是支持的,详尽的RDD操作请参见 RDD API doc。
数据的本地性
数据本地性的意思就是尽量的避免数据在网络上的传输。Hadoop的MR之所以慢,频繁的读写HDFS是原因之一,为了解决这个问题,Spark将数据都放在了内存中(当然这是理想的情况,当内存不够用时数据仍然需要写到文件系统中)。但是如果数据需要在网络上传输,也会导致大量的延时和开销,毕竟disk IO和network IO都是集群的昂贵资源。
数据本地性是尽量将计算移到数据所在的节点上进行。毕竟移动计算要比移动数据所占的网络资源要少得多。而且,由于Spark的延时调度机制,使得Spark可以在更大的程度上去做优化。比如,拥有数据的节点当前正被其他的task占用,那么这种情况是否需要将数据移动到其他的空闲节点呢?答案是不一定。因为如果预测当前节点结束当前任务的时间要比移动数据的时间还要少,那么调度会等待,直到当前节点可用。
Spark的现状与未来
值得庆祝的里程碑:
· 2009:Spark诞生于AMPLab
· 2010:开源
· 2013年6月:Apache孵化器项目
· 2014年2月:Apache*项目
· Hadoop最大的厂商Cloudera宣称加大Spark框架的投入来取代Mapreduce
· Hadoop厂商MapR投入Spark阵营
· Apache mahout放弃MapReduce,将使用Spark作为后续算子的计算平台
· 2014年5月30日Spark1.0.0发布
进一步学习
熟读源码永远是知道真相的唯一方式。尤其是Scala语言是如此简洁,如此易读。当然了在这之前最好还是读一下论文,尤其是RDD的,这样可以让你有个整体把握整个系统的能力。
- Shark: SQL and Rich Analytics at Scale. Reynold Xin, Joshua Rosen, Matei Zaharia, Michael J. Franklin, Scott Shenker, Ion Stoica. Technical Report UCB/EECS-2012-214. November 2012.
- Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters. Matei Zaharia, Tathagata Das, Haoyuan Li, Scott Shenker, Ion Stoica. HotCloud 2012. June 2012.
- Shark: Fast Data Analysis Using Coarse-grained Distributed Memory (demo). Cliff Engle, Antonio Lupher, Reynold Xin, Matei Zaharia, Haoyuan Li, Scott Shenker, Ion Stoica. SIGMOD 2012. May 2012. Best Demo Award.
- Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica. Technical Report UCB/EECS-2011-82. July 2011.
- Spark: Cluster Computing with Working Sets. Matei Zaharia, Mosharaf Chowdhury, Michael J. Franklin, Scott Shenker, Ion Stoica. HotCloud 2010. June 2010.
敬请期待
源码之前,了无真相。接下来,我将从源码分析的角度,深入Spark内部,来系统学习Spark,学习它的架构,学习它的实现。
请您支持:
如果你看到这里,相信这篇文章对您有所帮助。如果是的话,请为本文投一下票吧: 点击投票,多谢。