上一期中,通过翻译与凝练《Spark – The Definitive Guide》,我们初步解决了Spark是什么的问题,今天我们将继续学习Spark的基本架构,应用程序,结构化API,核心术语与概念,分享过程中若有错谬,欢迎拍砖。
Charpter 2.A Gentle Introduction to Spark
Spark’s Basic Architecture
一般而言,一台计算机就可以完成看电影,发邮件,制作电子表格等功能。但是诸如大数据处理这种富有挑战性的任务,单台机器往往是无法胜任的,因为其不具备强大的计算能力,丰富的计算资源,用户也没有足够的时间等待计算结束。这时候,就需要对一个集群或一组计算机资源进行整合,以便于我们能像使用单台计算机一样,使用这些资源。有了机器还不足以产生强大的算力,还需要一个软件框架来协调它们之间的工作。Spark由此应运而生,它负责管理和协调多台计算机的计算任务。其中,多台计算机先提交Spark应用程序给诸如Spark集群管理器,YARN,Mesos等管理器,而后Spark会根据一个集群管理器跟踪到的可用资源,将计算资源分配给应用程序。
Spark Applications
Spark Applications(应用程序)由一个driver process(驱动器进程)和一组executor process(执行器进程)组成。executor大部分时候都运行Spark代码,但driver可以通过多种不同语言调用Spark API 来进行驱动。
driver是Spark应用程序的核心。driver process负责运行main()函数,位于集群的一个节点上,它负责以下三件事:1、维护Spark应用程序的相关信息;2、响用户的程序或输入;3、分析任务并分发给若干executor process进行处理。
每个executor负责两件事:1、执行由driver分配给它的代码逻辑或计算任务;2、将executor的计算状态上报到运行driver的节点。
下图展示了集群管理器(Spark的独立集群管理器、 YARN或Mesos)如何控制物理机器,并为Spark应用程序分配资源。集群上可以运行多个Spark应用程序,集群有多个节点,用户可以配置每个节点上运行多少个执行器。
除集群模式外, Spark还有本地运行模式。驱动器和执行器只是简单的进程,这意味着它们可以位于同一台或不同的机器上。在本地模式下,驱动器和执行器在PC上运行(此时为线程,而不是进程)。
Spark’s Language APIs
Spark API支持多语言,用户可以使用多种编程语言运行Spark代码(Scala,Java,Python,SQL,R)。大多数时候, Spark在每种语言中都提供了一些“核心概念”,这些概念被转换成在集群上可运行的Spark代码。如果仅使用结构化API,则所有语言都应该具有相似的性能特征。
上图是一个简单的例子,说明了SparkSession与支持Spark API编程语言间的关系。对于每种语言的API,都含有前面描述的“核心概念”, SparkSession对象是Spark代码的入口,如果基于Python或R使用Spark,注意不要显式使用JVM指令,可以编写Python和R代码来调用SparkSession, Spark会将它们转换成可在JVM上运行的代码。
Spark’s APIs
尽管用户可以用各种语言启动Spark任务,但我们还是要知道Spark是如何让这些语言转换为可用的,这归功于 Spark的两套基本API:低级的非结构化 API和高级的结构化API。
Starting Spark
截至目前,已经介绍了Spark应用程序的基本概念,但这些概念还比较抽象,当我们真正开始编写Spark应用程序时,需要一种将用户命令和数据发送给Spark的方法,这就是SparkSession。按上一期博文介绍的方法,我们可以用spark-shell打开Scala控制台来启动一个交互式会话。
之后还会介绍一个向Spark提交独立预编译应用程序的命令spark-submit。
当以交互模式启动Spark时,相当于隐式创建了一个SparkSession来管理Spark应用程序。如果不是通过交互模式,而是通过独立应用程序启动Spark,则必须在程序中显式创建一个SparkSession对象。
SparkSession
用户可以用名为SparkSession的驱动器进程,来控制Spark应用程序。SparkSession实例是Spark通过集群执行用户定义操作的方式,Spark应用程序和SparkSession需要一一对应。当你用Scala或Python启动控制台时,SparkSession会被实例化为一个名为spark对象。
DataFrame
下面执行一个简单的任务。任务的内容为,创建一列包含1000行,值为0~999的分布式集合,在集群上运行此命令时,这个集合的每个部分都会被分配到不同的执行器上,这个集合就是一个Spark DataFrame。
Partitions
为了让多个executor并行工作, Spark将数据分解成多个数据块,每个数据块叫做一个分区。分区是位于集群中一台物理机上的多行数据集合, DataFrame的分区表示了在执行过程中,数据在集群中的物理分布。如果只有一个分区,即使拥有多个执行器, Spark也只有一个执行器在处理数据。同理,如果有多个分区,但只有一个执行器,Spark同样只有一个执行器在处理数据。当使用DataFrame时,大多数时候不需要手动操作分区,只需指定数据的高级转换操作。Spark会自动决定如何在集群上分配执行资源。低级别API也存在这种机制。
Transformations
Spark的核心数据结构在计算过程中保持不变,也意味着在创建后无法更改。为了“更改”一个DataFrame,你需要告诉Spark你想如何修改它,这个过程被称为转换。下面执行一个简单的转换来查找当前DataFrame中的所有偶数。以前面构建的myRange变量为例。
转换操作分为两类:窄依赖,宽依赖。
具有窄依赖关系(narrow dependency)的转换操作(窄转换),指的是每个输入分区仅决定一个输出分区的转换。上图的代码中, where语句指定了一个窄依赖关系,其中一个分区最多只会影响一个输出分区。
Lazy Evaluation
惰性机制的意思就是等到最后时刻才执行计算。如果用户需要对数据进行操作,Spark会先建立一系列作用到原始数据的转换计划,并将其编译为可以在集群中高效运行的流水线式的物理执行流程,然后等待到action时才开始执行代码。这带来的好处有,Spark可以优化整个从输入到输出端的数据流。比如DataFrame的predicate pushdown(谓词下推),假设我们构建了一个含有多个转换操作的Spark作业,并在最后指定了一个过滤操作,再假设这个过滤操作只需要输入数据的某一行数据,则最有效的方法为,从最开始就仅访问我们需要的那一条记录,Spark通过自动下推这个过滤操作实现了整个物理执行计划的优化。
Actions
转换操作使我们能够建立逻辑转换计划。为了触发计算,我们需要运行一个action操作。该操作指示Spark在一系列转换操作后计算出一个结果。最简单的action是count,用于计算一个DataFrame中的记录总数,仍然以上面的代码变量为例。
Spark UI
举个实际案例,用户启动了一个Spark作业,首先执行一次filter(窄转换),再执行一次aggregation(宽转换),然后到每个分区上执行count,最后通过collect将所有分区的结果汇集到一起,生成一个结果。你可以通过Spark UI看到所有这些操作。Spark UI是一个包含在Spark软件包中的工具,可以用它监视Spark集群上运行的Spark作业。Spark UI占用的驱动器节点端口是4040。如果是在本地模式运行,可以通过http://localhost:4040访问Spark Web UI。Spark UI上显示了Spark作业的运行状态、作业进度、执行环境和群集状态等信息,这些信息非常有用,可用于性能调优和代码调试。下图展示了一个例子,可以看出该作业包含了两个运行阶段,九个任务。