RDD操作

RDD被创建好以后,在后续使用过程中一般会发生两种操作:

  • 转换(Transformation): 基于现有的数据集创建一个新的数据集。
  • 行动(Action):在数据集上进行运算,返回计算值。

转换操作:

进行物理的转换操作

  • filter(func):筛选出满足函数func的元素,并返回一个新的数据集
val rdd =sc.parallelize(List(1,2,3,4,5,6)) 
val filterRdd = rdd.filter(_> 5)
filterRdd.collect()
//返回所有大于5的数据的一个Array, Array(6,8,10,12)
  • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.map( _ + 10)
//Array[Int] = Array(11, 12, 13, 14)
  • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
val a=Array(1,2,3,4,5)
val b=a.flatMap(1 to _)
//b: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

val b=a.flatMap(_ to 5)
//b: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 3, 4, 5, 4, 5, 5)

  • groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集

分组函数。

  • 对类型 (K, V) 的数据集使用,返回(K, Iterable)类型的数据集
  • 如果想在分组后使用sum,average等聚合函数,最好使用 reduceByKey 或 aggregateByKey,这将获得更好的性能
  • 默认的并行度依赖于父RDD,也可以传入可选参数numTasks指定并行任务数量。
val a=sc.parallelize(Array("a"->1,"a"->2,"b"->3))
a.groupByKey.collect
//res3: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3)))

  • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

按照key分组然后聚集,类似于SQL中的groupby之后再使用聚集函数。
当一个 (K, V) 类型的数据集调用此函数, 返回一个同样是(K, V) 类型的数据集。

scala> a.groupByKey.collect
res3: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3)))

scala> val a=sc.parallelize(Array("a"->1,"a"->2,"a"->3,"b"->4,"b"->5))
a: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> a.reduceByKey((x,y)=>x+y).collect
res4: Array[(String, Int)] = Array((a,6), (b,9))

行动操作:

  • count() 返回数据集中的元素个数
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> a.count()
res6: Long = 10

  • collect() 以数组的形式返回数据集中的所有元素
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> a.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  • first() 返回数据集中的第一个元素
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> a.first
res8: Int = 1
  • take(n) 以数组的形式返回数据集中的前n个元素
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24


scala> a.take(6)
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> a.take(11)
res11: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala>  a.take(100)
res12: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> a.reduce((x,y)=>x+y)
res13: Int = 55

scala> a.reduce((x,y)=>x*y)
res14: Int = 3628800

scala> a.reduce((x,y)=>x-y)
res15: Int = -53

  • foreach(func) 将数据集中的每个元素传递到函数func中运行
scala> val a=sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> a.foreach(println)
1
2
3
4
5
6
7
8
9
10

  • union(otherDataset) 求并集。
scala> val b=sc.parallelize(2 to 5)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> a.union(b).collect
res19: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5)

  • intersection(otherDataset) 求交集。
scala> a.intersection(b).collect
res22: Array[Int] = Array(4, 3, 5, 2)
  • distinct([numTasks])) 去重、
scala> val c=Array(1,1,2,3,4,5,6,5,3,3,1)
c: Array[Int] = Array(1, 1, 2, 3, 4, 5, 6, 5, 3, 3, 1)

scala> c.distinct
res24: Array[Int] = Array(1, 2, 3, 4, 5, 6)

  • sortByKey([ascending], [numTasks]) 排序
  • ascending: 可选,是否升序排序
    numTasks: 可选,并发任务数量
    对于 (K, V) 的数据集进行操作,返回同样是(K, V)类型的数据集,其中K实现了Orderedtrait,也就是可以排序。
val p = sc.parallelize(Array("b" -> 1,"d" ->2,"a" ->3, "c" -> 4))
p.sortByKey().collect

res139: Array[(String, Int)] = Array((a,3), (b,1), (c,4), (d,2))

//数组排序
scala> val c=Array(1,1,2,3,4,5,6,5,3,3,1)
c: Array[Int] = Array(1, 1, 2, 3, 4, 5, 6, 5, 3, 3, 1)

scala> c.sorted
res28: Array[Int] = Array(1, 1, 1, 2, 3, 3, 3, 4, 5, 5, 6)
上一篇:CSP 201903-1 大中小


下一篇:Spark入门之idea编写Scala脚本