Spark--spark核心编程(RDD)

RDD


Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

  • RDD : 弹性分布式数据集
  • 累加器:分布式共享只写变量
  • 广播变量:分布式共享只读变量

RDD

1.什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
Spark--spark核心编程(RDD)

弹性

  • 存储的弹性:内存与磁盘的自动切换;
  • 容错的弹性:数据丢失可以自动恢复;
  • 计算的弹性:计算出错重试机制;
  • 分片的弹性:可根据需要重新分片。

分布式

数据存储在大数据集群不同节点上

数据集

RDD 封装了计算逻辑,并不保存数据

数据抽象

RDD 是一个抽象类,需要子类具体实现

不可变

RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD 里面封装计算逻辑

可分区、并行计算

RDD和IO流的关系

  • RDD的数据处理方式类似于IO流,也有装饰者设计模式
  • RDD的数据只有在调用collect方法时,才会真正执行业务逻辑操作,之前的封装全部都是功能的扩展
  • RDD是不保存数据的,但是IO可以临时保存一部分数据

2.核心属性

  1. 分区列表
    RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
  2. 分区计算函数
    Spark 在计算时,是使用分区函数对每一个分区进行计算
  3. RDD 之间的依赖关系
    RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
  4. 分区器(可选)
    当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
  5. 首选位置(可选)
    计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

3.执行原理

  • 从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
  • Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。
  • RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD 的工作原理:
    1)启动 Yarn 集群环境
    Spark--spark核心编程(RDD)

2)Spark 通过申请资源创建调度节点和计算节点
Spark--spark核心编程(RDD)

3)Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
Spark--spark核心编程(RDD)

4)调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
Spark--spark核心编程(RDD)

从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给Executor 节点执行计算

4.基础编程

4.1 RDD创建

  • 从集合(内存)中创建RDD
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(conf)

    //TODO 创建RDD
    //从内存中创建RDD,将内存中集合的数据作为处理的数据源
    val seq=Seq[Int](1,2,3,4)
    //parallelize:并行
    //val rdd: RDD[Int] = sc.parallelize(seq)
    //makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法
    val rdd: RDD[Int] = sc.makeRDD(seq)
    rdd.collect().foreach(println)
    
    //TODO 关闭环境
    sc.stop()
  }
}
1
2
3
4
  • 从外部存储(文件)创建RDD
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(conf)

    //TODO 创建RDD
    //从文件中创建RDD,将文件中的数据作为处理的数据源
    //path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径
    //path路径可以是文件的具体路径,也可以是目录名称
    //val rdd=sc.textFile("datas")
    //path路径还可以使用通配符
    //val rdd=sc.textFile("datas/1*.txt")
    //path还可以是分布式存储系统的路径:HDFS
    //val rdd=sc.textFile("hdfs://linux1:8020/test.txt")
    //textFile:以行为单位来读取数据,读取的数据都是字符串
    //wholeTextFiles:以文件为单位读取数据
    //val rdd = sc.wholeTextFiles("datas")
    //读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
    val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt")
    rdd.collect().foreach(println)

    //TODO 关闭环境
    sc.stop()
  }
}
Hello World
Hello Spark
hello scala
hello Spark
  • 从其他RDD创建
  • 直接创建RDD

4.2 RDD并行度与分区

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    //conf.set("spark.default.parallelism","5")  可以手动配置核数
    val sc = new SparkContext(conf)

    //TODO 创建RDD
    //RDD的并行度&分区
    //makeRDD方法可以传递第二个参数,这个参数表示分区的数量
    //第二个参数可以不传递的,那么makeRDD方法会使用默认值:defaultParallelism(默认为)
    //源码scheduler.conf.getInt("spark.default.parallelism",totalCores)
    //spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
    //如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
    val rdd: RDD[Int] = sc.makeRDD(
      List(1, 2, 3, 4), 2
    )

    //将处理的数据保存成分区文件
    rdd.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")

    //TODO 关闭环境
    sc.stop()
  }
}

Spark--spark核心编程(RDD)

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(conf)

    //TODO 创建RDD
    //textFile可以将文件作为数据处理的数据源,默认也可以设定分区,默认分区数是2
    //minPartition:最小分区数量
    //math.min(defaultParallelism,2)
    //如果不想使用默认的分区数量,可以通过第二个参数指定分区数
    //Spark读取文件,底层其实使用的是Hadoop的读取方式
    //分区数量的计算方式:
    // totalSize=7
    // goalSize=7/2=3(byte)
    // 7/3=2...1(1.1)+1=3(分区)
    val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt",3)
    
    //将处理的数据保存成分区文件
    rdd.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")
    
    //TODO 关闭环境
    sc.stop()
  }
}

1.txt

1
2
3
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(conf)

    //TODO 创建RDD
   //TODO 数据分区的分配
    //1.数据以行为单位进行读取
    //  spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,和字节数没有关系
    //2.数据读取时以偏移量为单位,偏移量不会被重复读取
    /*字节      偏移量
    * 1@@    => 012
    * 2@@    => 345
    * 3      => 6
    */
    //3.数据分区的偏移量范围的计算
    /*分区    偏移量范围
    * 0   => [0,3]  =>12
    * 1   => [3,6]  =>3
    * 2   => [6,7]  =>
    */

    //如果数据源为多个文件,那么计算分区时以文件为单位进行分区
    val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt",2)

    //将处理的数据保存成分区文件
    rdd.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")

    //TODO 关闭环境
    sc.stop()
  }
}

Spark--spark核心编程(RDD)
Spark--spark核心编程(RDD)
Spark--spark核心编程(RDD)
Spark--spark核心编程(RDD)

4.3 RDD转换算子

Spark--spark核心编程(RDD)

1)map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-map
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    //1,2,3,4
    //2,4,6,8

    //转换函数
//    def mapFunction(num:Int):Int={
//      num*2
//    }
//    val mapRDD: RDD[Int] = rdd.map(mapFunction)

//    val mapRDD: RDD[Int] = rdd.map((num:Int)=>{num*2})  //匿名函数

    val mapRDD: RDD[Int] = rdd.map(_*2)
    mapRDD.collect().foreach(println)

    sc.stop()
  }
}
2
4
6
8
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-map
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
    //1,2一个分区    3,4一个分区

    //1.rdd的计算一个分区内的数据是一个一个执行逻辑
    //  只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据
    //  分区内数据的执行是有序的
    //2.不同分区数据计算是无序的
    

    val mapRDD1= rdd.map(
      x => {
      println("List:"+x)
      x
      }
    )
    val mapRDD2= mapRDD1.map(
      x => {
        println("转换后List:" + x)
        x
      })

    mapRDD2.collect()

    sc.stop()
  }
}
List:1
List:3
转换后List:1
转换后List:3
List:2
List:4
转换后List:2
转换后List:4

2)mapPartitions

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-mapPartitions
    //mapPartitions:可以以分区为单位进行数据转换操作
    //              但是会将整个分区的数据加载到内存进行引用
    //              如果处理完的数据是不会被释放掉,存在对象的引用
    //              在内存较小,数据量较大的场合下,容易出现内存溢出
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
    val rdd1: RDD[Int] = rdd.mapPartitions(iter => {
      println("一个分区执行一次")
      iter.map(_ * 2)
    })
    rdd1.collect().foreach(println)
    
    sc.stop()
  }
}
一个分区执行一次
一个分区执行一次
2
4
6
8
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-mapPartitions
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
    //[1,2] [3,4]
    //可以得出各分区内的最大值
    val rdd1 = rdd.mapPartitions(iter => {
      List(iter.max).iterator
    })
    rdd1.collect().foreach(println)

    sc.stop()
  }
}
2
4

map和mapPartitions的区别

  • 数据处理角度
    Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
  • 功能的角度
    Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据
  • 性能的角度
    Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用mapPartitions,使用 map 操作。

3)mapPartitionsWithIndex

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-mapPartitionsWithIndex
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
    // [1,2] [3,4]
    // [3,4]
    val rdd1: RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => {
      if (index == 1) {
        iter
      } else {
        Nil.iterator  //Nil返回空集合
      }
    })

    rdd1.collect().foreach(println)
    sc.stop()
  }
}
3
4
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-mapPartitionsWithIndex
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 1,2,3,4
    // (分区号,数字)
    val rdd1: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, iter) => {
      iter.map(x => (index, x))
    })

    rdd1.collect().foreach(println)
    sc.stop()
  }
}
(2,1)
(5,2)
(8,3)
(11,4)

4)flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-flatMap
    val rdd: RDD[List[Int]] = sc.makeRDD(List(
      List(1, 2), List(3,4)))
    val rdd1: RDD[Int] = rdd.flatMap(x=>x)

    rdd.collect().foreach(println)
    rdd1.collect().foreach(println)
    sc.stop()
  }
}
List(1, 2)
List(3, 4)
1
2
3
4
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-flatMap
    val rdd: RDD[String] = sc.makeRDD(List(
      "hello spark", "hello java"))
    val rdd1: RDD[String] = rdd.flatMap(x => {
      x.split(" ")
    })
    rdd1.collect().foreach(println)

    sc.stop()
  }
}
hello
spark
hello
java
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-flatMap
    //数据类型不一致时,使用模式匹配
    val rdd: RDD[Any] = sc.makeRDD(List(List(1,2),3,List(4,5)))
    val rdd1: RDD[Any] = rdd.flatMap(x => {
      x match {
        case x: List[Int] => x
        case x => List(x)
      }
    })
    rdd1.collect().foreach(println)

    sc.stop()
  }
}
1
2
3
4
5

5)glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-glom
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)

    //List => Int
    //Int => Array
    val glomRdd: RDD[Array[Int]] = rdd.glom()
    glomRdd.collect().foreach(x=>println(x.mkString(",")))
    
    sc.stop()
  }
}
1,2
3,4
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-glom
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)

    //[1,2],[3,4]   分区取最大值
    //[2]   [4]     最大值求和
    //[6]
    val glomRDD: RDD[Array[Int]] = rdd.glom()
    val maxRDD: RDD[Int] = glomRDD.map(
      array => {
        array.max
      }
    )
    println(maxRDD.collect().sum)
    
    sc.stop()
  }
}
6

6)groupBy

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-groupBy
    val rdd: RDD[String] = sc.makeRDD(List("hello","spark","hadoop","scala","java"),2)
    val groupRDD: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))
    groupRDD.collect().foreach(println)

    sc.stop()
  }
}
`
```scala
(h,CompactBuffer(hello, hadoop))
(j,CompactBuffer(java))
(s,CompactBuffer(spark, scala))

1.txt

uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:02:05:03 +000
uu wr erw 17/05/2015:12:05:03 +000
uu wr erw 17/05/2015:03:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:03:05:03 +000
uu wr erw 17/05/2015:11:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:11:05:03 +000
uu wr erw 17/05/2015:11:05:03 +000
uu wr erw 17/05/2015:11:05:03 +000
uu wr erw 17/05/2015:02:05:03 +000
uu wr erw 17/05/2015:02:05:03 +000
uu wr erw 17/05/2015:12:05:03 +000
uu wr erw 17/05/2015:12:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt")
    val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(line => {
      val datas: Array[String] = line.split(" ")
      val time: String = datas(3)
      val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
      /*
      * parse()返回的是一个Date类型数据
      * parse方法可以把String型的字符串转换成特定格式的date类型,
      * 使用parse时字符串长度要和定义的SimpleDateFormat对象长度一致
      */
      val date: Date = sdf.parse(time)
      println(date)
      println("----------------")
      val sdf1 = new SimpleDateFormat("HH")
      /*
      * format返回的是一个String类型的数据
      * format方法可以把Date型字符转换成特定格式的String类型,
      * 如果Date类型和定义的SimpleDateFormat长度不一致会自动在后面补0
      */
      val hour: String = sdf1.format(date)
      println(hour)
      (hour, 1)

      // 法二:字符串截取
      // val str: String = datas(3).substring(11, 13)
      //(str, 1)
    }).groupBy(_._1)

    timeRDD.map {
      //模式匹配
      case (hour, iter) => (hour, iter.size)
    }.collect().foreach(println)
    
    sc.stop()
  }
}
Sun May 17 11:05:03 CST 2015
Sun May 17 10:05:03 CST 2015
----------------
----------------
11
10
Sun May 17 02:05:03 CST 2015
----------------
Sun May 17 11:05:03 CST 2015
----------------
02
11
Sun May 17 11:05:03 CST 2015
----------------
11
Sun May 17 02:05:03 CST 2015
----------------
02
Sun May 17 02:05:03 CST 2015
----------------
02
Sun May 17 12:05:03 CST 2015
----------------
12
Sun May 17 12:05:03 CST 2015
----------------
12
Sun May 17 03:05:03 CST 2015
----------------
03
Sun May 17 10:05:03 CST 2015
----------------
10
Sun May 17 10:05:03 CST 2015
----------------
10
Sun May 17 12:05:03 CST 2015
----------------
Sun May 17 10:05:03 CST 2015
----------------
12
10
Sun May 17 03:05:03 CST 2015
----------------
Sun May 17 10:05:03 CST 2015
----------------
03
10
Sun May 17 11:05:03 CST 2015
----------------
11
Sun May 17 10:05:03 CST 2015
----------------
10
(02,3)
(11,4)
(03,2)
(12,3)
(10,6)

7)filter

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-filter
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    rdd.filter(num=>(num%2!=0)).collect().foreach(println)

    sc.stop()
  }
}
1
3
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-filter
    val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt")
    //过滤10点的数据
    rdd.filter(line=>{
      line.split(" ")(3).startsWith("17/05/2015:10")
    }).collect().foreach(println)
    
    sc.stop()
  }
}
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000

8)sample

根据指定的规则从数据集中抽取数据
(应用场景:数据倾斜)

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-sample
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))

    //sample算子需要传递三个参数
    //1.第一个参数withReplacement表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
    //2.第二个参数fraction表示:如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
    //                        如果抽取放回的场合:表示数据源中的每条数据别抽取的可能次数
    //3.第三个参数seed表示,抽取数据时随机算法的种子
    //                    如果不传递第三个参数,那么使用的是当前系统时间
    println(rdd.sample(
      false,
      0.4,
      1
    ).collect().mkString(","))
    println("-------------------")
    println(rdd.sample(
      true,
      2
    ).collect().mkString(","))

    sc.stop()
  }
}
1,2,3,7,9
-------------------
2,2,2,3,3,3,3,4,4,4,5,6,7,7,7,8,8,8,9,9,9,9,10,10,10,10,10,10,10

9)distinct

将数据集中重复的数据去重

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-filter
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,2,3,2))
    println(rdd.distinct().collect() mkString (","))
    
    sc.stop()
  }
}
1,2,3,4

10)coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-coalesce
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)
    val newRDD: RDD[Int] = rdd.coalesce(2)
    newRDD.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")

    sc.stop()
  }
}

Spark--spark核心编程(RDD)
Spark--spark核心编程(RDD)
coalesce方法默认情况下不会将分区的数据打乱重新组合
这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
如果想要让数据均衡,可以进行shuffle处理,第二个参数设为true
coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义的,不起作用
如果想要实现扩大分区的效果,需要使用shuffle操作

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-coalesce
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)
    val newRDD: RDD[Int] = rdd.coalesce(2,true)
    newRDD.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")

    sc.stop()
  }
}

Spark--spark核心编程(RDD)
Spark--spark核心编程(RDD)

11)repartition

该操作内部其实执行的是 coalesce 操作,底层是coalesce函数,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的RDD,还是将分区数少的 RDD 转换为分区数多的RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-repartition
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)
    val newRDD: RDD[Int] = rdd.repartition(3)
    newRDD.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")
    
    sc.stop()
  }
}

Spark--spark核心编程(RDD)
Spark--spark核心编程(RDD)
Spark--spark核心编程(RDD)

12)sortBy

该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原RDD 的分区数一致。中间存在 shuffle 的过程

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-sortBy
    //sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认是升序
    //第二个参数可以改变排序的方式,用false
    //sortBy默认情况下,不会改变分区,但是中间存在shuffle操作
    val rdd: RDD[Int] = sc.makeRDD(List(4,3,2,6,4,1))
    println(rdd.sortBy(x => x,false).collect().mkString(","))
    println("---------------")
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("1",1),("11",2),("2",3)))
    println(rdd1.sortBy(x => x._1).collect().mkString(","))
    println("---------------")
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("1",1),("11",2),("2",3)))
    println(rdd2.sortBy(x => x._1.toInt).collect().mkString(","))
    
    sc.stop()
  }
}
6,4,4,3,2,1
---------------
(1,1),(11,2),(2,3)
---------------
(1,1),(2,3),(11,2)

13)双Value类型{ intersection , union , subtract , zip }

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子-双Value类型
    //交集,并集和差集要求两个数据源数据类型保持一致
    //拉链操作两个数据源的类型可以不一致
    //拉链操作时,两个数据源要求分区数量要保持一致,分区中数据数量保持一致
    val rdd1: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    val rdd2: RDD[Int] = sc.makeRDD(List(3,4,5,6))

    //交集
    println(rdd1.intersection(rdd2).collect().mkString(","))
    //并集
    println(rdd1.union(rdd2).collect().mkString(","))
    //差集
    println(rdd1.subtract(rdd2).collect().mkString(","))
    //拉链
    println(rdd1.zip(rdd2).collect().mkString(","))
    
    sc.stop()
  }
}
3,4
1,2,3,4,3,4,5,6
1,2
(1,3),(2,4),(3,5),(4,6)

14)Key-Value类型{ partitionBy , reduceByKey , groupByKey , aggregateByKey , foldByKey , combineByKey }

partitionBy
将数据按照指定Partitioner 重新进行分区。Spark 默认的分区器是HashPartitioner

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)
    
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5),2)
     val mapRDD: RDD[(Int, Int)] = rdd.map((_,1))
    //partitionBy根据指定的分区规则对数据进行重分区
    mapRDD.partitionBy(new HashPartitioner(2))
      .saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")

    sc.stop()

Spark--spark核心编程(RDD)Spark--spark核心编程(RDD)
reduceByKey

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子- Key-Value类型
    //reduceByKey:相同的key的数据进行value数据的聚合操作
    //reduceByKey分区内和分区间计算规则是相同的
    //Scala语言中一般的聚合操作都是两两聚合,spark基于Scala开发的,所以它的聚合也是两两聚合
    //reduceByKey中如果key的数据只有一个,是不会参与运算的
    val rdd: RDD[(String, Int)] =sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
    val rdd1: RDD[(String, Int)] = rdd.reduceByKey((x,y)=>{
      println(s"x=${x},y=${y}")
      x+y
    })
    rdd1.collect().foreach(println)
    
    sc.stop()
  }
}
x=1,y=2
x=3,y=3
(a,6)
(b,4)

groupByKey

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子- Key-Value类型
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))

    //groupByKey: 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
    //             元组中的第一个元素就是key
    //             元组中的第二个元素就是相同key的value的
    val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    rdd1.collect().foreach(println)
    println("----------------------")
    val rdd2: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
    rdd2.collect().foreach(println)
    
    sc.stop()
  }
}
(a,CompactBuffer(1, 2, 3))
(b,CompactBuffer(4))
----------------------
(a,CompactBuffer((a,1), (a,2), (a,3)))
(b,CompactBuffer((b,4)))

groupByKey和reduceByKey的区别

  • 从 shuffle 的角度:
    reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
  • 从功能的角度:
    reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey

aggregateByKey
将数据根据不同的规则进行分区内计算和分区间计算

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子- Key-Value类型
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",6),("b",4),("a",6),("b",2)),2)
    //取出每个分区内相同 key 的最大值然后分区间相加

    //aggregateByKey存在函数柯里化,有两个参数列表
    //第一个参数列表,需要传递一个参数,表示为初始值
    //             主要用于当碰见第一个key的时候,和value进行分区内计算
    //第二个参数列表需要传递两个参数:
    //                            第一个参数表示分区内计算规则
    //                            第二个参数表示分区间计算规则
    val rdd1: RDD[(String, Int)] = rdd.aggregateByKey(5)(
      (x, y) => math.max(x, y),
      (x, y) => x + y
    )
    rdd1.collect().foreach(println)

    sc.stop()
  }
}
(b,11)
(a,11)

foldByKey
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为foldByKey

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子- Key-Value类型
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",4),("a",6),("b",2)),2)
    val rdd1: RDD[(String, Int)] = rdd.foldByKey(0)(_+_)
    rdd1.collect().foreach(println)

    sc.stop()
  }
}
(b,6)
(a,9)

combineByKey

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子- Key-Value类型
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",4),("a",6),("b",2)),2)
    //combineByKey:方法需要三个参数
    //第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
    //第二个参数表示:分区内的计算规则
    //第三个参数表示:分区间的计算规则
    val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey(
      v => (v, 1),
      (t: (Int, Int), v) => {
        (t._1 + v, t._2 + 1)
      },
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
    rdd1.collect().foreach(println)
    println("---------------------")
    val rdd2: RDD[(String, Int)] = rdd1.mapValues {
      case (num, cnt) => {
        num / cnt
      }
    }
    rdd2.collect().foreach(println)

    sc.stop()
  }
}
(b,(6,2))
(a,(9,3))
---------------------
(b,3)
(a,3)

reduceByKey、foldByKey、aggregateByKey、combineByKey的区别

  • reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
  • FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
  • AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
  • CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子- Key-Value类型
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",4),("a",6),("b",2)),2)

    //wordcount
    println(rdd.reduceByKey(_ + _).collect().mkString(","))
    println(rdd.aggregateByKey(0)(_+_,_+_).collect().mkString(","))
    println(rdd.foldByKey(0)(_+_).collect().mkString(","))
    println(rdd.combineByKey(v=>v,(x:Int,y)=>(x+y),(x:Int,y:Int)=>(x+y)).collect().mkString(","))
    
    sc.stop()
  }
}

15)join

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子- join
    //join:两个不同数据源的数据,相同的key的value会连接在一起,形成元组
    //如果两个数据源中key没有匹配上,那么数据不会出现在结果中
    //如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",4),("c",2)))
    val rdd1: RDD[(String, Any)] = sc.makeRDD(List(("a","g"),("b",5),("d",6),("a",2)))
    val rdd2: RDD[(String, (Int, Any))] = rdd.join(rdd1)
    rdd2.collect().foreach(println)

    sc.stop()
  }
}
(a,(1,g))
(a,(1,2))
(b,(4,5))

16)leftOuterJoin和rightOuterJoin

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子- 左右连接
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",4),("c",2)))
    val rdd1: RDD[(String, Any)] = sc.makeRDD(List(("a","g"),("b",5)))
    val rdd2: RDD[(String, (Int, Option[Any]))] = rdd.leftOuterJoin(rdd1)
    val rdd3: RDD[(String, (Option[Int], Any))] = rdd.rightOuterJoin(rdd1)

    rdd2.collect().foreach(println)
    println("---------------------")
    rdd3.collect().foreach(println)

    sc.stop()
  }
}
(a,(1,Some(g)))
(b,(4,Some(5)))
(c,(2,None))
---------------------
(a,(Some(1),g))
(b,(Some(4),5))

17)cogroup

object Spark_rdd_01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //TODO 算子- cogroup  
    //cogroup-connection+group
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",4),("c",2)))
    val rdd1: RDD[(String, Any)] = sc.makeRDD(List(("a","g"),("b",5),("b","m")))
    val rdd2: RDD[(String, (Iterable[Int], Iterable[Any]))] = rdd.cogroup(rdd1)
    rdd2.collect().foreach(println)
    
    sc.stop()
  }
}
(a,(CompactBuffer(1),CompactBuffer(g)))
(b,(CompactBuffer(4),CompactBuffer(5, m)))
(c,(CompactBuffer(2),CompactBuffer()))
上一篇:Spark源码解析(七)Action算子解析


下一篇:Java8 Stream对两个 List 遍历匹配数据的优化处理操作