Learning Spark中文版--第六章--Spark高级编程(1)

Introduction(介绍)

本章介绍了之前章节没有涵盖的高级Spark编程特性。我们介绍两种类型的共享变量:用来聚合信息的累加器和能有效分配较大值的广播变量。基于对RDD现有的transformation(转换),我们针对构建成本高的任务引入批量操作,如查询数据库。为了扩展我们可使用工具的范围,我们介绍Spark与外部程序交互的方法,例如用R编写的脚本。

在本章中,我们将以无线电台的通话记录作为输入构造一个示例。这些日志至少包括联系电台的呼号。呼号由国家分配,并且每个国家有自己的呼号范围,所以我们可以查询相关国家。一些呼叫日志也包含物理地址和使用者,可以用来确定距离。Example6-1中有个样例日志条目。本书的样本repo包括一系列呼号,以查找呼叫记录并处理结果。

Example 6-1. Sample call log entry in JSON, with some fields removed

{"address":"address here", "band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",
"contactlat":"37.384733","contactlong":"-122.032164",
"county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",
"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}

我们要了解的第一个Spark特性集合是共享变量,这是一种你可以在Spark的task中使用的特殊类型的变量。在例子中,我们使用Spark的共享变量来计算非致命错误条件并分配一个大的查找表。

当我们的task有很长的准备时间,比如创建数据库连接或随机数生成器,在多个数据项之间共享准备工作就很有用。使用远程调用查询数据库,我们将研究如何在每个分区上操作来复用准备工作。

除了Spark直接支持的语言,系统可以调用其他语言写的程序。这一章节会介绍如何使用Spark的与语言无关的pipe()方法通过标准输入输出来和其他程序交互。我们将使用pipe()方法访问R语言的库来计算业余无线电台联系人的距离。

最后,像使用键值对的工具一样,Spark有处理数值型数据的方法。我们会对业余无线电台日志计算出距离然后去除异常值,来演示这些处理数值的方法。

Accumulators(累加器)

我们通常把函数传递给Spark的时候,如map()函数或者按指定条件过滤的filter()函数,它们可以使用驱动程序中定义的变量,但是每个运行在集群上的task会得到一个新的变量副本,对这些副本的更新不会再回传给驱动程序。Spark的共享变量,累加器和广播变量放宽了对两种通信模式的限制:结果的汇总和广播。

第一个类型的共享变量是累加器,其对worker的节点的值进行累加并返回给驱动程序的语法比较简单。累加器最常用的一个用途就是统计job执行过程中的事件来进行调试。举例来说,我们加载那些想要检索的日志的呼号列表,但是我们对输入文件空白行的数目也感兴趣(可能我们并不想看到有效的输入中有这种空白行)。示例如下:

Example 6-2. Accumulator empty line count in Python

file = sc.textFile(inputFile)
# Create Accumulator[Int] initialized to 0
blankLines = sc.accumulator(0) def extractCallSigns(line):
global blankLines # Make the global variable accessible
if (line == ""):
blankLines += 1
return line.split(" ") callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value Example 6-3. Accumulator empty line count in Scala val sc = new SparkContext(...)
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) // Create an Accumulator[Int] initialized to 0
val callSigns = file.flatMap(line => {
if (line == "") {
blankLines += 1 // Add to the accumulator
}
line.split(" ")
}) callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value) Example 6-4. Accumulator empty line count in Java JavaRDD<String> rdd = sc.textFile(args[1]); final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> callSigns = rdd.flatMap(
new FlatMapFunction<String, String>() { public Iterable<String> call(Stringline) {
if (line.equals("")) {
blankLines.add(1);
}
return Arrays.asList(line.split(" "));
}}); callSigns.saveAsTextFile("output.txt")
System.out.println("Blank lines: "+ blankLines.value());

在例子中,我们创建了一个名叫blankLinesAccumulator[Int],然后当我们看到输入为空白行时为其加1。在对transformation(转换)求值之后,打印空白行总数。注意一点,我们只能在saveAsTextFile()action之后才能看到正确的值,因为之前的操作map()转换是惰性的,所以只有在惰性求值的map()saveAsTextFile强制求值后累加器的自增副作用才会生效。

当然,如果使用类似reduce的action可以对整个RDD的值累加并返回给驱动程序,但是有时我们需要更简单的方式对转换过程中RDD的值聚合,这些值与RDD本身的规模或粒度不同。在之前的例子中,我们在加载数据时累加器帮我们统计错误数量,不用再分别使用filter()reduce()

总结一下,累加器的工作流程如下:

  • 我们在驱动程序中通过调用SparkContext.accumulator(initialValue)方法创建累加器,这会为累加器生成一个初始值。返回一个org.apache.spark.Accumulator[T]类型的对象,泛型T是初始值的类型。
  • Spark闭包中的工作代码可以通过+=方法为累加器增加值(或者Java中的add方法)。
  • 驱动程序能够调用累加器上的值属性来访问他的值(Java中value()setValue()方法)

请注意,worker节点上的task无法访问累加器的value()方法--从这些任务的角度来看,累加器是只写变量。这使得累加器可以高效地执行,而无需每次更新都进行通信。

当需要跟踪多个值,或者在并行程序中的多个位置需要增加相同的值时,此处显示的计数类型就特别方便(例如,你可能需要统计程序中调用JSON解析库的次数)。例如,我们的数据经常会损坏一定比例,或者结尾部分会失败一些次数。为了防止出现错误时产生垃圾输出,我们可以使用计数器来记录有效记录,并使用计数器记录无效记录。我们累加器的值只有在驱动程序中才能访问,所以这是我们进行检查的地点。

继续我们之前的例子,我们只有在大部分输入有效的情况下才验证呼号并输出数据。国际电信联盟在第19条中规定了无线电呼号的格式,从中我们构造了一个正则表达式来验证一致性,如Example6-5所示。

Example 6-5. Accumulator error count in Python
# Create Accumulators for validating call signs
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0) def validateSign(sign):
global validSignCount, invalidSignCount
if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
validSignCount += 1
return True
else:
invalidSignCount += 1
return False # Count the number of times we contacted each call sign
validSigns = callSigns.filter(validateSign)
contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x + y) # Force evaluation so the counters are populated
contactCount.count()
if invalidSignCount.value < 0.1 * validSignCount.value:
contactCount.saveAsTextFile(outputDir + "/contactCount")
else:
print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.value)

Accumulators and Fault Tolerance(累加器和容错性)

Spark通过重新执行失败的或缓慢的task来自动应对机器失败或缓慢的情况。举例来说没如果节点运行一个分区的map()操作崩溃了,Spark会返回给另一个节点,即使节点没有崩溃,但是比其他节点慢得多,Spark也可以抢先在另一个节点上启动task的“speculative(推测)”副本,并在结束时取得结果。即使没有节点失败,Spark也可能必须重新运行一个task来重建内存不足的缓存值。因此,在相同的数据上,相同的函数可能会运行多次,这取决于集群上运行的情况。

所以是如何与累加器交互的呢?最终结果是,对于action中使用的累加器,Spark仅将每个task的更新应用于每个累加器一次。因此,如果我们在失败或求值多次的情况下仍想要一个可靠的绝对值计数器,一定要把它放进类似于foreach()的action中。

对于在transformation中而不是action中使用的累加器,这种保证就不存在了。一个累加器因transformation导致的更新可能会不止一次。当已经缓存过但不怎么使用的RDD从LRU缓存中驱逐出来并且紧接着又需要使用该RDD的情况下可能会导致无意识地多次更新。这迫使RDD根据其依赖关系重新求值,并且无意识的副作用会对该依赖关系中transformation的累加器调用更新,然后再发回给驱动程序。在转换中的累加器,应该只在调试时使用。

虽然未来版本的Spark可能会更改此行为来仅对更新进行一次计数,但当前版本(1.2.0)确实具有多重更新行为,因此建议仅在进行调试时使用转换中的累加器。

Custom Accumulators(定制累加器)

目前为止我们已经看到了一种Spark内置累加器类型使用方式:整数的加法(Accumulator[Int])。Spark支持很多累加器类型,如Double,Long和Float。除此之外,Spark还有一个自定义累加器类型和累加器操作的API(如,寻找累加值中的最大值)。自定义累加器需要继承AccumulatorParam,Spark API文档中有详细的介绍。除了数值类型可以相加,只要提供的操作是满足交换律和结合律,我们就可以用对任意操作使用加法。举例来讲,除了使用加法来追踪总数,我们还可以追踪所见元素的最大值。

如果操作op满足--对于所有的值a,b;都有 a op b = b op a,那么op满足交换律。

如果操作op满足--对于所有的值a,b,c;都有 (a op b) op c = a op (b op c),那么op满足结合律。

举例来说,summax是满足交换律和结合律的操作,所以可以用于Spark的累加器。

Broadcast Variables(广播变量)

Spark第二个共享变量类型是广播变量,它允许程序高效地发送只读的大型值给所有的工作节点,以用于一个或多个Spark操作。例如,如果你的应用程序需要将大型只读查找表发送到所有节点,或者是机器学习算法中的大型特征向量,则它们就派上用场了。

回忆一下,Spark会自动把闭包中引用的变量发送给所有节点。尽管这很方便,但是可能会导致效率低下,因为(1)默认的任务启动机制对task的大小进行了优化;(2)事实上,你可能会在多个并行操作中使用相同的变量,但Spark会为每个操作单独发送它。作为一个例子,假设我们想编写一个Spark程序,该程序根据数组中的前缀匹配国家呼号查找国家/地区。这对于无线电呼号很有用,因为每个国家都有自己的前缀,但前缀长度不统一。如果我们在Spark中想当然地写这个代码,代码可能看起来像Example6-6。

Example 6-6. Country lookup in Python

# Look up the locations of the call signs on the
# RDD contactCounts. We load a list of call sign
# prefixes to country code to support this lookup.
#在contactCounts RDD上查找呼号的位置
#我们加在一个国家呼号前缀的列表来帮助查询
signPrefixes = loadCallSignTable() def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes)
count = sign_count[1]
return (country, count) countryContactCounts = (contactCounts
.map(processSignCount)
.reduceByKey((lambda x, y: x+ y)))

这个程序可以运行,但如果我们有一个大型的表(保存了IP地址而不是呼号的表),那么signPrefixes可以很轻易达到几百万字节,这样的话,把数组从master发送到每个task的代价就非常高了。除此之外,如果我们之后使用相同的signPrefixes对象(可能我们一会会在file2.txt上运行),这又会再次发送给每个节点。

我们可以把signPrefixes改成一个广播变量。广播变量就是一个spark.broadcast.Broadcast[T]类型的对象,对T类型的值进行了包装。我们可以在task上通过调用Broadcast对象的value来访问广播变量的值。这个值只会向节点发送一次,使用高效地,类似比特流的传输机制。

使用广播变量,我们之前的例子变成了下面的样子:

Example 6-7. Country lookup with Broadcast values in Python

# Look up the locations of the call signs on the
# RDD contactCounts. We load a list of call sign
# prefixes to country code to support this lookup.
signPrefixes = sc.broadcast(loadCallSignTable()) def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value)
count = sign_count[1]
return (country, count) countryContactCounts = (contactCounts
.map(processSignCount)
.reduceByKey((lambda x, y: x+ y))) countryContactCounts.saveAsTextFile(outputDir + "/countries.txt") Example 6-8. Country lookup with Broadcast values in Scala // Look up the countries for each call sign for the
// contactCounts RDD. We load an array of call sign
// prefixes to country code to support this lookup.
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
val country = lookupInArray(sign, signPrefixes.value)
(country, count)
}.reduceByKey((x, y) => x + y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt") Example 6-9. Country lookup with Broadcast values in Java
// Read in the call sign table
// Look up the countries for each call sign in the
// contactCounts RDD
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
new PairFunction<Tuple2<String, Integer>, String, Integer> (){
public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
String sign = callSignCount._1();
String country = lookupCountry(sign, callSignInfo.value());
return new Tuple2(country, callSignCount._2());
}}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

如例所示,使用广播变量非常简单:

  • 1.通过调用SparkContext.broadcast创建一个Broadcast[T]对象,T类型对象必须是可序列化的。
  • 2.使用vvalue属性(或者Java中使用value()方法)访问它的值。
  • 3.变量只会发送到每个节点一次,并且应该当做只读值对待(更新值不会传播到其他节点)。

满足只读要求的最简单方法是广播原始值或对不可变对象的引用。这种情况,你就无法改变广播变量的值了,除了在驱动程序代码中。但是,有时广播变量是一个可变对象会更方便。如果你这样做了,变量的只读性只能由你来保证了。就像我们对呼号前缀的数组做的那样,必须确保在工作节点运行的代码不去做类似于val theArray = broadcastArray.value; theArray(0) = newValue的操作。当在工作节点上运行的时候,这几行代码会将当前节点的本地数组副本的第一个元素设置为newValue;它不会改变其他工作节点的broadcastArray.value的内容。

Optimizing Broadcasts(广播变量优化)

当我们广播一个大型值的时候,选择紧凑快速的数据序列化格式是很重要的,因为如果对值序列化或在网络上发送该序列化值需要很长时间,那么网络发送该序列化值会成为程序的瓶颈。特别是,Java Serialization(Spark Spark Scala和Java API中使用的默认序列化库)对于除基本类型数组之外的任何内容都可能非常低效。你可以通过spark.serializer属性选择不同的序列化库(第八章会介绍如何使用Kryo,一个非常高效的序列化库),或者通过为你的数据类型实现你自己的序列化规范(例如使用java.io.Externalizable接口为Java序列化,或使用reduce()方法为Python的pickle库定义自定义序列化)。

上一篇:Learning Spark中文版--第六章--Spark高级编程(2)


下一篇:Ionic.Zip.dll文件压缩和解压