3.9 共享变量
Spark使用的架构是无共享的。数据分布在集群的各个节点上,每个节点都有自己的CPU、内存和存储资源。没有全局的内存空间用于任务间共享。驱动程序和任务之间通过消息共享数据。
举例来说,如果一个RDD操作的函数参数是驱动程序中变量的引用,Spark会将这个变量的副本以及任务一起发送给执行者。每个任务都有一份变量的副本并把它当成只读变量使用。任何对这个变量的更新都只存在任务的内部,改动并不会回传给驱动程序。而且Spark会把这个变量在每一个阶段的开始发送给worker节点。
对于一些应用而言,这种默认行为是低效的。在一个实际的使用场景中,驱动程序在作业的任务间共享了一个巨大的查找表。而这个作业由多个阶段构成。默认情况下,Spark会自动将这个变量及其相关任务发送给每个执行者。然而,Spark会在每个阶段做这件事。如果这个查找表存储了100MB的数据,并且这个作业涉及10个阶段,那么Spark就会给每个worker节点发送10次100MB的相同数据。
另外一个使用场景是在每个运行在不同节点上的任务中需要更新全局变量。默认情况下,任务中对变量的更新是不会回传给驱动程序的。
Spark通过共享变量的概念来满足这些使用场景的需求。
3.9.1 广播变量
广播变量的使用使得Spark应用可以有效地在驱动程序和执行作业的任务之间共享数据。Spark只会给worker节点发送一次广播变量,并且将它反序列化成只读变量存储在执行者的内存中。而且,Spark采用一种更高效的算法来发布广播变量。
注意,如果一个作业由多个阶段构成,且阶段中的任务使用同一个驱动程序的变量,那么使用广播变量是十分有用的。如果你不想在开始执行每个任务之前反序列化变量,使用广播变量也是有益的。默认情况下,Spark会将传输过来的变量以序列化的形式缓存在执行者的内存中,在开始执行任务之前再反序列化它。
SparkContext 类提供了一个叫作broadcast的方法用于创建广播变量。它把一个待广播的变量作为参数,返回一个Broadcast类实例。一个任务必须使用Broadcast对象的value方法才可以获取广播变量的值。
考虑这样一个应用,它根据电商交易信息生成交易详情。在现实世界的应用中会有一张顾客表、一张商品表和一张交易表。为了简化起见,我们直接用一些简单的数据结构来代替这些表作为输入数据。
使用广播变量使得我们可以高效地实现顾客数据、商品数据和交易数据之间的连接。我们可以通过使用RDD API来实现连接操作,但是这会在网络间对顾客数据、商品数据和交易数据做shuffle操作。使用广播变量,我们使得Spark只将顾客数据和商品数据发送给每个节点一次,并且用简单的map操作来代替耗时的join操作。
3.9.2 累加器
累加器是只增变量,它可以被运行在不同节点上的任务更改并且被驱动程序读取。它可以用于计数器和聚合操作。Spark提供了数值类型的累加器,也支持创建自定义类型的累加器。
SparkContext类提供了一个叫作accumulator的方法用于创建累加器变量。它有两个参数。第一个参数是累加器的初值,第二个是在Spark UI中显示的名字,这是一个可选参数。它返回一个Accumulator类实例。这个类实例为操作累加器变量提供操作符。任务只能采用add方法或者+=操作符来增加累加器变量的值。只有驱动程序可以通过value方法来获取累加器的值。
考虑这样一个应用,它需要从顾客表中过滤出不合法的顾客并计数。在现实世界的应用中,我们会从硬盘中读取数据并将过滤后的数据写入到硬盘中的另外一个文件。为简化起见,我们跳过读写硬盘的部分。
在使用累加器的时候需要注意,转换操作期间对累加器的更新无法保证恰好只有一次。如果一个任务或一个阶段重复执行,每一个任务的更新操作就会多次执行。
而且,对累加器的更新操作并不是在RDD的操作方法被调用时才执行的。RDD的转换操作是惰性的,转换操作中对累加器的更新并不会立即执行。因此,如果驱动程序在操作方法被调用之前就使用累加器的值,那么它将得到一个错误的值。