RDD的创建
通过已知的并行集合创建。可以通过已知的SparkContext的parallelize方法将一个已存在的集合变成RDD
data=[1,2,3,4,5]
distData=sc.parallelize(data) #通过并行化创建RDD
distData.collect() 将内存中的数据显示子啊屏幕中
distData=sc.parallelize(data,10) #10就是指数据集的分区个数
集群中每一个分区代表对应一个Spark任务(task)
从外部数据集创建。Spark可以在本地文件系统、文本文件、HDFS等存储源中创建RDD。通过SparkContext的textFIle方法将数据源文件转换为RDD,此方法需要传递一个文件的地址,例如,以file:/// 、hdfs:/// 等形式开头的地址。转换后数据将会以集合的方式进行存储。
distFile=sc.textFile("file:///data/data.txt")#file后面为三个斜杠避免报错
distFile.collect()
读取文件夹的形式
sc.textFile("/my/directory")
textFIle方法可以通过第二个可选的参数来控制该文件的分区数量
sc.teztFile("/my/directory",4)
RDD的操作
RDD支持两种类型的操作:转换(transformations)和行动(action)。transformations操作会在一个已存在的RDD上创建一个新的RDD,但实际上并没有执行,仅仅是记录操作过程,所有的计算都发生在action操作环节。action操作会执行记录的所有transformations操作并计算结果,结果可返回到drive程序,也可保存到相关存储系统中。
transform算子
Spark中所有的transformations都是懒加载的,在转换时不会立刻计算出结果,只记录数据集的转换过程,当驱动程序需要返回结果时,transformations才开始进行计算,这使得Spark的运行更加高效。
map转换
rdd=sc.parallelize(["b","a","c"])
rdd.map(lambda x:(x,1)).collect()
[('b',1),('a',1),('c',1)]
flatMap转换
flatMap转换首先将mao函数应用于RDD的所有元素,然后使结果平坦化,最后返回新的RDD
rdd=sc.parallelize([2,3,4])
rdd.flatMap(lambda x: range(1,x)).collect()
[1,1,2,1,2,3]
rdd.flatMap(lambda x:[(x,x),(x,x)]).collect()
[(2,2),(2,2),(3,3),(3,3),(4,4),(4,4)]
map和flapMap的区别
rdd=sc.parallelize([2,3,4])
rdd.map(lambda x:range(1,x)).collect()
[range(1,2),range(1,3),range(1,4)]
filter转换
filter函数返回包含指定过滤条件的元素。
rdd=sc.parallelize([1,2,3,4,5])
rdd.filter(lambda x:x%2==0).collect() #对2求余。返回余数为0的数
[2,4]
rdd=sc.parallelize(["bbbb","aqqqq","cvv"])
rdd.filter(lambda x:len(x)>4).collect()
['aqqqq]
union转换
union转换是对一个RDD和参数RDD求并集后,返回一个新的RDD过程。
rdd=sc.parallelize([1,1,2,3])
rdd1=sc.parallelize([1,2,3,4])
rdd.union(rdd).collect()
([1,1,2,3,1,2,3,4])
rdd.union(rdd1).collect()
([1,1,2,3,1,2,3,4])
intersection转换
intersection转换是对一个RDD和另一个RDD的交集,其输出将不包含任何重复的元素
rdd1=sc.parallelize([1,10,2,3,4,5])
rdd2=sc.parallelize([1,6,2,3,7,8])
rdd.intersection(rdd2).collect()
[1,2,3]
distinct转换
distinct转换操作返回不同元素的新RDD。
rdd=sc.parallelize([1,1,2,3])
rdd.distinct().collect()
[2,1,3]
sortBy转换
sort转换通过指定key的方法对RDD内部元素进行排序
tmp=[('a',1),('b',2),('1',3),('2',4),('3',5)]
sc.parallelize(tmp).sortBy(lambda x:x[0]).collect()
[('1',3),('2',5),('a',1),('b',2),('d',4)]
sc.parallelize(tmp).sortBy(lambda x:x[1]).collect()
[('a',1),('b',2),('1',3),('2',4),('3',5)]
sc.parallelize(tmp).sortBy(lambda x:x[1],False).collect()
[('3',5),('2',4),('1',3),('b',2),('a',1)]
ascending默认值为True表示正序,当设置ascending为False时表示倒序。
mapPartitions转换
map作用于每一个元素,mapPartitions则作用于一个分区
rdd=sc.parallelize([1,2,3,4],2)
def f(iterator):yield sum(iterator)
rdd.mapPartitions(f).collect()
[3,7]
mapPartitionsWithIndex转换
mapPartitionsWithIndex传入的函数可接受两个参数,第一个参数为分区编号,第二个参数为对应分区的元素组成的迭代器
rdd=sc.parallelize([1,2,3,4],2)
def f(splitIndex,iterator): yield splitIndex,list(iterator)
...
rdd.mapPartitionsWithIndex(f).collect()
[(0,[1,2]),(1,[3,4])] #0为分区编号 [1,2]为分区中的元素值
partitionBy转换
针对Key-Value结构的RDD重新分区,采用Hash分区
paris=sc.parallelize([1,2,3,4,2,4,1]).map(lambda x:(x,x))
paris.partitionBy(2).glom().collect()
[[(2,2),(4,4),(2,2),(4,4)],[(1,1),(3,3),(1,1)]]
paris.partitionBy(3).glom().collect()
[[(3,3)],[(1,1),(4,4),(4,4),(1,1)],[(2,2),(2,2)]]