Spark初窥

标签(空格分隔): Spark


[toc]

intro

dataset和operation

Spark对数据集合的基本抽象叫做Dataset。Dataset可以从文件直接创建,也可以从其他dataset经过transform变换而来。具体变换操作比如:

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

这个transform会将数据映射为数字并计算最大值。这里有map操作,有reduce操作,每个操作后都会转换为一个新的Dataset。而这就是Spark支持的MapReduce模型的data flow。

cache

Spark也支持把数据集拉倒cluster-wide下的内存cache中进行缓存。这对于数据重复读取非常有帮助,当迭代过程中有热点数据时可以进行数据集缓存。

运行程序的简单例子

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

RDD

Spark程序包含了一个驱动程序来运行用户的main函数,并在集群中执行各种并行操作。Spark的主要抽象概念叫做RDD(resilient distributed dataset),就是弹性分布式数据集——一组分布在集群中不同节点上的可以被并行操作的元素的集合。RDD可以通过一个HDFS的文件系统创建文件得到,也可以通过在驱动程序里利用已有的Scala集合并转换得到。一般用户会请求Spark将RDD在存储在内存中,从而可以在并行操作中重复利用。当然,RDD提供当执行节点出错后的自动回复能力。

再一个对Spark的抽象就是并行操作*享变量。默认情况下,当Spark在不同节点上以一组task并行运行一个函数时,它会将这个函数用到的变量传递给每个task。有时候一个变量需要被跨任务共享、在任务和驱动程序间共享。Spark支持两种共享变量类型:广播变量,在所有节点的内存中缓存一个值;accumulators累加器,意味着这些变量是可加的,比如counter和sum等。

准备

Spark程序准备运行需要创建基础的SparkContext对象,一个JVM里只允许有一个active的SparkContext对象,因此如果已经存在,需要stop()掉。准备工作的代码如下:

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

其中appName是用来UI展示的,而master是一个cluster的url。

并行集合

并行集合是通过在驱动程序中调用SparkContext的parallelize方法来构建的。比如如下代码:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

会构造一个并行集合1-5.

并行集合一个最重要的参数就是将dataset切成几个partition。Spark在每个partition上会启动一个task运行。比较典型的情况会为集群的每个CPU选择2-4个分区数量。一般Spark会根据集群的情况自动设计分区数量。当然也可以手动set这个参数,比如sc.parallelize(data, 10)

外部dataset

Spark支持从任意Hadoop支持的文件系统加载外部Dataset,包括本地文件系统、HDFS、Cassandra、Hbase、Amazon S3等。Spark支持文本文件、SequenceFile和任意Hadoop InputFormat。
比如如下代码:

val distFile = sc.textFile("data.txt")

需要注意的几点:

  • 如果加载本地文件系统,文件必须可以被所有的worker节点加载,因此需要网络共享副本到每个worker。
  • 所有的Spark的基于文件的input方法,都支持在目录、压缩文件甚至wildcard的方式,比如,可以使用textFile("/my/directory"),也可以textFile("/my/directory/*.txt"),还可以textFile("/my/directory/*.gz")
  • textFile方法也可以支持第二个参数来控制文件的分区数。默认Spark为HDFS文件的每个block构建一个分区(一个block默认是128MB大小),但也可以修改这个大小。注意不可以少于block个数个partition。

Spark的scala api也支持其他数据格式:

  • SparkContext.wholeTextFiles可以读入一个包含很多小文本文件的目录,返回一组(文件名,内容)的pair。而textFile返回的是每个文件的每一行记录。而分区取决于数据的位置,某些情况下回产出非常多的分区。所以wholeTextFiles方法也提供一个可选的第二位参数来控制最小分区数。
  • 对于SequenceFile,使用SparkContext’s sequenceFile[K, V]方法,其中K和V是文件中key和value的类型。可以是Hadoop里Writable接口的子类,比如IntWritableText。另外Spark也允许声明native类型;比如sequenceFile[Int, String]会自动读IntWritables和Texts。
  • 对于其他的Hadoop InputFormats,你可以使用SparkContext.hadoopRDD方法,它可以以任意的JobConf输入format类、key类和value类。以在Hadoop job里同样的方式设置即可。也可以使用SparkContext.newAPIHadoopRDD方法来处理基于MapReduce API的输入格式。
  • RDD.saveAsObjectFileSparkContext.objectFile支持保存一个RDD,方式和序列化一个Java对象一样。当然这种序列化方式不如Avro这样的更高效,但是简单。

RDD操作

RDD支持两种操作类型:transformation变换,即将一个已有的Dataset变换为一个新的dataset,也包含通过对一个dataset进行一系列计算返回一个值给驱动程序。比如,map就是一个将一个dataset的元素通过一个函数变换为一个新的RDD的一种变换操作。另一操作就是reduce,它是将一个RDD的元素通过一个函数聚合成一个值给驱动程序。

Spark里所有的变换都是lazy的,也就是说他们不是马上计算出结果。它只是会记住对基础dataset进行的所有变换操作。变换操作只有在一个动作需要结果返回给驱动程序时才会计算。这种设计使Spark运行的更加高效。比如,我们可以实现一个通过map创建dataset再用于reduce从而返回给driver,而不是一个大的dataset。

默认情况下,每个变换过的RDD可以在每次运行一个动作时被重新计算。然而也可以使用persist或者cache方法来持久化到内存,这样Spark可以将结果缓存到cluster以便下次查询时获得更快的速度。Spark也支持将结果RDD持久化到磁盘,或者复制到多个节点。

基础

看一个代码片段就明白

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

首先定义了一个外部文件为一个RDD,这时lines不是内存中的数据,只是一个指针指向文件。然后经过一个map操作变换为一个新RDD——lineLengths。同理lineLengths也不是马上计算的,也是lazy的。直到最后reduce触发动作,Spark开始将计算分为多个task来运行在独立的机器上,每个机器单独计算分配的map部分数据和本地reduce,最终将结果返回driver程序。
其中lineLengths要想复用,可以lineLengths.persist()来进行内存持久化,这个操作是在reduce前就会触发执行的。

给Spark传递函数

Spark的API严重依赖通过驱动程序将函数传递下去以便运行在集群中。有两种推荐的做法:

  • 匿名函数语法,可以作为代码片段。
  • 全局单例对象的static方法。比如可以定义一个object叫MyFunctions,然后传递MyFunctions.func1,比如:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

也可以在类内部传递函数引用,比如:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

这样如果有新的类实例产生,就要传递全部的object到集群中,类似的做法如下:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

最简单的方法是把field拷贝到一个本地变量,而不是做对象传递,如下:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

理解闭包

关于Spark最难理解的事情之一就是理解在集群中跨机器执行一段代码时其中变量和方法的scope和生命周期。RDD操作从其scope外部修改变量就是一个让人困惑的例子。下面举个例子:

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

这里RDD做了一个sum操作。这个代码会根据是否在同一个JVM中运行而导致行为不同。比如在local模式和cluster模式下,表现就不一样。
这段代码的行为是未定义的,可能会根据实际情况表现出不同的行为。为了执行这个作业,Spark会将RDD处理操作划分为若干个task,每个task被一个executor执行。执行前,Spark先计算task的闭包,闭包就是那些在操作计算RDD时对executor可见的变量和方法。这个闭包会被序列化并发送给每个执行器。
闭包里的变量是多份拷贝,此例中的counter就是一个在foreach中的引用,而不是driver节点的counter。driver节点内存中是会有个counter,但不再是被执行器可见的counter。执行器只会看到传递给它的序列化的闭包中的counter的拷贝副本。这样,这段代码执行后counter还是0,因为每个执行器执行的counter都是自己闭包中的变量副本。
在local模式下,在某种环境下,foreach函数会确实的在driver所在的同一个JVM中执行,引用的就将是同一个counter,那么这时结果是正确的。
为了确保这种场景下的代码行为是well-defined,Spark提供了Accumulator。累加器在Spark中可以用来提供一种安全更新变量机制,当执行计算被划分到集群中的多个工作节点时就会有用。
通常,闭包像一个本地方法或者循环体,不应该被用来改变一些全局状态值。Spark没有明确定义或者说能保证在闭包外引入对象的可变性。有时候local模式下使用没问题,二同样的代码在分布式模式下可能就出问题。尽量使用Accumulator来做全局聚合。
还有一个值得提一下的就是Spark的打印方法,默认的print会发生在executor端而不是在driver程序中,这导致无法看到过程。通过collect方法可以将RDD汇总到driver所在的机器上,通过rdd.collect().foreach(println)方法。但是这会导致driver迅速耗尽内存,因为所有的rdd都会汇总过来。正确的姿势应该是取样输出,使用take方法,通过rdd.take(100).foreach(println)来抽样部分数据。

KV对怎么操作

尽管大多数的Spark RDD操作可以支持任意类型的对象,但是一种特殊操作只对key-value对生效。最典型的就是分布式的"shuffle"操作,比如根据一个key来grouping或者aggreating。
在Scala里,这些操作由Tuple2对象来提供(默认内置的tuple,一般是(a,b)这样的形式)。KV对操作由PairRDDFunctions类来提供,它会自动包装一组RDD tuple。比如下面的代码:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

这个代码片段会记录每行记录和出现次数为一个kv对,然后根据行作为key来reduce计算不同行的出现次数。后续可以再通过类似counts.sortByKey()来按行字典序排序这些kv对,再通过counts.collect()来汇总结果到driver程序。

Transformation

Spark支持的Transformation有如下的列表:

Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numPartitions])) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
reduceByKey(func, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numPartitions]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

Action

Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path)
(Java and Scala)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)
(Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

Spark RDD API也暴露了一些动作方法的异步化版本,比如foreachAsync就是针对foreach的异步版,它可以快速返回一个FutureAction给调用者而不是阻塞在计算中。

Shuffle操作

Spark中一些特定操作会触发一个shuffle事件。Spark利用shuffle机制来进行数据的重新分布,从而使跨分区数据重新来分组组织。一般在跨executors和机器中拷贝数据使得shuffle操作是一个复杂又耗费的操作。

首先了解一下为什么要shuffle,我们可以通过一个reduceByKey操作来看下。该操作会产生一个新RDD,其中所有具有相同key的值都会被合并在一个tuple里。显然很有挑战的一点就是不是所有数据会布置在一个分区中或者一台机器上,但是reduce计算确需要所有的数据一起参与。这就带来跨机器跨分区问题。

在Spark中,数据一般不会跨分区分布。在计算过程中,一个task运作在一个分区上,因此为了组织reduceByKey操作执行需要的全部的数据,Spark需要进行一个all-to-all的操作。它必须从所有分区中读出数据并且找到所有key的所有value,然后为每个key把跨分区的值汇聚在一起并计算该key的对应reduce值。整个这个过程就叫做shuffle。
shuffle带来的新分区里的数据和分区的顺序是重要的,但是分区内元素的顺序则不重要。如果需要元素数据也有序,那么可以这么做:

  • mapPartitions来使分区排序,比如.sorted方法
  • repartitionAndSortWithinPartitions来排序分区
  • sortBy来全局排序RDD

会进行shuffle的操作包括:repartitioncoalesce,还有ByKey类型的操作比如groupByKeyreduceByKey,还有join类型的操作比如cogroupjoin

shuffle是一个很昂贵的操作,因为它会带来磁盘I/O、数据序列化和网络I/O。为了组织shuffle过程中的数据,Spark会产生一系列task——map任务来组织数据和一系列reduce任务来聚合它们。这个命名和MapReduce相同但是与Spark内部的map和reduce操作不同。

在计算过程内部,单个map任务的结果存储在内存中,直到超过容量。然后会按照目标分区来排序,并写入单个文件中。在reduce端,任务会读取相关联的排好序的block。

特定的shuffle操作可能会消耗大量的堆内存,因为它们完全在利用内存中的数据结构来组织其记录和进行转换。特殊情况是reduceByKeyaggregateByKey操作会在map端创建这些结构,而其他的ByKey操作都在reduce端产生。当数据超过内存限制时,Spark会将这些表转到磁盘,这会加重磁盘I/O并且带来大量的GC。

shuffle同时也会产出大量的中间磁盘文件。在Spark1.3中,这些文件会一直保留直到RDD不再需要才gc。这样做是为了在重新计算依赖关系时不需要重新创建shuffle文件。如果应用保持这些RDD的引用,那么GC将不会频繁发生,可能会在很长一段时间后进行。这意味着长时间运行的Spark作业可能会占用非常多的磁盘空间。临时存储目录通过spark.local.dir配置项在配置Spark context时候设定。

shuffle行为可以通过很多配置项来调整。具体参考后续关于Spark配置项的文章。

RDD持久化

Spark中最重要的一项能力就是跨操作在内存中持久化dataset。当一个RDD被持久化后,每个节点存储它在内存中计算的任何分区,并在该dataset的其他操作中重用它们。这使得很多后置的操作可以更快。caching是迭代算法和快速交互中的一个特别关键的工具。

可以通过使用persist()或者cache()方法来持久化RDD。在第一次操作时保存记录在节点的内存中。Spark的cache是具备容错性的,如果RDD的任何分区丢失,它都会通过原来创建它的操作重新计算得到。

每个持久化的RDD可以以不同的存储等级来保存,比如,持久数据集到磁盘,以Java序列化方式持久数据集到内存,并复制到多个节点。这些等级通过传递一个StorageLevel对象给persist()方法来实现。cache()方法是使用默认存储等级的一个快捷方式——默认的等级是StorageLevel.MEMORY_ONLY,这种方式会在内存中存储反序列化对象。完整的存储等级如下:

Storage Level Meaning
MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER
(Java and Scala)
Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER
(Java and Scala)
Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

具体使用时如何选择呢?Spark的存储等级是为了提供多种在内存和CPU利用率中的trade-off策略的。官方有如下的推荐:

  • 如果RDD大小适中,那就用默认级别。这是CPU利用率最高的选择。
  • 如果RDD大小不合适,先尝试用MEMORY_ONLY_SER模式,并选择一种快捷的序列化方式,这样可以节省一些空间并且提供高效访问。
  • 别轻易spill到磁盘除非dataset的计算非常昂贵或者dataset包含了非常巨大的数据量。这样重新计算分区可能和从磁盘读取速度一样快。
  • 在期望快速回复数据错误时使用带副本的存储等级。所有的存储等级都提供丢失数据的重新计算这样的容错能力,只不过带副本的模式可以允许在丢失数据时继续进行RDD计算而不是等待对一个丢失分区的重新计算。

Spark会自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧数据分区。如果您想手动删除RDD而不是等待它退出缓存,请使用RDD.unpersist()方法。

共享变量

通常,当在远程集群节点上执行传递给Spark操作(例如map或reduce)的函数时,它将在函数中使用的所有变量的单独副本上工作。 这些变量将复制到每台机器,并且远程机器上的变量的更新不会传播回驱动程序。 对跨任务的通用的读写共享变量的支持是效率低下的。 但是,Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。

广播变量

广播变量允许程序员在每台机器上保留一个只读变量,而不是随任务一起发送它的副本。 例如,它们可为每个节点以有效的方式提供一个海量输入数据集的副本。 Spark还尝试使用有效的广播算法来分发广播变量,以降低通信成本。
Spark动作由分布式“shuffle”操作分隔为一组stage执行。 Spark在每个阶段中自动广播任务所需的公共数据。 以这种方式广播的数据以序列化形式缓存并在运行每个任务之前反序列化。 这意味着显式创建广播变量仅在跨多个阶段的任务需要相同数据或以反序列化形式缓存数据很重要时才有用。
通过调用SparkContext.broadcast(v)从变量v创建广播变量。 广播变量是v的包装器,可以通过调用value方法访问其值。 下面的代码显示了这个:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

创建广播变量后,应该在群集上运行的任何函数中使用它而不是值v,这样v不会多次传送到节点。 另外,在广播之后不应修改对象v,以便确保所有节点获得广播变量的相同值(例如,如果稍后将变量发送到新节点)。

累加器

累加器是通过关联和交换操作仅仅可以被“增加”的变量,因此可以有效地在并行中支持。 它们可用于实现计数器(如MapReduce)或求和。 Spark本身支持数值类型的累加器,程序员可以添加对新类型的支持。
作为用户,您可以创建命名或未命名的累加器。 如下图所示,命名累加器(在此实例计数器中)将显示在Web UI中,用于修改该累加器的阶段。 Spark在“任务”表中显示任务修改的每个累加器的值。
Spark初窥

可以通过调用SparkContext.longAccumulator()SparkContext.doubleAccumulator()来创建数字累加器,以分别累积Long或Double类型的值。 然后,可以使用add方法将在群集上运行的任务添加到其中。 但是,他们无法读读到它的值。只有driver程序可以使用其value方法读取累加器的值。

下面的代码展示了使用累加器进行累加数组元素并获取值的例子:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

虽然此代码使用了对Long类型累加器的内置支持,但程序员也可以通过继承AccumulatorV2来创建自己的类型。 AccumulatorV2抽象类有几个方法,必须覆盖:reset用于将累加器重置为零,add用于向累加器添加另一个值,merge用于合并另一个相同类型的累加器到这个。 其他必须覆盖的方法包含在API文档中。例如,假设我们有一个表示数学向量的MyVector类,我们可以写:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

请注意,当程序员定义自己的AccumulatorV2类型时,结果类型可能与添加的元素类型不同。
对于仅在操作内执行的累加器更新,Spark保证每个任务对累加器的更新仅应用一次,即重新启动的任务不会更新该值。 在转换中,用户应该知道,如果重新执行任务或作业阶段,则可以多次执行每个任务的更新。
累加器不会改变Spark的延迟评估模型。 如果在RDD上的操作中更新它们,则只有在RDD作为操作的一部分计算时才更新它们的值。 因此,在像map()这样的惰性转换中进行累加器更新时,不能保证执行累加器更新。 以下代码片段演示了此属性:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
上一篇:Impala——2.架构


下一篇:Impala——1.概述