RDD被创建好以后,在后续使用过程中一般会发生两种操作:
- 转换(Transformation): 基于现有的数据集创建一个新的数据集。
- 行动(Action):在数据集上进行运算,返回计算值。
转换操作:
进行物理的转换操作
- filter(func):筛选出满足函数func的元素,并返回一个新的数据集
val rdd =sc.parallelize(List(1,2,3,4,5,6))
val filterRdd = rdd.filter(_> 5)
filterRdd.collect()
//返回所有大于5的数据的一个Array, Array(6,8,10,12)
- map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.map( _ + 10)
//Array[Int] = Array(11, 12, 13, 14)
- flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
val a=Array(1,2,3,4,5)
val b=a.flatMap(1 to _)
//b: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
val b=a.flatMap(_ to 5)
//b: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 3, 4, 5, 4, 5, 5)
- groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
分组函数。
- 对类型 (K, V) 的数据集使用,返回(K, Iterable)类型的数据集
- 如果想在分组后使用sum,average等聚合函数,最好使用 reduceByKey 或 aggregateByKey,这将获得更好的性能
- 默认的并行度依赖于父RDD,也可以传入可选参数numTasks指定并行任务数量。
val a=sc.parallelize(Array("a"->1,"a"->2,"b"->3))
a.groupByKey.collect
//res3: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3)))
- reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
按照key分组然后聚集,类似于SQL中的groupby之后再使用聚集函数。
当一个 (K, V) 类型的数据集调用此函数, 返回一个同样是(K, V) 类型的数据集。
scala> a.groupByKey.collect
res3: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3)))
scala> val a=sc.parallelize(Array("a"->1,"a"->2,"a"->3,"b"->4,"b"->5))
a: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> a.reduceByKey((x,y)=>x+y).collect
res4: Array[(String, Int)] = Array((a,6), (b,9))
行动操作:
- count() 返回数据集中的元素个数
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> a.count()
res6: Long = 10
- collect() 以数组的形式返回数据集中的所有元素
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> a.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
- first() 返回数据集中的第一个元素
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> a.first
res8: Int = 1
- take(n) 以数组的形式返回数据集中的前n个元素
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> a.take(6)
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> a.take(11)
res11: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> a.take(100)
res12: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
- reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> a.reduce((x,y)=>x+y)
res13: Int = 55
scala> a.reduce((x,y)=>x*y)
res14: Int = 3628800
scala> a.reduce((x,y)=>x-y)
res15: Int = -53
- foreach(func) 将数据集中的每个元素传递到函数func中运行
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> a.foreach(println)
1
2
3
4
5
6
7
8
9
10
- union(otherDataset) 求并集。
scala> val b=sc.parallelize(2 to 5)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> a.union(b).collect
res19: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5)
- intersection(otherDataset) 求交集。
scala> a.intersection(b).collect
res22: Array[Int] = Array(4, 3, 5, 2)
- distinct([numTasks])) 去重、
scala> val c=Array(1,1,2,3,4,5,6,5,3,3,1)
c: Array[Int] = Array(1, 1, 2, 3, 4, 5, 6, 5, 3, 3, 1)
scala> c.distinct
res24: Array[Int] = Array(1, 2, 3, 4, 5, 6)
- sortByKey([ascending], [numTasks]) 排序
- ascending: 可选,是否升序排序
numTasks: 可选,并发任务数量
对于 (K, V) 的数据集进行操作,返回同样是(K, V)类型的数据集,其中K实现了Orderedtrait,也就是可以排序。
val p = sc.parallelize(Array("b" -> 1,"d" ->2,"a" ->3, "c" -> 4))
p.sortByKey().collect
res139: Array[(String, Int)] = Array((a,3), (b,1), (c,4), (d,2))
//数组排序
scala> val c=Array(1,1,2,3,4,5,6,5,3,3,1)
c: Array[Int] = Array(1, 1, 2, 3, 4, 5, 6, 5, 3, 3, 1)
scala> c.sorted
res28: Array[Int] = Array(1, 1, 1, 2, 3, 3, 3, 4, 5, 5, 6)