2021-09-20

spark-core rdd行动算子

1、reduce()

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
/**
 * ➢ 函数签名
    def reduce(f: (T, T) => T): T
    ➢ 函数说明
    聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
 */
val i: Int = rdd.reduce(_+_)
println(i)
sc.stop()

2、collect()

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
/**
 *➢ 函数签名
    def collect(): Array[T]
    ➢ 函数说明
    在驱动程序中,以数组 Array 的形式返回数据集的所有元素
    collect : 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
 */
val ints: Array[Int] = rdd.collect()
println(ints.mkString(","))
sc.stop()

3、count()

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
/**
 * ➢ 函数签名
    def count(): Long
    ➢ 函数说明
    返回 RDD 中元素的个数
    count : 数据源中数据的个数
 */
val cnt = rdd.count()
println(cnt)
sc.stop()

4、first()

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
**
 * ➢ 函数签名
    def first(): T
    ➢ 函数说明
    返回 RDD 中的第一个元素
    first : 获取数据源中数据的第一个
 */
val first = rdd.first()
println(first)
sc.stop()

5、take(num:Int)

/**
 * ➢ 函数签名
    def take(num: Int): Array[T]
    ➢ 函数说明
    返回一个由 RDD 的前 n 个元素组成的数组
    take : 获取N个数据
 */
val ints: Array[Int] = rdd.take(3)
println(ints.mkString(","))

6、takeOrdered(num:Int)

/**
 * ➢ 函数签名
    def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
    ➢ 函数说明
    返回该 RDD 排序后的前 n 个元素组成的数组
 */
// takeOrdered : 数据排序后,取N个数据
val rdd1 = sc.makeRDD(List(4,2,3,1))
val ints1: Array[Int] = rdd1.takeOrdered(3)
println(ints1.mkString(","))

7、aggregate(初始值)(分区内逻辑,分区间逻辑)

/**
 * ➢ 函数签名
    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    ➢ 函数说明
    分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
 */
    //List(1,2,3,4),2  10+(分区数+1)*初始值
//10 + 13 + 17 = 40
// aggregateByKey : 初始值只会参与分区内计算
// aggregate : 初始值会参与分区内计算,并且和参与分区间计算
val result = rdd.aggregate(10)(_+_, _+_)

8、fold(初始值)(逻辑)

/**
 * ➢ 函数签名
    def fold(zeroValue: T)(op: (T, T) => T): T
    ➢ 函数说明
    折叠操作,aggregate 的简化版操作,当分区内和分区间运算规则相同时
 */
val result = rdd.fold(10)(_+_)

9、countByKey()

/**
 * ➢ 函数签名
    def countByKey(): Map[K, Long]
    ➢ 函数说明
    统计每种 key 的个数
 */
  //val intToLong: collection.Map[Int, Long] = rdd.countByValue()
  //println(intToLong)
  val stringToLong: collection.Map[String, Long] = rdd.countByKey()
  println(stringToLong)

10、SavaAsTextFile(path:String)

/**
 * ➢ 函数签名
      def saveAsTextFile(path: String): Unit // 保存成 Text 文件
      def saveAsObjectFile(path: String): Unit // 序列化成对象保存到文件
      def saveAsSequenceFile(
       path: String,
       codec: Option[Class[_ <: CompressionCodec]] = None): Unit // 保存成 Sequencefile 文件
      ➢ 函数说明
      将数据保存到不同格式的文件中
 */
  rdd.saveAsTextFile("output")
  rdd.saveAsObjectFile("output1")
  // saveAsSequenceFile方法要求数据的格式必须为K-V类型
  rdd.saveAsSequenceFile("output2")

11、foreach(f:T)

/**
 * ➢ 函数签名
      def foreach(f: T => Unit): Unit = withScope {
       val cleanF = sc.clean(f)
       sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
      }➢ 函数说明
      分布式遍历 RDD 中的每一个元素,调用指定函数
 */
  // foreach 其实是Driver端内存集合的循环遍历方法
  rdd.collect().foreach(println)
  // foreach 其实是Executor端内存数据打印
  rdd.foreach(println)
上一篇:transformation算子基本原理二


下一篇:Spark原理及源码解析【第六阶段模块四】