【核心API开发】Spark入门教程[3]

本教程源于2016年3月出版书籍《Spark原理、机制及应用》 ,在此以知识共享为初衷公开部分内容,如有兴趣,请支持正版书籍。

Spark综合了前人分布式数据处理架构和语言的优缺点,使用简洁、一致的函数式语言Scala作为主要开发语言,同时为了方便更多语言背景的人使用,还支持Java、Python和R语言。Spark因为其弹性分布式数据集(RDD)的抽象数据结构设计,通过实现抽象类RDD可以产生面对不同应用场景的子类。本章将先介绍Spark编程模型、RDD的相关概念、常用API源码及应用案例,然后具体介绍四大应用框架,为后续进一步学习Spark框架打下基础。

 

3.1 Spark 编程模型概述

Spark的编程模型如图3-1所示。

【核心API开发】Spark入门教程[3]

图3-1 Spark编程模型

开发人员在编写Spark应用的时候,需要提供一个包含main函数的驱动程序作为程序的入口,开发人员根据自己的需求,在main函数中调用Spark提供的数据操纵接口,利用集群对数据执行并行操作。

Spark为开发人员提供了两类抽象接口。第一类抽象接口是弹性分布式数据集(Resilient Distributed Dataset,下文简称RDD),顾名思义,RDD是对数据集的抽象封装,开发人员可以通过RDD提供的开发接口来访问和操纵数据集合,而无需了解数据的存储介质(内存或磁盘)、文件系统(本地文件系统、HDFS或Tachyon)、存储节点(本地或远程节点)等诸多实现细节;第二类抽象是共享变量(Shared Variables),通常情况下,一个应用程序在运行的时候会被划分成分布在不同执行器之上的多个任务,从而提高运算的速度,每个任务都会有一份独立的程序变量拷贝,彼此之间互不干扰,然而在某些情况下需要任务之间相互共享变量,Apache Spark提供了两类共享变量,它们分别是:广播变量(Broadcast Variable)和累加器(Accumulators)。第3.3节将介绍RDD的基本概念和RDD提供的编程接口,并在后面详细解读接口的源码实现,从而加深对RDD的理解,此外会在第3.4节中介绍两类共享变量的使用方法。

3.2 Spark Context

SparkContext是整个项目程序的入口,无论从本地读取文件(textfile方法)还是从HDFS读取文件或者通过集合并行化获得RDD,都先要创建SparkContext对象,然后使用SparkContext对RDD进行创建和后续的转换操作。本节主要介绍SparkContext类的作用和创建过程,然后通过一个简单的例子向读者介绍SparkContext的应用方法,从应用角度来理解其作用。

3.2.1  SparkContext的作用

SparkContext除了是Spark的主要入口,它也可以看作是对用户的接口,它代表与Spark集群的连接对象,由图3-2可以看到,SparkContext主要存在于Driver Program中。可以使用SparkContext来创建集群中的RDD、累积量和广播量,在后台SparkContext还能发送任务给集群管理器。每一个JVM只能有运行一个程序,即对应只有一个SparkContext处于激活状态,因此在创建新的SparkContext前需要把旧的SparkContext停止。

【核心API开发】Spark入门教程[3]

图3-2  SparkContext在Spark架构图中的位置

3.2.2 SparkContext创建

SparkContext的创建过程首先要加载配置文件,然后创建SparkEnv、TaskScheduler和DAGScheduler,具体过程和源码分析如下。

1.加载配置文件SparkConf

SparkConf在初始化时,需先选择相关的配置參数,包含master、appName、sparkHome、jars、environment等信息,然后通过构造方法传递给SparkContext,这里的构造函数有多种表达形式,当SparkContex获取了全部相关的本地配置信息后开始下一步操作。

def this(master: String, appName: String, conf: SparkConf) =
    this(SparkContext.updatedConf(conf, master, appName))
def this(
  master: String,
  appName: String,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map(),
  preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
  {
    this(SparkContext.updatedConf(newSparkConf(),master,appName,sparkHome,jars,environment))
    this.preferredNodeLocationData = preferredNodeLocationData
  }  
2.创建SparkEnv
  创建SparkConf后就需要创建SparkEnv,这里面包括了很多Spark执行时的重要组件,包括 MapOutputTracker、ShuffleFetcher、BlockManager等,在这里源码是通过SparkEnv类的伴生对象SparkEnv Object内的createDriverEnv方法实现的。
private[spark] defcreateDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      mockOutputCommitCoordinator:Option[OutputCommitCoordinator] = None): SparkEnv = {
   assert(conf.contains("spark.driver.host"),"spark.driver.host is not set on the driver!")
    assert(conf.contains("spark.driver.port"),"spark.driver.port is not set on the driver!")
    val hostname =conf.get("spark.driver.host")
    val port =conf.get("spark.driver.port").toInt
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      hostname,
      port,
      isDriver = true,
      isLocal = isLocal,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator =mockOutputCommitCoordinator
  )
}  
<p style="background:#F3F3F3;">
</p>  

3.创建TaskScheduler

创建SparkEnv后,就需要创建SparkContext中调度执行方面的变量TaskScheduler。

private[spark] var (schedulerBackend, taskScheduler) =
   SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => {
      try {
        stop()
      } finally {
        throw new SparkException("Error while constructing DAGScheduler", e)
      }
    }
  }  

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  taskScheduler.start()  

TaskScheduler是依据Spark的执行模式进行初始化的,详细代码在SparkContext中的createTaskScheduler方法中。在这里以Standalone模式为例,它会将sc传递给TaskSchedulerImpl,然后创建SparkDeploySchedulerBackend并初始化,最后返回Scheduler对象。

case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)  

4.创建DAGScheduler

创建TaskScheduler对象后,再将TaskScheduler对象传至DAGScheduler,用来创建DAGScheduler对象。

@volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => {
      try {
        stop()
      } finally {
        throw new SparkException("Error while constructing DAGScheduler", e)
      }
    }
  }
 def this(sc: SparkContext) = this(sc, sc.taskScheduler)

创建DAGScheduler后再调用其start()方法将其启动。以上4点是整个SparkContext的创建过程,这其中包含了很多重要的步骤,从这个过程能理解Spark的初始启动情况。

3.2.3 使用shell

除了单独编写一个应用程序的方式之外,Spark还提供了一个交互式Shell来使用。在Shell中,用户的每条语句都能在输入完毕后及时得到结果,而无需手动编译和运行程序。Shell的使用十分简单,改变当前工作路径到Spark的安装目录,执行指令$ ./bin/spark-shell即可进入Shell。
在Shell中,系统根据命令提供的参数自动配置和生成了一个SparkContext对象sc,直接使用即可,无需再手动实例化SparkContext。除了结果会实时显示之外,其余操作与编写单独应用程序类似。读者可直接参考Spark官方提供的Spark ProgrammingGuide等文档,在此不做具体介绍。

3.2.4 应用实践
 这里向读者介绍一段用于统计文件中字母a和字母b出现频率的Spark应用,通过这个程序向读者展示SparkContext的用法。

【例3-1】简单的Spark程序

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md"               // 本地文件目录
    val conf = new SparkConf().setAppName("Simple Application")       //给Application命名
    val sc = new SparkContext(conf)                                //创建SparkContext
    val logData = sc.textFile(logFile, 2).cache()                      //缓存文件
    val numAs = logData.filter(line => line.contains("a")).count()         //计算字母a的个数
    val numBs = logData.filter(line => line.contains("b")).count()         //计算字母b的个数
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))   //打印结果
  }  

这个例子中,首先创建本地文件目录logFile和配置文件conf,然后使用配置信息conf实例化SparkContext得到sc,得到sc之后就可以从本地文件中读取数据并把数据转化成RDD,并命名为logData,然后logData调用filter方法分别计算包含字母a的行数和包含字母b的行数,最后打印出结果。该例子中使用了SparkContext的实例化对象创建RDD数据集。

3.3 RDD简介

本节主要介绍弹性分布式数据集RDD的相关概念,其中包括RDD创建来源、两种重要的Transformation和Action操作、数据持久化和检查点机制,通过对Spark中RDD核心抽象的深入理解,能帮助读者全面理解后面的RDD的分区、并行计算和依赖等机制和变换过程。

3.3.1 RDD创建

RDD是Spark应用程序开发过程中最为基本也最为重要的一类数据结构,RDD被定义为只读、分区化的记录集合,更为通俗来讲,RDD是对原始数据的进一步封装,封装导致两个结果:第一个结果是数据访问权限被限制,数据只能被读,而无法被修改;第二个结果是数据操作功能被强化,使得数据能够实现分布式存储、并发处理、自动容错等等诸多功能。Spark的整个计算过程都是围绕数据集RDD来进行,下面将会对RDD的创建以及数据结构进行简单介绍。

1.RDD的两类来源

1)将未被封装的原始数据进行封装操作得到,根据原始数据的存在形式,又可被进一步分成由集合并行化获得或从外部数据集中获得。

2)由其他RDD通过转换操作获得,由于RDD的只读特性,内部的数据无法被修改,因此RDD内部提供了一系列数据转换(Transformation)操作接口,这类接口可返回新的RDD,而不影响原来的RDD内容。在后面第3章3.3节中将会对RDD的创建方法进行更加详尽的说明。

2.RDD内部数据结构

1)分区信息的列表

2)对父RDD的依赖信息

3)对Key-Value键值对数据类型的分区器(可选)

4)计算分区的函数

5)每个数据分区的物理地址列表(可选)

RDD的数据操作并非在调用内部接口的一刻便开始计算,而是遇到要求将数据返回给驱动程序,或者写入到文件的接口时,才会进行真正的计算,这类会触发计算的操作称为动作(Action)操作,而这种延时计算的特性,被称为RDD计算的惰性(Lazy),在第六章机篇将分别讲述动作操作和惰性特征。

在第1章中说过,Spark是一套内存计算框架,其能够将频繁使用的中间数据存储在内存当中,数据被使用的频率越高,性能提升越明显。数据的内存化操作在RDD层次上,体现为RDD的持久化操作,在3.3.4节描述RDD的持久化操作。除此之外,RDD还提供了类似于持久化操作的检查点机制,表面看上去与存储在HDFS的持久化操作类似,实际使用上又有诸多不同,在3.3.5小节描述RDD的检查点机制。

3.3.2 RDD转换操作

转换(Transformation)操作是由一个RDD转换到另一个新的RDD,例如,map操作在RDD中是一个转换操作,map转换会让RDD中的每一个数据都通过一个指定函数得到一个新的RDD。

RDD内部可以封装任意类型的数据,但某些操作只能应用在封装键值对类型数据的RDD之上,例如转换操作reduceByKey、groupByKey和countByKey等。

表3-1展示了RDD所提供的所有转换操作及其含义。

表3-1:RDD提供的转换操作

Transformation

算子作用

map(func)

新RDD中的数据由原RDD中的每个数据通过函数func得到

filter(func)

新RDD种的数据由原RDD中每个能使函数func返回true值的数据组成

flatMap(func)

类似于map转换,但func的返回值是一个Seq对象,Seq中的元素个数可以是0或者多个

mapPartitions(func)

类似于map转换,但func的输入不是一个数据项,则是一个分区,若RDD内数据类型为T,则func必须是Iterator<T> => Iterator<U>类型

mapPartitionsWithIndex(func)

类似于mapPartitions转换,但func的数据还多了一个分区索引,即func类型是(Int, Iterator<T> => Iterator<U>)

sample(withReplacement, fraction, seed)

对fraction中的数据进行采样,可以选择是否要进行替换,需要提供一个随机数种子

union(otherDataset)

新RDD中数据是原RDD与RDD otherDataset中数据的并集

Intersection(otherDataset)

新RDD中数据是原RDD与RDD otherDataset中数据的交集

distinct([numTasks])

新RDD中数据是原RDD中数据去重的结果

groupByKey([numTasks])

原RDD中数据类型为(K, V)对,新RDD中数据类型为(K, Iterator(V))对,即将相同K的所有V放到一个迭代器中

reduceByKey(func, [numTasks])

原RDD和新RDD数据的类型都为(K, V)对,让原RDD相同K的所有V依次经过函数func,得到的最终值作为K的V

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

原RDD数据的类型为(K, V),新RDD数据的类型为(K, U),类似于groupbyKey函数,但聚合函数由用户指定。键值对的值的类型可以与原RDD不同

sortByKey([ascending], [numTasks])

原RDD和新RDD数据的类型为(K, V)键值对,新RDD的数据根据ascending的指定顺序或者逆序排序

join(otherDataset, [numTasks])

原RDD数据的类型为(K, V),otherDataset数据的类型为(K, W),对于相同的K,返回所有的(K, (V, W))

cogroup(otherDataset, [numTasks])

原RDD数据的类型为(K, V),otherDataset数据的类型为(K, W),对于相同的K,返回所有的(K, Iterator<V>, Iterator<W>)

catesian(otherDataset)

原RDD数据的类型为为T,otherDataset数据的类型为U,返回所有的(T, U)

pipe(command, [envValue])

令原RDD中的每个数据以管道的方式依次通过命令command,返回得到的标准输出

coalesce(numPartitions)

减少原RDD中分区的数目至指定值numPartitions

repartition(numPartitions)

修改原RDD中分区的数目至指定值numPartitions

3.3.3 RDD动作操作

相对于转换,动作(Action)操作用于向驱动(Driver)程序返回值或者将值写入到文件当中。例如reduce动作会使用同一个指定函数让RDD中的所有数据做一次聚合,把运算的结果返回。表3-2展示了RDD所提供的所有动作操作及其含义。

表3-2:RDD提供的动作操作

Action

算子作用

reduce(func)

令原RDD中的每个值依次经过函数func,func的类型为(T, T) => T,返回最终结果

collect()

将原RDD中的数据打包成数组并返回

count()

返回原RDD中数据的个数

first()

返回原RDD中的第一个数据项

take(n)

返回原RDD中前n个数据项,返回结果为数组

takeSample(withReplacement, num, [seed])

对原RDD中的数据进行采样,返回num个数据项

saveAsTextFile(path)

将原RDD中的数据写入到文本文件当中

saveAsSequenceFile(path)(Java and Scala)

将原RDD中的数据写入到序列文件当中

savaAsObjectFile(path)(Java and Scala)

将原RDD中的数据序列化并写入到文件当中。可以通过SparkContext.objectFile()方法加载

countByKey()

原RDD数据的类型为(K, V),返回hashMap(K, Int),用于统计K出现的次数

foreach(func)

对于原RDD中的每个数据执行函数func,返回数组

3.3.4 惰性计算

需要注意的是,一个RDD执行转换操作之后,数据的计算是延迟的,新生成的RDD会记录转换的相关信息,包括父RDD的编号、用户指定函数等等,但并不会立即执行计算操作,真正的计算操作过程得等到遇到一个动作操作(Action)才会执行,此外,除非用户指定持久化操作,否则转换过程中产生的中间数据在计算完毕后会被丢弃,即数据是非持久化。即使对同一个RDD执行相同的转换操作,数据同样会被重新计算。

Spark采取惰性计算机制有其道理所在。例如可以实现通过map方法创建的一个新数据集,然后使用reduce方法,最终只返回 reduce 的结果给driver,而不是整个大的新数据集。

3.3.5 RDD持久化

惰性计算的缺陷也是明显的:中间数据默认不会保存,每次动作操作都会对数据重复计算,某些计算量比较大的操作可能会影响到系统的运算效率,因此Spark允许将转换过程中手动将某些会被频繁使用的RDD执行持久化操作,持久化后的数据可以被存储在内存、磁盘或者Tachyon当中,这将使得后续的动作(Actions)变得更加迅速(通常快10倍)。

通过调用RDD提供的cache或persist函数即可实现数据的持久化,persist函数需要指定存储级别(StorageLevel),cache等价于采用默认存储级别的persist函数,Spark提供的存储级别及其含义如表3-3所示。在6.4节会继续讨论RDD持久化过程在源码级别上的实现细节。

表3-3  RDD的存储级别

存储级别

含义

MEMORY_ONLY

把RDD以非序列化状态存储在内存中,如果内存空间不够,则有些分区数据会在需要的时候进行计算得到

MEMORY_AND_DISK

把RDD以非序列化存储在内存中,如果内存空间不够,则存储在硬盘中

MEMORY_ONLY_SER

把RDD以Java对象序列化储存在内存中,序列化后占用空间更小,尤其当使用快速序列化库(如Kyro[1])时效果更好。缺点是读数据要反序列化,会消耗CPU计算资源

MEMORY_AND_DISK_SER

类似MEMORY_ONLY_SER,区别是当内存不够的时候会把RDD持久化到磁盘中,而不是在需要它们的时候实时计算

DISK_ONLY

只把RDD存储到磁盘中

MEMORY_ONLY_2,

类似MEMORY_ONLY,不同的是会复制一个副本到另一个集群节点

MEMORY_AND_DISK_2, etc.

类似MEMORY_AND_DISK,不同的是会复制一个副本到另一个集群节点

OFF_HEAP

把RDD以序列化形式存储在Tachyon中,与MEMORY_ONLY_SER不同的是,使用OFF-HEAP模式会减少垃圾回收的开销,此外还能让执行器共享内存,这种模式更适应于多并发和对内存要求高的环境

 
3.3.6 RDD检查点

因为DAG中血统(lineage)如果太长,当重计算的时候开销会很大,故使用检查点机制,将计算过程持久化到磁盘,这样如果出现计算故障的时候就可以在检查点开始重计算,而不需要从头开始。RDD的检查点(Checkpoint)机制类似持久化机制中的persist(StorageLevel.DISK_ONLY),数据会被存储在磁盘当中,两者最大的区别在于:持久化机制所存储的数据,在驱动程序运行结束之后会被自动清除;检查点机制则会将数据永久存储在磁盘当中,如果不手动删除,数据会一直存在。换句话说,检查点机制存储的数据能够被下一次运行的应用程序所使用。

检查点的使用与持久化类似,调用RDD的checkpoint方法即可。在6.4小节中继续介绍检查点机制的实现以及其与持久化过程的区别。

3.4 共享变量

因为在tasks之间读写共享变量会很低效,spark提供两种类型的共享变量类型,即broadcast variables和accumulators。

3.4.1 广播变量
        广播变量(Broadcast variables)允许用户将一个只读变量缓存到每一台机器之上,而不像传统变量一样,拷贝到每一个任务当中,同一台机器上的不同任务可以共享该变量值。如例3-1代码所示,对于变量v,只需要调用SparkContext.broadcast(v)即可得到变量v的广播变量broadcastVar,通过调用broadcastVar的value方法即可取得变量值。

【例3-1】广播变量的用法

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar:spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)  
 
3.4.2 累加器

累加器(Accumulators)是另外一种共享变量。累加器变量只能执行加法操作,但其支持并行操作,这意味着不同任务多次对累加器执行加法操作后,加法器最后的值等于所有累加的和。累加器的值只能被驱动程序访问,集群中的任务无法访问该值。

【例3-2】累加器的用法  

scala> val accum = sc.accumulator(0, "My Accumulator")
scala> accum.value()     //(通过这种方法进行读取原始变量值)
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
res2:Int = 10
3.5 Spark核心开发实践

本节主要介绍核心开发中RDD的两个主要操作算子Transformation和Action的使用方法,由于Spark是基于延迟计算,Transforamation算子并不立即执行,这时只是保存计算状态,当Action算子出现才真正执行计算。为此下面就这两个算子分别学习主要的API方法和应用实例,如果想了解更多关于RDD的API操作,建议读者参考拉筹伯大学教授的个人主页http://homepage.cs.latrobe.edu.au/zhe/。

3.5.1 单值型Tranformation算子

单值型的算子就是输入为单个值形式,这里主要介绍map、flatMap、mapPartitions、union、cartesian、groupBy、filter、distinct、subtract、foreach、cache、persist、sample以及takeSample方法,如表3-4列出各方法的简要概述。

表3-4  单值型Transformation算子

方法名

方法定义

map

def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]

flatMap

defmapPartitions[U](f: (Iterator[T])⇒ Iterator[U], preservesPartitioning: Boolean = false)

mapPartition

def mapPartitions[U](f: (Iterator[T])⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

mapPartitionsWith

Index

def mapPartitionsWithIndex[U](f: (Int, Iterator[T])⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

foreach

def foreach(f: (T) ⇒ Unit): Unit

foreachPartition

def foreachPartition(f: (Iterator[T])⇒ Unit): Unit

glom

def glom(): RDD[Array[T]]

union

def union(other: RDD[T]): RDD[T]

cartesian

def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

groupBy

def groupBy[K](f: (T) ⇒ K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]

filter

def filter(f: (T) ⇒ Boolean): RDD[T]

distinct

def distinct(): RDD[T]

subtract

def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

cache

def cache(): RDD.this.type

persist

def persist(): RDD.this.type

sample

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

takeSample

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

1.map

对原来每一个输入的RDD数据集进行函数转换,返回的结果为新的RDD,该方法对分区操作是一对一的。

方法源码实现:

def map[U: ClassTag](f: T =>U): RDD[U] = new MappedRDD(this, sc.clean(f))

【例3-3】map方法应用样例

val a = sc.parallelize(List("bit", "linc", "xwc", "fjg", "wc","spark"), 3)  //创建RDD
val b = a.map(word => word.length)       //计算每个单词的长度
val c = a.zip(b)        //拉链方法,把两列数据对应配对成键值对格式
c.collect       //把结果转换为数组
res0: Array[(String, Int)] = Array((bit,3), (linc,4), (xwc,3), (fjg,3), (wc,2),(spark,5))

这个例子中map方法从a中依次读入一个单词,然后计算单词长度,把最后计算的长度赋给b,然后因为a和b的长度相同,使用zip方法将a、b中对应元素组成K-V键值对形式,最后使用Action算子中的collect方法把键值对以数组形式输出。

【核心API开发】Spark入门教程[3]

图3-3  map方法应用样例

2.flatMap

flapMap方法与map方法类似,但是允许在一次map方法中输出多个对象,而不是map中的一个对象经过函数转换生成另一个对象。

方法源码实现:

def flatMap[U: ClassTag](f: T=> TraversableOnce[U]): RDD[U] =new FlatMappedRDD(this, sc.clean(f))

【例3-4】flatMap方法应用样例

val a = sc.parallelize(1 to 10, 5)      //生成从1到10的序列,5个分区
a.flatMap(num => 1 to num).collect       //方法的作用是把每一个num映射到从1到num的序列
res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)  

这个例子先得到从1到10的序列,然后调用flatMap方法对输入num依次生成从1到num的序列,最后使用collect方法转换成数组输出。

3.mapPartitions

mapPartitions是map的另一个实现。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是作用于每个分区,也就是把每个分区中的内容作为整体来处理的。

方法源码实现:

def mapPartitions[U:ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean =false): RDD[U] = {

    val func = (context: TaskContext, index:Int, iter: Iterator[T]) => f(iter)

    new MapPartitionsRDD(this, sc.clean(func),preservesPartitioning)

       }

【例3-5】mapPartitions方法应用样例

scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
    var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext) {
        val cur = iter.next
        res .::= (pre, cur)
        pre = cur
    }
    res.iterator
}
scala> a.mapPartitions(myfunc).collect
res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))  

如图3-4,这个例子是先得到从1到9的序列,因为有3个分区,所以每个分区数据分别是(1,2,3),(4,5,6)和(7,8,9),然后调用mapPartitions方法,因为scala是函数式编程,函数能作为参数值,所以mapPartition方法输入参数是myfunc函数。myfunc函数的作用是先构造一个空list集合,输入单元素集合iter,输出双元素Tuple集合,把分区中一个元素和它的下一个元素组成一个Tuple。因为每个分区中最后一个元素没有下一个元素,所以(3,4)和(6,7)不在结果中。

mapPartitions还有其他的类似实现,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数,此外还有mapPartitionsWithIndex,它能把分区中的index信息传递给用户指定的输入函数,这些其他类似的实现都是基于map方法,只是细节不同,这样做更方面使用者在不同场景下的应用。【核心API开发】Spark入门教程[3]

图3-4  mapPartitions方法应用样例

4.mapPartitionWithIndex

mapPartitionWithIndex方法与mapPartitions方法功能类似,不同的是mapPartition-WithIndex还会对原始分区的索引进行追踪,这样能知道分区所对应的元素,方法的参数为一个函数,函数的输入为整型索引和迭代器。

方法源码实现:

def mapPartitionsWithIndex[U:ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning:Boolean = false): RDD[U] = {

    val func = (context: TaskContext, index:Int, iter: Iterator[T]) => f(index, iter)

    new MapPartitionsRDD(this, sc.clean(func),preservesPartitioning)

}

【例3-6】mapPartitionWithIndex方法应用样例

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
  iter.toList.map(x => index + "," + x).iterator
}
x.mapPartitionsWithIndex(myfunc).collect()
res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)  

这个例子中先得到一个名为的x序列,然后调用mapPartitionsWithIndex方法,参数为myfunc函数,这个函数的实现是把输入经过map方法映射为分区索引加值的形式。结果中的0,1表示分区下标0和第一个输入值1,后面依次输出其他分区和对应的值,说明分区数是从下标0开始的。

5.foreach

foreach方法主要是对输入的数据对象执行循环操作,该方法常用来输出RDD中的内容。

方法源码实现:

def foreach(f: T => Unit) {

    val cleanF = sc.clean(f)

    sc.runJob(this, (iter: Iterator[T]) =>iter.foreach(cleanF))

}

【例3-7】foreach方法应用样例

val c = sc.parallelize(List("xwc", "fjg", "wc", "dcp", "zq", "snn", "mk", "zl", "hk", "lp"), 3)
c.foreach(x => println(x + " are from BIT"))
xwc are from BIT
fjg are from BIT
wc are from BIT
dcp are from BIT
zq are from BIT
ssn are from BIT
mk are from BIT
zl are from BIT
hk are from BIT
lp are from BIT  

这个方法比较直观,直接对c变量中的每一个元素对象使用println函数,打印对象内容。

6.foreachPartition

foreachPartition方法的作用是通过迭代器参数对RDD中每一个分区的数据对象应用函数。mapPartitions方法的作用于foreachPartition方法作用非常相似,区别就在于使用的参数是否有返回值。

方法源码实现:

def foreachPartition(f:Iterator[T] => Unit) {

    val cleanF = sc.clean(f)

    sc.runJob(this, (iter: Iterator[T]) =>cleanF(iter))

}

【例3-8】foreachPartition方法应用样例

val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
 b.foreachPartition(x => println((a,b) => x.reduce(a + b)))
 6
15
24  

这个例子是将序列b中的每一个元素进行reduce操作,对每个分区中输入的每一个元素累加,例如对于分区0,输入1和2相加等于3,然后把上个结果3与下一个输入3相加就等于6,其他分区的运算与该分区一样。

7.glom

作用类似collect,但它不知直接将所有RDD直接转化为数组形式,glom方法的作用是将RDD中分区数据进行组装到数组类型RDD中,每一个返回的数组包含一个分区的元素,按分区转化为数组,最后有几个分区就返回几个数组类型的RDD。

方法源码实现:

def glom(): RDD[Array[T]] = newGlommedRDD(this)

private[spark] classGlommedRDD[T: ClassTag](prev: RDD[T])extends RDD[Array[T]](prev) {

    overridedef getPartitions: Array[Partition] = firstParent[T].partitions

    overridedef compute(split: Partition, context: TaskContext) =

    Array(firstParent[T].iterator(split,context).toArray).iterator

}

【例3-9】glom方法应用样例

val a = sc.parallelize(1 to 99, 3)
a.glom.collect
res5: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99))  

这个例子很简洁,在执行glom方法后就调用collect方法获得Array数组并输出,可以看出a.glom方法输出的是三个数组组成的RDD,其中每个数组代表一个分区数据。

8.union

union方法(等价于“++”)是将两个RDD取并集,取并集过程中不会把相同元素去掉。union操作是输入分区与输出分区多对一模式,如图所示。

方法源码实现:

def union(other: RDD[T]):RDD[T] = new UnionRDD(sc, Array(this, other))

class UnionRDD[T: ClassTag](

    sc: SparkContext,

    var rdds: Seq[RDD[T]])

  extends RDD[T](sc, Nil) {

       overridedef getPartitions: Array[Partition] = {

          val array = newArray[Partition](rdds.map(_.partitions.size).sum)

          var pos = 0

          for ((rdd, rddIndex) <- rdds.zipWithIndex;split <- rdd.partitions) {

             array(pos) = new UnionPartition(pos,rdd, rddIndex, split.index)

             pos += 1

          }

          array

      }

     overridedef getDependencies: Seq[Dependency[_]] = {

         valdeps = new ArrayBuffer[Dependency[_]]

         varpos = 0

         for(rdd <- rdds) {

            deps += new RangeDependency(rdd, 0, pos,rdd.partitions.size)

            pos += rdd.partitions.size

         }

         deps

    }

    override def compute(s: Partition, context:TaskContext): Iterator[T] = {

        valpart = s.asInstanceOf[UnionPartition[T]]

       parent[T](part.parentRddIndex).iterator(part.parentPartition,context)

    }

    overridedef getPreferredLocations(s: Partition): Seq[String] =

   s.asInstanceOf[UnionPartition[T]].preferredLocations()

    overridedef clearDependencies() {

       super.clearDependencies()

       rdds= null

    }

}

【例3-10】union方法应用样例

val a = sc.parallelize(1 to 4, 2)
val b = sc.parallelize(2 to 4, 1)
(a ++ b).collect
res4: Array[Int] = Array(1, 2, 3, 4, 2, 3, 4)  

如图3-5可见,这个例子先创建2个RDD变量a和b,然后对a与b使用union方法,返回两个RDD并集的结果。

【核心API开发】Spark入门教程[3]

图3-5  union方法应用样例

9.cartesian

计算两个RDD中每个对象的笛卡尔积(例如第一个RDD中的每一个对象与第二个RDD中的对象join连接),但使用该方法时要注意可能出现内存不够的情况。

方法源码实现:

def cartesian[U:ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)

class CartesianRDD[T: ClassTag,U: ClassTag](

    sc: SparkContext,

    var rdd1 : RDD[T],

    var rdd2 : RDD[U])

  extends RDD[Pair[T, U]](sc, Nil)

  with Serializable {

      valnumPartitionsInRdd2 = rdd2.partitions.size

       overridedef getPartitions: Array[Partition] = {

       //create the cross product split

         val array = newArray[Partition](rdd1.partitions.size * rdd2.partitions.size)

         for(s1 <- rdd1.partitions; s2 <- rdd2.partitions) {

            val idx = s1.index * numPartitionsInRdd2+ s2.index

            array(idx) = new CartesianPartition(idx,rdd1, rdd2, s1.index, s2.index)

         }

         array

      }

     overridedef getPreferredLocations(split: Partition): Seq[String] = {

        valcurrSplit = split.asInstanceOf[CartesianPartition]

        (rdd1.preferredLocations(currSplit.s1)++ rdd2.preferredLocations(currSplit.s2)).distinct

     }

    overridedef compute(split: Partition, context: TaskContext) = {

       valcurrSplit = split.asInstanceOf[CartesianPartition]

       for (x <- rdd1.iterator(currSplit.s1,context);

       y <- rdd2.iterator(currSplit.s2,context)) yield (x, y)

    }

    overridedef getDependencies: Seq[Dependency[_]] = List(new NarrowDependency(rdd1) {

          def getParents(id: Int): Seq[Int] = List(id/ numPartitionsInRdd2)

    },

    new NarrowDependency(rdd2) {

      def getParents(id: Int): Seq[Int] =List(id % numPartitionsInRdd2)

    }

)

  override def clearDependencies() {

    super.clearDependencies()

    rdd1 = null

    rdd2 = null

  }

}

【例3-11】cartesian方法应用样例

val x =sc.parallelize(List(1,2,3),1)
val y =sc.parallelize(List(4,5),1)
x.cartesian(y).collect
res0: Array[(Int, Int)] =Array((1,4),(1,5),(2,4),(2,5),(3,4),(3,5))  

例子中x是第一个RDD,其中的每个元素都跟y中元素进行连接,如果第一个RDD有m个元素,第二个RDD中元素n个,则求笛卡尔积后总元素为m×n个,本例结果为6个,如图3-6所示。

【核心API开发】Spark入门教程[3]

图3-6  cartesian方法应用样例

10.groupBy

groupBy方法有三个重载方法,功能是将元素通过map函数生成Key-Value格式,然后使用reduceByKey方法对Key-Value对进行聚合,具体可参考源码实现。

方法源码实现:

def groupBy[K](f: T => K, p:Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)

      : RDD[(K, Iterable[T])] = {

   valcleanF = sc.clean(f)                      //对用户函数预处理

   this.map(t=> (cleanF(t), t)).groupByKey(p)  //对数据进行map操作,生成Key-Value对,再聚合

}

def groupBy[K](f: T =>K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =

    groupBy[K](f, defaultPartitioner(this))                //使用默认分区器

def groupBy[K](f: T => K,numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =

    groupBy(f, new HashPartitioner(numPartitions)) //使用hash分区器,分区数自定义

【例3-12】groupBy方法应用样例

(1)val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7,          9)))
(2)val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
{
  a % 2
}
a.groupBy(myfunc).collect
res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))
(3)val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
{
  a % 2
}
a.groupBy(myfunc(_), 1).collect
res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))

第一个例子中是单个参数,调用groupBy方法,结果集的key只有两种,即even和odd,然后对相同的key进行聚合得到最终结果。第二个例子和第三个例子本质一样,只是使用的重载方法不同。

11.filter

filter方法通过名称就能猜出来功能,其实就是对输入元素进行过滤,参数是一个返回值为boolean的函数,如果函数对输入元素运算结果为true,则通过该元素,否则将该元素过滤,不能进入结果集。

方法源码实现:

def filter(f: T => Boolean):RDD[T] = new FilteredRDD(this, sc.clean(f))

【例3-13】filter方法应用样例

(1)val a = sc.parallelize(1 to 10, 3)
val b = a.filter(x => x % 3 == 0)
b.collect
res3: Array[Int] = Array(3, 6, 9)
(2)val b = sc.parallelize(1 to 8)
b.filter(x => x < 4).collect
res15: Array[Int] = Array(1, 2, 3)
(3)val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog"))
a.filter(_ < 4).collect
<console>:15: error: value < is not a member of Any  

第一个和第二个例子比较好理解,因为a中元素都是整型,可以顺利进行比较,但第三个例子会报错,因为a中有部分对象不能与整数比较,如果使用scala中的偏函数就可以解决混合数据类型的问题。

12.distinct

将RDD中重复的元素去掉,只留下唯一的RDD元素。

方法源码实现:

def distinct(): RDD[T] =distinct(partitions.size)

def distinct(numPartitions:Int)(implicit ord: Ordering[T] = null): RDD[T] =

map(x => (x,null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

【例3-14】distinct方法应用样例

(1)val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)
(2)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
a.distinct(2).partitions.length
res16: Int = 2
(3)a.distinct(3).partitions.length
res17: Int = 3  

这个例子就是把RDD中的元素map为Key-Value对形式,然后使用reduceByKey将重复Key合并,也就是把重复元素删除,只留下唯一的元素。此外distinct有一个重载方法需要一个参数,这个参数就是分区数numPartitions,从例子中看出使用带参的distinct方法不仅能删除重复元素,而且还能对结果重新分区。

13.subtract

subtract的含义就是求集合A-B的差,即把集合A中包含集合B的元素都删除,结果是剩下的元素。

方法源码实现:

def subtract(other: RDD[T], p:Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {

    if (partitioner == Some(p)) {

        val p2 = newPartitioner() {

           override def numPartitions = p.numPartitions

           overridedef getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)

        }

       // Unfortunately, since we're making a new p2,we'll get ShuffleDependencies

       //anyway, and when calling .keys, will not have a partitioner set, even though

       // the SubtractedRDD will, thanks to p2'sde-tupled partitioning, already be

       //partitioned by the right/real keys (e.g. p).

       this.map(x=> (x, null)).subtractByKey(other.map((_, null)), p2).keys

      } else {

         this.map(x=> (x, null)).subtractByKey(other.map((_, null)), p).keys

     }

}

【例3-15】subtract方法应用样例

val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.collect
res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)  

这个例子就是把a中包含b中的元素都删除掉,底层实现使用subtractByKey,也就是根据键值对中的Key来删除a包含的b元素。

14.persist,cache

cache方法顾名思义,是缓存数据,其作用是把RDD缓存到内存中,以方便下一次计算被再次调用。

方法源码实现:

 def cache(): this.type = persist()

【例3-16】cache方法应用样例

val c = sc.parallelize(List("a", "b", "c", "d", "e", "f"),1)
c.cache
res11: c.type = ParallelCollectionRDD[10] at parallelize at <console>:21  

这个例子就是直接把RDD缓存在内存中。

15.persist

persist方法的作用是把RDD根据不同的级别进行持久化,通过使用带参数方法能指定持久化级别,如果不带参数则为默认持久化级别,即只保存到内存,与cache等价。

【例3-17】persist方法应用样例

val a = sc.parallelize(1 to 9, 3)
a.persist(StorageLevel.MEMORY_ONLY)  

这个例子使用persist方法,指定持久化级别为MEMORY_ONLY,该级别等价于cache方法。

16.sample

sample的作用是随机的对RDD中的元素采样,获得一个新的子集RDD,根据参数能指定是否又放回采样、子集占总数的百分比和随机种子。

方法源码实现:

def sample(withReplacement:Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] = {

    require(fraction >= 0.0, "Negativefraction value: " + fraction)

    if (withReplacement) {

      new PartitionwiseSampledRDD[T, T](this,new PoissonSampler[T](fraction), true, seed)

    } else {

      new PartitionwiseSampledRDD[T, T](this,new BernoulliSampler[T](fraction), true, seed)

    }

 }

【例3-18】sample方法应用样例

(1)val a = sc.parallelize(1 to 1000, 2)
a.sample(false, 0.1, 0).collect
res4: Array[Int] = Array(3, 21, 22, 27, 48, 50, 57, 80, 88, 90, 97, 113, 126, 130, 135, 145, 162, 169,        182, 230, 237, 242, 267, 271, 287, 294, 302, 305, 324, 326, 330, 351, 361, 378, 383, 384, 409, 412, 418,        432, 433, 485, 493, 497, 502, 512, 514, 521, 522, 531, 536, 573, 585, 595, 615, 617, 629, 640, 642, 647,    651, 664, 671, 673, 684, 692, 707, 716, 718, 721, 723, 736, 738, 756, 759, 788, 799, 827, 828, 833, 872,  898, 899, 904, 915, 916, 919, 927, 929, 951, 969, 980)
(2)val a = sc.parallelize(1 to 100, 2)
a.sample(true, 0.3, 0).collect
res5: Array[Int] = Array(1, 1, 9, 18, 18, 24, 26, 29, 32, 34, 37, 38, 42, 43, 45, 51, 54, 56, 60, 65, 67, 70,     73, 74, 74, 75, 85, 86, 95, 99)  

上述例子中第一个参数withReplacement为true时使用放回抽样(泊松抽样[1]),为false时使用不放回抽样(伯努利抽样),第二个参数fraction是百分比,第三个参数seed是种子,也就是随机取值的起源数字。从例子中还看出当选择放回抽样时,取出的元素中会出现重复值。

3.5.2 键值对型Transformation算子

RDD的操作算子除了单值型还有键值对(Key-Value)型。这里开始介绍键值对型的算子,主要包括groupByKey、combineByKey、reduceByKey、sortByKey、cogroup和join,如表3-5所示。

表3-5 键值对型Transformation算子

方法名

方法定义

groupByKey

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

combineByKey

def combineByKey[C](createCombiner: V => C, mergeValue: (C,V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)]

reduceByKey

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

sortByKey

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P]

cogroup

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD

join

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

 

1.groupByKey

类似groupBy方法,作用是把每一个相同Key值的的Value聚集起来形成一个序列,可以使用默认分区器和自定义分区器,但是这个方法开销比较大,如果想对同一Key进行Value的聚合或求平均,则推荐使用aggregateByKey或者reduceByKey。

方法源码实现:

def groupByKey(numPartitions:Int): RDD[(K, Iterable[V])] = {

    groupByKey(new HashPartitioner(numPartitions))

}

def groupByKey(partitioner:Partitioner): RDD[(K, Iterable[V])] = {

//groupByKey不应该使用map端的combine操作,因为map端并不会减少shuffle的数据,还要求

//所有map端的数据都插入hash表中,导致很多对象进入内存中的老年代。

    val createCombiner = (v: V) =>CompactBuffer(v)

    val mergeValue = (buf: CompactBuffer[V], v:V) => buf += v

    val mergeCombiners = (c1: CompactBuffer[V],c2: CompactBuffer[V]) => c1 ++= c2

    val bufs = combineByKey[CompactBuffer[V]](

      createCombiner, mergeValue,mergeCombiners, partitioner, mapSideCombine=false)

    bufs.asInstanceOf[RDD[(K, Iterable[V])]]

}

【例3-19】groupByKey方法应用样例

val a = sc.parallelize(List("mk", "zq", "xwc", "fjg", "dcp", "snn"), 2)
val b = a.keyBy(x => x.length)       // keyBy方法调用map(x => (f(x),x))生成键值对
b.groupByKey.collect
res6: Array[(Int, Iterable[String])] = Array((2,CompactBuffer(mk, zq)), (3,CompactBuffer(xwc, fjg, dcp, snn)))  

这个例子先创建包含List集合对象的RDD,然后使用keyBy方法生成Key-Value键值对,然后调用groupByKey方法将相同Key的Value聚合,最后调用collect方法以数组形式输出。

【核心API开发】Spark入门教程[3]

图3-7  groupByKey方法应用样例

2.combineByKey

combineByKey方法能高效的将键值对形式的RDD按相同的Key把Value合并成序列形式,用户能自定义RDD的分区器和是否在map端进行聚合操作。

方法源码实现:

defcombineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = {

  combineByKey(createCombiner, mergeValue,mergeCombiners, defaultPartitioner(self))

}

defcombineByKey[C](createCombiner: V => C,

mergeValue: (C, V) => C,        //输入2个不同类型参数,返回其中一个类型参数

   mergeCombiners: (C, C) => C,  //输入2个同类型参数,返回一个参数

   numPartitions: Int): RDD[(K, C)] = {

     combineByKey(createCombiner,mergeValue, mergeCombiners, new HashPartitioner(numPartitions))

}

def combineByKey[C](createCombiner:V => C,

   mergeValue: (C, V) => C,

   mergeCombiners: (C, C) => C,

   partitioner: Partitioner,

   mapSideCombine: Boolean = true,

   serializer: Serializer = null): RDD[(K, C)]= {

     require(mergeCombiners!= null, "mergeCombiners must be defined") // required as of Spark0.9.0

     if(keyClass.isArray) {

       if(mapSideCombine) {

          throw new SparkException("Cannot usemap-side combining with array keys.")

      }

      if(partitioner.isInstanceOf[HashPartitioner]) {

        throw new SparkException("Defaultpartitioner cannot partition array keys.")

      }

    }

    val aggregator = new Aggregator[K, V, C](

      self.context.clean(createCombiner),

      self.context.clean(mergeValue),

      self.context.clean(mergeCombiners))

    if (self.partitioner == Some(partitioner)){

      self.mapPartitions(iter => {

        val context = TaskContext.get()

        new InterruptibleIterator(context,aggregator.combineValuesByKey(iter, context))

      }, preservesPartitioning = true)

    } else {

      new ShuffledRDD[K, V, C](self,partitioner)

        .setSerializer(serializer)

        .setAggregator(aggregator)

        .setMapSideCombine(mapSideCombine)

    }

  }

【例3-20】combineByKey方法应用样例

val a = sc.parallelize(List("xwc", "fjg", "wc", "dcp", "zq", "snn", "mk", "zl", "hk", "lp"), 2)
val b = sc.parallelize(List(1,2,2,3,2,1,2,2,2,3), 2)
val c = b.zip(a)    //把a和b中对应元素组合成键值对,如Array((1,xwc), (3,fjg), (2,wc), (3,dcp)...
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String] = x ::: y)
d.collect
res13: Array[(Int, List[String])] = Array((2,List(zq, wc, fjg, hk, zl, mk)), (1,List(xwc, snn)), (3,List(dcp, lp)))  

在使用zip方法得到键值对序列c后调用combineByKey,把相同Key的value进行合并到List中。这个例子中使用三个参数的重载方法,该方法第一个参数是createCombiner,作用是把元素V转换到另一类型元素C,该例子中使用的是List(_),表示将输入的元素放在list集合中;mergeValue的含义是把元素V合并到元素C中,在该例子中使用的是函数是x:List[String],y:String) => y :: x,表示将y字符串合并到x链表集合中;mergeCombiners含义是将两个C元素合并,在该例子中使用的是x:List[String], y:List[String]= x ::: y,表示把x链表集合中的内容合并到y链表集合中。

3.reduceByKey

使用一个reduce函数来实现对相同Key的Value的聚集操作,在发送结果给reduce前会在map端的执行本地merge操作。该方法的底层实现就是调用combineByKey方法的一个重载方法。

方法源码实现:

def reduceByKey(partitioner:Partitioner, func: (V, V) => V): RDD[(K, V)] = {

    combineByKey[V]((v: V) => v, func, func,partitioner)

}

def reduceByKey(func: (V, V)=> V, numPartitions: Int): RDD[(K, V)] = {

    reduceByKey(newHashPartitioner(numPartitions), func)

}

def reduceByKey(func: (V, V)=> V): RDD[(K, V)] = {

    reduceByKey(defaultPartitioner(self), func)

}

【例3-21】reduceByKey方法应用样例

(1)val a = sc.parallelize(List("dcp", "fjg", "snn", "wc", "zq"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey((a,b) => a + b).collect
res22: Array[(Int, String)] = Array((2,wczq), (3,dcpfjgsnn))
(2)val a = sc.parallelize(List(3,12,124,32,5 ), 2)
val b = a.map(x => (x.toString.length, x))
b.reduceByKey(_ + _).collect
res24: Array[(Int, Int)] = Array((2,44), (1,8), (3,124))  

这个例子先用map方法映射出键值对,然后调用reduceByKey方法对相同Key的Value值进行累加。例子中第一个是使用字符串,故使用聚合相加后是字符串的合并;第二个例子使用的是数字,结果是对应Key的Value数字相加。

4.sortByKey

这个函数会根据Key值对键值对进行排序,如果Key是字母,则按字典顺序排序,如果Key是数字,则从小到大排序(或从大到小),该方法的第一个参数控制是否为升序排序,当为true时是升序,反之为降序。

方法源码实现:

def sortByKey(ascending: Boolean = true,numPartitions: Int = self.partitions.size) : RDD[(K, V)] =

 {

    val part = new RangePartitioner(numPartitions,self, ascending)

    new ShuffledRDD[K, V, V](self, part)

      .setKeyOrdering(if (ascending) orderingelse ordering.reverse)

 }

【例3-22】sortByKey方法应用样例

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)       //a.count得到单词的字母个数
val c = a.zip(b)
c.sortByKey(true).collect
res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
c.sortByKey(false).collect
res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))  

这个例子先通过zip方法得到包含键值对的变量c,然后演示了sortByKey方法中参数为true和false时的计算结果。本例中的key是字符串,故可以看出当Key为true时,结果是按Key的字典顺序升序输出,反之则为降序输出结果;当key为数字的时候,则按大小排列。

5.cogroup

cogroup是一个比较高效的函数,能根据Key值聚集最多3个键值对的RDD,把相同Key值对应的Value聚集起来。

方法源码实现:

//参数为一个RDD情况

def cogroup[W](other: RDD[(K,W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] = {

    if(partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {

      throw new SparkException("Defaultpartitioner cannot partition array keys.")

    }

    val cg = new CoGroupedRDD[K](Seq(self,other), partitioner)

    cg.mapValues { case Array(vs, w1s) =>

      (vs.asInstanceOf[Iterable[V]],w1s.asInstanceOf[Iterable[W]])

    }

}

//参数为两个RDD情况

def cogroup[W1, W2](other1:RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K,(Iterable[V], Iterable[W1], Iterable[W2]))] = {

    if(partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {

      throw new SparkException("Defaultpartitioner cannot partition array keys.")

    }

    val cg = new CoGroupedRDD[K](Seq(self,other1, other2), partitioner)

    cg.mapValues { case Array(vs, w1s, w2s)=>(vs.asInstanceOf[Iterable[V]],

        w1s.asInstanceOf[Iterable[W1]],

        w2s.asInstanceOf[Iterable[W2]])

    }

  }

//参数为3个RDD情况

def cogroup[W1, W2, W3](other1:RDD[(K, W1)],

      other2: RDD[(K, W2)],

      other3: RDD[(K, W3)],

      partitioner: Partitioner)

      : RDD[(K, (Iterable[V], Iterable[W1],Iterable[W2], Iterable[W3]))] = {

    if (partitioner.isInstanceOf[HashPartitioner]&& keyClass.isArray) {

      throw new SparkException("Defaultpartitioner cannot partition array keys.")

    }

    val cg = new CoGroupedRDD[K](Seq(self,other1, other2, other3), partitioner)

    cg.mapValues { case Array(vs, w1s, w2s, w3s)=>

       (vs.asInstanceOf[Iterable[V]],

         w1s.asInstanceOf[Iterable[W1]],

         w2s.asInstanceOf[Iterable[W2]],

         w3s.asInstanceOf[Iterable[W3]])

    }

  }

【例3-23】cogroup方法应用样例

(1)val a =sc.parallelize(List(1,2,2 ,3, 1, 3), 1)
val b =a.map(x => (x, "b"))
val c =a.map(y => (y, "c"))
b.cogroup(c).collect
res25:Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b,b),
CompactBuffer(c,c))), (3,(CompactBuffer(b, b),CompactBuffer(c, c))), (2,(CompactBuffer(b,                             b),CompactBuffer(c, c))))
(2)val d = a.map(m => (m,"x"))
b.cogroup(c,d).collect
res26:Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] =Array((1,(CompactBuffer(b,b)
,CompactBuffer(c,c),CompactBuffer(x, x))), (3,(CompactBuffer(b, b),CompactBuffer(c, c),
CompactBuffer(x,x))), (2,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(x, x))))  

例子中有两个个小例子,依次是单个参数和两个参数的情况,使用cogroup方法对单个RDD和2个RDD进行聚集操作。

6.join

对键值对的RDD进行cogroup操作,然后对每个新的RDD下Key的值进行笛卡尔积操作,再对返回结果使用flatMapValues方法,最后返回结果。

方法源码实现:

def join[W](other: RDD[(K, W)],partitioner: Partitioner): RDD[(K, (V, W))] = {

    this.cogroup(other,partitioner).flatMapValues( pair =>

      for (v <- pair._1.iterator; w <-pair._2.iterator) yield (v, w)

    )

}

【例3-24】join方法应用样例

val a = sc.parallelize(List("fjg", "wc", "xwc","dcp"), 2)
val b = a.keyBy(_.length)       //得到诸如(3,"fjg"),(2,"wc")的键值对序列
val c = sc.parallelize(List("fjg", "wc", "snn", "zq", "xwc","dcp"), 2)
val d = c.keyBy(_.length)
b.join(d).collect
res29: Array[(Int, (String, String))] = Array((2,(wc,wc)), (2,(wc,zq)), (3,(fjg,fjg)), (3,(fjg,snn)), (3,(fjg,xwc)), (3,(fjg,dcp)), (3,(xwc,fjg)), (3,(xwc,snn)), (3,(xwc,xwc)), (3,(xwc,dcp)), (3,(dcp,fjg)), (3,(dcp,snn)), (3,(dcp,xwc)), (3,(dcp,dcp)))  

这个例子先构造两个包含键值对元素的变量b和d,然后调用join方法,得到join后的结果,根据源码实现,join方法本质是cogroup方法和flatMapValues方法的组合,其中cogroup方法得到聚合值,flatMapValues方法实现的是笛卡尔积,笛卡尔积的过程是在各个分区内进行,如例子中的Key等于2分区,wc与(wc,zq)求笛卡尔积,得到(2,(wc,wc))和(2,(wc,zq))的结果。

【核心API开发】Spark入门教程[3]

图3-8  join方法应用样例

3.5.3 Action算子

当Spark的计算模型中出现Action算子时才会执行提交作业的runJob动作,这时会触发后续的DAGScheduler和TaskScheduler工作。这里主要讲解常用的Action算子,有collect、reduce、take、top、count、takeSample、saveAsTextFile、countByKey、aggregate,具体方法和定义如表3-6所示。

表3-6  Action算子

方法名

方法定义

collect

def collect(): Array[T]

reduce

def reduce(f: (T, T) => T): T

take

def take(num: Int): Array[T]

top

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

count

def count(): Long

takeSample

def takeSample(withReplacement: Boolean,num: Int,seed: Long = Utils.random.nextLong): Array[T]

saveAsTextFile

def saveAsTextFile(path: String)

countByKey

def countByKey(): Map[K, Long]

aggregate

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

1.collect

collect方法的作用是把RDD中的元素以数组的方式返回。

方法源码实现:

def collect(): Array[T] = {

    val results = sc.runJob(this, (iter:Iterator[T]) => iter.toArray)

    Array.concat(results: _*)

}

【例3-25】collect方法应用样例

val c = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 2)
c.collect
res29: Array[String] = Array(a, b, c, d, e, f)  

这个例子直接把RDD中的元素转换成数组返回。

2.reduce

reduce方法使用一个带两个参数的函数把元素进行聚集,返回一个元素结果,注意该函数中的二元操作应该满足交换律和结合律,这样才能在并行系统中正确计算。

方法源码实现:

def reduce(f: (T, T) => T): T = {            //输入是两个参数的函数,返回一个值

    val cleanF = sc.clean(f)

    val reducePartition: Iterator[T] =>Option[T] = iter => {

      if (iter.hasNext) {

        Some(iter.reduceLeft(cleanF))

      } else {

        None

      }

    }

    var jobResult: Option[T] = None

    val mergeResult = (index: Int, taskResult:Option[T]) => {

      if (taskResult.isDefined) {

        jobResult = jobResult match {

          case Some(value) => Some(f(value,taskResult.get))

          case None => taskResult

        }

      }

    }

    sc.runJob(this, reducePartition, mergeResult)

//获得Option的最后结果,或者当RDD为空时抛出异常

    jobResult.getOrElse(throw newUnsupportedOperationException("empty collection"))

  }

【例3-26】reduce方法应用样例

val a = sc.parallelize(1 to 10)
a.reduce((a,b)=> a + b)
res41: Int = 55  

这个例子使用简单的函数将输入的元素相加,过程是先输入前两个元素相加,然后将得到的结果与下一个输入元素相加,依次规则计算出所有元素的和。

3.take

take方法会从RDD中取出前n[1]个元素。方法是先扫描一个分区并后从分区中得到结果,然后评估得到的结果是否达到取出元素个数,如果没达到则继续从其他分区中扫描获取。

方法源码实现:

def take(num: Int): Array[T] = {

    if (num == 0) {

      return new Array[T](0)

    }

    val buf = new ArrayBuffer[T]

    val totalParts = this.partitions.length

    var partsScanned = 0

    while (buf.size < num &&partsScanned < totalParts) {

// numPartsToTry表示在这个迭代中尝试的分区数,这个数可以比总分区数大,因为在runJob中的总分区会限定它的值。

      var numPartsToTry = 1

      if (partsScanned > 0) {

//如果没有在之前的迭代中找到记录,则会重复寻找(次数翻四倍),此外还会调整分

 区数,最多调整涨幅不超过50%

        if (buf.size == 0) {

          numPartsToTry = partsScanned * 4

        } else {

          // the left side of max is >=1whenever partsScanned >= 2

          numPartsToTry = Math.max((1.5 * num *partsScanned / buf.size).toInt - partsScanned, 1)

          numPartsToTry =Math.min(numPartsToTry, partsScanned * 4)

        }

      }

      val left = num - buf.size

      val p = partsScanned untilmath.min(partsScanned + numPartsToTry, totalParts)

      val res = sc.runJob(this, (it:Iterator[T]) => it.take(left).toArray, p, allowLocal = true)

      res.foreach(buf ++= _.take(num -buf.size))

      partsScanned += numPartsToTry

    }

    buf.toArray

  }

【例3-27】take方法应用样例

(1) val b = sc.parallelize(List("a", "b", "c", "d", "e"), 2)
   b.take(2)
   res18: Array[String] = Array(a, b)
(2) val b = sc.parallelize(1 to 100, 5)
   b.take(30)
   res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21,22, 23, 24, 25, 26, 27, 28, 29, 30)  

这里例子分别演示了字母和数字情况,其实工作原理都相同,即从分区中按先后顺序拿元素出来。

4.top

top方法会利用隐式排序转换方法(见实现源码中implicit修饰的方法)来获取最大的前n个元素。

方法源码实现:

def top(num: Int)(implicit ord:Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)

def takeOrdered(num:Int)(implicit ord: Ordering[T]): Array[T] = {

    if (num == 0) {

      Array.empty

    } else {

      val mapRDDs = mapPartitions { items =>

        // Priority keeps the largest elements,so let's reverse the ordering.

        val queue = newBoundedPriorityQueue[T](num)(ord.reverse)

        queue ++=util.collection.Utils.takeOrdered(items, num)(ord)

        Iterator.single(queue)

      }

      if (mapRDDs.partitions.size == 0) {

        Array.empty

      } else {

        mapRDDs.reduce { (queue1, queue2) =>

          queue1 ++= queue2

          queue1

        }.toArray.sorted(ord)      

      }

    }

  }

【例3-28】top方法应用样例

val c = sc.parallelize(Array(1, 3, 2,4, 9, 2,11,5), 3)
c.top(3)
res10:Array[Int] = Array(11, 9, 5)  

例子显示了top的使用方法,很简洁,直接输入元素个数作为参数就能得到前n个元素的值。

5.count

count方法计算并返回RDD中元素的个数。

方法源码实现:

def count(): Long =sc.runJob(this, Utils.getIteratorSize _).sum

def runJob[T, U: ClassTag](rdd:RDD[T], func: Iterator[T] => U): Array[U] = {

    runJob(rdd, func, 0 until rdd.partitions.size,false)

}

【例3-29】count方法应用样例

val c = sc.parallelize(Array(1,3, 2,4, 9, 2,11,5), 2)
c.count
res3: Long = 8  

6.takeSample

takeSample方法返回一个固定大小的数组形式的采样子集,此外还把返回的元素顺序随机打乱,方法的三个参数含义依次是否放回数据、返回取样的大小和随机数生成器的种子。

方法源码实现:

def takeSample(withReplacement:Boolean,

     num: Int,

      seed: Long = Utils.random.nextLong):Array[T] = {

    val numStDev =  10.0

    if (num < 0) {

      throw newIllegalArgumentException("Negative number of elements requested")

    } else if (num == 0) {

      return new Array[T](0)

    }

    val initialCount = this.count()

    if (initialCount == 0) {

      return new Array[T](0)

    }

    val maxSampleSize = Int.MaxValue -(numStDev * math.sqrt(Int.MaxValue)).toInt

    if (num > maxSampleSize) {

      throw new IllegalArgumentException("Cannotsupport a sample size > Int.MaxValue - " +

        s"$numStDev *math.sqrt(Int.MaxValue)")

    }

    val rand = new Random(seed)

    if (!withReplacement && num >=initialCount) {

      returnUtils.randomizeInPlace(this.collect(), rand)

    }

    valfraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,

      withReplacement)

    var samples = this.sample(withReplacement,fraction, rand.nextInt()).collect()

//如果采样容量不够大,则继续采样

    var numIters = 0

    while (samples.length < num) {

      logWarning(s"Needed to re-sample dueto insufficient sample size. Repeat #$numIters")

      samples = this.sample(withReplacement,fraction, rand.nextInt()).collect()

      numIters += 1

    }

    Utils.randomizeInPlace(samples,rand).take(num)

  }

【例3-30】takeSample方法应用样例

val x = sc.parallelize(1 to 100, 2)
x.takeSample(true, 30, 1)
res13: Array[Int] = Array(72, 37, 96, 47, 40, 96, 57, 100, 8, 44, 82, 11, 32, 47, 99, 94, 37, 97,52, 41, 100, 78, 93, 11, 6, 100, 75, 14, 47, 16)  

这个例子直接使用takeSample方法,得到30个固定数字的样本,采取有放回抽样的方式。

7.saveAsTextFile

把RDD存储为文本文件,一次存一行。

方法源码实现:

def saveAsTextFile(path:String) {          

    val nullWritableClassTag =implicitly[ClassTag[NullWritable]]

    val textClassTag =implicitly[ClassTag[Text]]

    val r = this.map(x =>(NullWritable.get(), new Text(x.toString)))

   rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)

     .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

}

def saveAsTextFile(path:String, codec: Class[_ <: CompressionCodec]) {     //参数可选择压缩方式

    // 参考https://issues.apache.org/jira/browse/SPARK-2075

    val nullWritableClassTag =implicitly[ClassTag[NullWritable]]

    val textClassTag =implicitly[ClassTag[Text]]

    val r = this.map(x =>(NullWritable.get(), new Text(x.toString)))

   rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)

     .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)

}

【例3-31】saveAsTextFile方法应用样例

val a = sc.parallelize(1 to 100, 3)
a.saveAsTextFile("BIT_Spark")
//控制台打印出的部分日志
15/08/04 10:27:58 INFO FileOutputCommitter: Saved output of task 'attempt_201508041027_0001       _m_000002_5' to file:/home/hadoop/spark/bin/BIT_Spark
//在当前路径下可以看到输出3个文件part-***,原因是RDD有3个分区,每个分区默认输出一个文件,SUCCESS文件执行表示成功。
hadoop@master:~/spark/bin/BIT_Spark$ ls
part-00000  part-00001  part-00002  _SUCCESS
hadoop@master:~/spark/bin/BIT_Spark$ vim part-00000     //查看第一个分区文件的内容
1
2
3
4
5  

8.countByKey

类似count方法,不同的是countByKey方法会根据相同的Key计算其对应的Value个数,返回的是map类型的结果。

方法源码实现:

def countByKey(): Map[K, Long]= self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap

【例3-32】countByKey方法应用样例

val a = sc.parallelize(List((1, "bit"), (2, "xwc"), (2, "fjg"), (3, "wc"),(3, "wc"),(3, "wc")), 2)
a.countByKey
res3: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 2,3 -> 3)  

这个例子先构造键值对变量a,然后使用countByKey方法对相同Key的Value进行统计,过程是先调用mapValue方法把Value映射为1,再reduceByKey到Key和其对应Value的个数。

9.aggregate

aggregate方法先将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

aggregate有两个函数seqOp和combOp,这两个函数都是输入两个参数,输出一个参数,其中seqOp函数可以看成是reduce操作,combOp函数可以看成是第二个reduce操作(一般用于combine各分区结果到一个总体结果),由定义,combOp操作的输入和输出类型必须一致。

方法源码实现:

def aggregate[U:ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {

//克隆zero值,因为这个值也会被序列化

    var jobResult = Utils.clone(zeroValue,sc.env.closureSerializer.newInstance())        

    val cleanSeqOp = sc.clean(seqOp)

    val cleanCombOp = sc.clean(combOp)

    val aggregatePartition = (it: Iterator[T])=> it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)

    val mergeResult = (index: Int, taskResult:U) => jobResult = combOp(jobResult, taskResult)

    sc.runJob(this, aggregatePartition,mergeResult)

    jobResult

}

【例3-33】aggregate方法应用样例

/ 分区0的reduce操作是max(0, 2,3) = 3
// 分区1的reduce操作是max(0, 4,5) = 5
// 分区2的reduce操作是max(0, 6,7) = 7
// 最后的combine操作是0 + 3 + 5 + 7 = 15
//注意最后的reduce操作包含初始值
(1)val z = sc.parallelize(List(2,3,4,5,6,7), 3)
 z.aggregate(0)((a,b) => math.max(a, b), (c,d) => c + d )
 res6: Int = 15
// 分区0的reduce操作是max(3, 2,3) = 3
// 分区1的reduce操作是max(3, 4,5) = 5
// 分区2的reduce操作是max(3, 6,7) = 7
// 最后的combine操作是3 + 3 + 5 + 7 = 18  
(2)val z = sc.parallelize(List(2,3,4,5,6,7), 3)
 z.aggregate(3)((a,b) => math.max(a, b), (c,d) => c + d )
 res7: Int = 18  
(3)val z = sc.parallelize(List("a","b","c","d","e","f"),2)
 z.aggregate("")(_ + _, _+_)
 res8: String = defabc  
(4)val z = sc.parallelize(List("a","b","c","d","e","f"),2)
 z.aggregate("x")(_ + _, _+_)
 res9: String = xxdefxabc  

在spark中一个分区对应一个task,从源码来看,zeroValue参与每个分区的seqOp(reduce)方法和最后的combOp(第二个reduce)方法,先对每个分区求reduce,在该例子中是对3个分区分别求Max操作,得到分区最大值,得到的结果参与combOp方法,即把各分区的结果和zeroValue相加最后得到结果值,从前两个例子可以看出这个操作特点,体现先分后总的思想。

对于后面两个例子使用的是字符串,aggregate方法的思路一样,先对各分区求seqOp方法然后再使用combOp方法把各分区的结果聚合相加,得到最终结果。

10.fold

fold方法与aggregate方法原理类似,区别就是少了一个seqOp方法。fold方法是把每个分区的元素进行聚合,然后调用reduce(op)方法处理。

方法源码实现:

def fold(zeroValue: T)(op: (T,T) => T): T = {

    //克隆zero值,因为这个值也会被序列化

    var jobResult = Utils.clone(zeroValue,sc.env.closureSerializer.newInstance())

    val cleanOp = sc.clean(op)

    val foldPartition = (iter: Iterator[T])=> iter.fold(zeroValue)(cleanOp)

    val mergeResult = (index: Int, taskResult:T) => jobResult = op(jobResult, taskResult)

    sc.runJob(this, foldPartition, mergeResult)

    jobResult

}

【例3-34】fold方法应用样例

// 分区0的reduce操作是0 + 1 + 2 + 3 = 6
// 分区1的reduce操作是0 + 4 + 5 + 6 = 15
// 分区2的reduce操作是0 + 7 + 8 + 9 = 24
// 最后的combine操作是0 + 6 + 15 + 24 = 45
(1)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
     a.fold(0)(_ + _)
    res11:: Int = 45
// 分区0的reduce操作是1 + 1 + 2 + 3 = 7
// 分区1的reduce操作是1 + 4 + 5 + 6 = 16
// 分区2的reduce操作是1 + 7 + 8 + 9 = 25
// 最后的combine操作是1 + 7 + 16 + 25 = 53
(2)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
     a.fold(1)(_ + _)
     res12: Int = 53  

这个例子中的使用方式与aggregate方法非常相似,注意zeroValue参与所有分区计算。fold计算是保证每个分区能独立计算,它与aggregate最大的区别是aggregate对不同分区提交的最终结果定义了一个专门的comOp函数来处理,而fold方法是采用一个方法来处理aggregate的两个方法过程。

3.6 本章小结

本章主要为读者讲述了Spark核心开发部分,其中讲述了SparkContext的作用与创建过程,还对RDD的概念模型进行介绍,说明了RDD的Transformation和Action操作的内涵意义。在基本介绍Spark编程模型后在实践环节列出了主要的Transformation和Action方法的使用范例,同时结合了方法源码说明范例计算过程。本章为Spark应用基础,第六章将继续集合源码深入介绍RDD的运行机制和Spark调度机制。下一章将逐一介绍Spark的四大编程模型,让读者进一步学习并掌握Spark在不同业务场景下的应用。

上一篇:zencart 新页面调用好功能代码集:


下一篇:使用Properties去读取配置文件,并获得具体内容值