RDD:弹性分布式数据集 一、RDD的介绍
1.1 背景
许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘工具,共同之处是,不同计算阶段之间会重用中间结果。 目前的MapReduce框架都是把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销。 RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性, 只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,避免中间数据存储。
1.2 RDD的简述
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
1.3 RDD的属性
(1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
(2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
(3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。大数据培训
(4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
(5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
1.4 RDD在Spark架构中的运行过程
(1)创建RDD对象;
(2)SparkContext负责计算RDD之间的依赖关系,构建DAG;
(3)DAGScheduler负责把DAG图分解成多个Stage,每个Stage中包含了多个Task,每个Task会被TaskScheduler分发给各个WorkerNode上的Executor去执行。
二、RDD的使用
2.1 RDD的创建
有3种方式创建RDD:
(1)通过读取文件或者一组文件生成
(2)通过内存中的数据(并行化创建)
(3)通过现有的RDD
2.1.1 通过读取文件生成的
由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
案例:
# 从protocols文件创建RDD
distFile = sc.textFile("/etc/protocols")
1.
2.
2.1.2 通过并行化的方式创建RDD
由一个已经存在的Scala集合创建。将程序中的一个集合传给SparkContext的parallelize()方法。
案例:
>>> data = [1,3,5,7,9]
>>> distData = sc.parallelize(data)
# 对RDD进行测试操作
# 对集合中的所有元素进行相加,返回结果为25
>>> distData.reduce(lambda a,b:a+b)
[Stage 0:> (0
[Stage 0:> (0
[Stage 0:> (0 + 4) / 4]
[Stage 0:==============> (1
25
2.1.3 通过现有的RDD
2.2 RDD的操作方式
2.2.1 Transformations(转化)
基于现有的RDD创建新RDD,将已存在的数据集转换成新的数据集,例如map。转换是惰性的,不会立刻计算结果,仅仅记录转换操作应用的目标数据集,当动作需要一个结果时才计算。
案例:
>>> lines = sc.textFile("README.md")
>>> pythonLines = lines.filter(lambda line:"Python" in line)
>>> pythonLines.first()
基本转化操作:
对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作
对数据分别为{1, 2,3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作
2.2.2 actions(行动)
返回值,数据集计算后返回一个值给驱动程序,例如reduce
案例:
# 从protocols文件创建RDD
>>> distFile = sc.textFile("/etc/protocols")
# Map操作,每行的长度,转换操作
>>> lineLengths = distFile.map(lambda s: len(s))
# Reduce操作,获得所有行长度的和,即文件总长度,这里才会执行map运算
>>> totalLength = lineLengths.reduce(lambda a, b: a + b)
# 可以将转换后的RDD保存到集群内存中
>>> lineLengths.persist()
基本执行操作:
对一个数据为{1, 2,3, 3}的RDD进行基本的RDD行动操作
惰性求值:RDD的转化操作是惰性求值的,即在被调用行动操作之前Spark不会开始计算,相反,Spark会在内部记录下索要求执行的操作的相关信息。例如,当我们调用jsc.textFile()时,数据并没有读取进来,而是在必要时才会读取。Spark使用惰性求值,就可以把一些操作合并到一起来减少计算数据的步骤。
三、lambda说明
lambda 语句被用来创建新的函数对象,并且在运行时返回它们
一般的函数定义
def add(a,b):
return a+b
lambda 定义:
add = lambda a,b: a+b
调用函数的区别:
一般的函数:
add(1,2)
lambda 函数:
add(1, 2)
其中 lambda 函数可以进行 匿名函数调用,即
add(1, 2) => (lambda a,b: a+b)(1,2)
但是 def 方法不能