Spark Operators: Summary and Examples

Spark operators can be roughly divided into three categories:

1. Transformation operators on Value-type data. These transformations do not trigger job submission; the data items they process are plain values.

2. Transformation operators on Key-Value data. These transformations do not trigger job submission; the data items they process are key-value pairs.

3. Action operators. These trigger the SparkContext to submit a job, as the short sketch below illustrates.
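A minimal spark-shell sketch (the variable names and data are illustrative, not from the original post): transformations only record lineage lazily, while an action actually submits a job and returns a result.

val nums = sc.parallelize(1 to 5)   // create an RDD; nothing runs yet
val doubled = nums.map(_ * 2)       // transformation: lazily recorded, no job submitted
doubled.count                       // action: SparkContext submits a job
res0: Long = 5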

I. Value-Type Transformation Operators

1)map

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

2)flatMap

val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect
res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

3)mapPartitions

The snippet below (taken from the reference) actually uses flatMap with List.fill to duplicate each element a random number of times; a minimal mapPartitions sketch follows it.

val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
// the exact output varies from run to run, since each element is copied between 0 and 9 times
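A minimal mapPartitions sketch (not from the original post; the data and the sumPerPartition name are illustrative): the supplied function receives each partition's iterator once and here emits one per-partition sum.

val a = sc.parallelize(1 to 9, 3)
def sumPerPartition(iter: Iterator[Int]): Iterator[Int] = Iterator(iter.sum)
a.mapPartitions(sumPerPartition).collect
// with partitions (1,2,3), (4,5,6), (7,8,9) this yields Array(6, 15, 24)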

4)glom (collects the elements of each partition into an Array)

val a = sc.parallelize(1 to 100, 3)
a.glom.collect
res8: Array[Array[Int]] = Array(Array(1, 2, 3, ..., 33), Array(34, 35, 36, ..., 66), Array(67, 68, 69, ..., 100))

5)union

val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)

6)cartesian (Cartesian product)

val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (4,7), (4,8), (4,9), (4,10), (5,6), (5,7), (5,8), (5,9), (5,10))

7)groupBy (generates a key for each element and groups elements with the same key together)

val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))

8)filter

val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect
res3: Array[Int] = Array(2, 4, 6, 8, 10)

9)distinct (removes duplicate elements)

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)

10)subtract (returns the elements of one RDD that do not appear in the other RDD)

val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.collect
res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)

11)sample

val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1, 0).count
res24: Long = 960

12)takeSample

val x = sc.parallelize(1 to 1000, 3)
x.takeSample(true, 100, 1)
res3: Array[Int] = Array(...)   // 100 elements sampled with replacement; the exact values depend on the seed

13)cache, persist

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.getStorageLevel
res0: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1)
c.cache
c.getStorageLevel
res2: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1)
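The example above only shows cache, which is persist with the default MEMORY_ONLY level. A minimal sketch of persist with an explicit storage level (the RDD contents are illustrative, not from the original post):

import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 100, 2)
d.persist(StorageLevel.MEMORY_AND_DISK)   // keep in memory, spill to disk if it does not fit
d.getStorageLevel
// StorageLevel(true, true, false, true, 1): disk = true, memory = true, deserialized = true, replication = 1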

II. Key-Value Transformation Operators

1)mapValues

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

2)combineByKey

combineByKey takes three functions: one that creates an initial combiner from a single value, one that merges a value into a combiner within a partition, and one that merges two combiners across partitions.

val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect
res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))

3)reduceByKey

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

4)partitionBy

(repartitions an RDD by key, using the given Partitioner; a minimal sketch follows)
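A minimal partitionBy sketch (not from the original post; the pair data is illustrative): hash-partition a key-value RDD into two partitions and inspect them with glom.

import org.apache.spark.HashPartitioner
val pairs = sc.parallelize(List((1, "a"), (2, "b"), (3, "c"), (4, "d")))
val parted = pairs.partitionBy(new HashPartitioner(2))
parted.glom.collect
// keys hashing to an even value go to partition 0, the rest to partition 1:
// Array(Array((2,b), (4,d)), Array((1,a), (3,c)))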

5)cogroup

val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c))),
(3,(ArrayBuffer(b),ArrayBuffer(c))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))
)

6)join

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.join(d).collect
res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

7)leftOuterJoin

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.leftOuterJoin(d).collect
res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))

8)rightOuterJoin

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.rightOuterJoin(d).collect
res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))

III. Action Operators

1)foreach

val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
c.foreach(x => println(x + "s are yummy"))
lions are yummy
gnus are yummy
crocodiles are yummy
ants are yummy
whales are yummy
dolphins are yummy
spiders are yummy

2)saveAsTextFile

val a = sc.parallelize(1 to 10000, 3)
a.saveAsTextFile("mydata_a")
// INFO FileOutputCommitter: Saved output of task 'attempt_201404032111_0000_m_000002_71' to file:/home/cloudera/Documents/spark-0.9.1-incubating-bin-cdh4/bin/mydata_a

3)saveAsObjectFile

val x = sc.parallelize(1 to 100, 3)
x.saveAsObjectFile("objFile")
val y = sc.objectFile[Int]("objFile")
y.collect
res52: Array[Int] = Array(1, 2, 3, ..., 98, 99, 100)

4)collect

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

5)collectAsMap

val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
b.collectAsMap
res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)

6)reduceByKeyLocally

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKeyLocally(_ + _)
res86: scala.collection.Map[Int,String] = Map(3 -> dogcatowlgnuant)

7)lookup

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.lookup(5)
res0: Seq[String] = WrappedArray(tiger, eagle)

8)count

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res2: Long = 4

9)top

val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c.top(2)
res28: Array[Int] = Array(9, 8)

10)reduce

val a = sc.parallelize(1 to 100, 3)
a.reduce(_ + _)
res41: Int = 5050

11)fold

val a = sc.parallelize(List(1,2,3), 3)
a.fold(0)(_ + _)
res59: Int = 6

12)aggregate

val z = sc.parallelize(List(1,2,3,4,5,6), 2)

// let's first print out the contents of the RDD with partition labels
def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

z.mapPartitionsWithIndex(myfunc).collect
res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])

z.aggregate(0)(math.max(_, _), _ + _)
res40: Int = 9

Reference: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
