RDD
- RDD
- 1.什么是RDD
- 2.核心属性
- 3.执行原理
- 4.基础编程
- 4.1 RDD创建
- 4.2 RDD并行度与分区
- 4.3 RDD转换算子
- 1)map
- 2)mapPartitions
- 3)mapPartitionsWithIndex
- 4)flatMap
- 5)glom
- 6)groupBy
- 7)filter
- 8)sample
- 9)distinct
- 10)coalesce
- 11)repartition
- 12)sortBy
- 13)双Value类型{ intersection , union , subtract , zip }
- 14)Key-Value类型{ partitionBy , reduceByKey , groupByKey , aggregateByKey , foldByKey , combineByKey }
- 15)join
- 16)leftOuterJoin和rightOuterJoin
- 17)cogroup
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
- RDD : 弹性分布式数据集
- 累加器:分布式共享只写变量
- 广播变量:分布式共享只读变量
RDD
1.什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片。
分布式
数据存储在大数据集群不同节点上
数据集
RDD 封装了计算逻辑,并不保存数据
数据抽象
RDD 是一个抽象类,需要子类具体实现
不可变
RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD 里面封装计算逻辑
可分区、并行计算
RDD和IO流的关系
- RDD的数据处理方式类似于IO流,也有装饰者设计模式
- RDD的数据只有在调用collect方法时,才会真正执行业务逻辑操作,之前的封装全部都是功能的扩展
- RDD是不保存数据的,但是IO可以临时保存一部分数据
2.核心属性
- 分区列表
RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。 - 分区计算函数
Spark 在计算时,是使用分区函数对每一个分区进行计算 - RDD 之间的依赖关系
RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系 - 分区器(可选)
当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区 - 首选位置(可选)
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
3.执行原理
- 从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
- Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。
- RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD 的工作原理:
1)启动 Yarn 集群环境
2)Spark 通过申请资源创建调度节点和计算节点
3)Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
4)调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给Executor 节点执行计算
4.基础编程
4.1 RDD创建
- 从集合(内存)中创建RDD
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
//TODO 创建RDD
//从内存中创建RDD,将内存中集合的数据作为处理的数据源
val seq=Seq[Int](1,2,3,4)
//parallelize:并行
//val rdd: RDD[Int] = sc.parallelize(seq)
//makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法
val rdd: RDD[Int] = sc.makeRDD(seq)
rdd.collect().foreach(println)
//TODO 关闭环境
sc.stop()
}
}
1
2
3
4
- 从外部存储(文件)创建RDD
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
//TODO 创建RDD
//从文件中创建RDD,将文件中的数据作为处理的数据源
//path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径
//path路径可以是文件的具体路径,也可以是目录名称
//val rdd=sc.textFile("datas")
//path路径还可以使用通配符
//val rdd=sc.textFile("datas/1*.txt")
//path还可以是分布式存储系统的路径:HDFS
//val rdd=sc.textFile("hdfs://linux1:8020/test.txt")
//textFile:以行为单位来读取数据,读取的数据都是字符串
//wholeTextFiles:以文件为单位读取数据
//val rdd = sc.wholeTextFiles("datas")
//读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt")
rdd.collect().foreach(println)
//TODO 关闭环境
sc.stop()
}
}
Hello World
Hello Spark
hello scala
hello Spark
- 从其他RDD创建
- 直接创建RDD
4.2 RDD并行度与分区
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
//conf.set("spark.default.parallelism","5") 可以手动配置核数
val sc = new SparkContext(conf)
//TODO 创建RDD
//RDD的并行度&分区
//makeRDD方法可以传递第二个参数,这个参数表示分区的数量
//第二个参数可以不传递的,那么makeRDD方法会使用默认值:defaultParallelism(默认为)
//源码scheduler.conf.getInt("spark.default.parallelism",totalCores)
//spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
//如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
val rdd: RDD[Int] = sc.makeRDD(
List(1, 2, 3, 4), 2
)
//将处理的数据保存成分区文件
rdd.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")
//TODO 关闭环境
sc.stop()
}
}
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
//TODO 创建RDD
//textFile可以将文件作为数据处理的数据源,默认也可以设定分区,默认分区数是2
//minPartition:最小分区数量
//math.min(defaultParallelism,2)
//如果不想使用默认的分区数量,可以通过第二个参数指定分区数
//Spark读取文件,底层其实使用的是Hadoop的读取方式
//分区数量的计算方式:
// totalSize=7
// goalSize=7/2=3(byte)
// 7/3=2...1(1.1)+1=3(分区)
val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt",3)
//将处理的数据保存成分区文件
rdd.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")
//TODO 关闭环境
sc.stop()
}
}
1.txt
1
2
3
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
//TODO 创建RDD
//TODO 数据分区的分配
//1.数据以行为单位进行读取
// spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,和字节数没有关系
//2.数据读取时以偏移量为单位,偏移量不会被重复读取
/*字节 偏移量
* 1@@ => 012
* 2@@ => 345
* 3 => 6
*/
//3.数据分区的偏移量范围的计算
/*分区 偏移量范围
* 0 => [0,3] =>12
* 1 => [3,6] =>3
* 2 => [6,7] =>
*/
//如果数据源为多个文件,那么计算分区时以文件为单位进行分区
val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt",2)
//将处理的数据保存成分区文件
rdd.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")
//TODO 关闭环境
sc.stop()
}
}
4.3 RDD转换算子
1)map
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-map
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
//1,2,3,4
//2,4,6,8
//转换函数
// def mapFunction(num:Int):Int={
// num*2
// }
// val mapRDD: RDD[Int] = rdd.map(mapFunction)
// val mapRDD: RDD[Int] = rdd.map((num:Int)=>{num*2}) //匿名函数
val mapRDD: RDD[Int] = rdd.map(_*2)
mapRDD.collect().foreach(println)
sc.stop()
}
}
2
4
6
8
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-map
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
//1,2一个分区 3,4一个分区
//1.rdd的计算一个分区内的数据是一个一个执行逻辑
// 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据
// 分区内数据的执行是有序的
//2.不同分区数据计算是无序的
val mapRDD1= rdd.map(
x => {
println("List:"+x)
x
}
)
val mapRDD2= mapRDD1.map(
x => {
println("转换后List:" + x)
x
})
mapRDD2.collect()
sc.stop()
}
}
List:1
List:3
转换后List:1
转换后List:3
List:2
List:4
转换后List:2
转换后List:4
2)mapPartitions
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-mapPartitions
//mapPartitions:可以以分区为单位进行数据转换操作
// 但是会将整个分区的数据加载到内存进行引用
// 如果处理完的数据是不会被释放掉,存在对象的引用
// 在内存较小,数据量较大的场合下,容易出现内存溢出
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
val rdd1: RDD[Int] = rdd.mapPartitions(iter => {
println("一个分区执行一次")
iter.map(_ * 2)
})
rdd1.collect().foreach(println)
sc.stop()
}
}
一个分区执行一次
一个分区执行一次
2
4
6
8
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-mapPartitions
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
//[1,2] [3,4]
//可以得出各分区内的最大值
val rdd1 = rdd.mapPartitions(iter => {
List(iter.max).iterator
})
rdd1.collect().foreach(println)
sc.stop()
}
}
2
4
map和mapPartitions的区别
- 数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。 - 功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据 - 性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用mapPartitions,使用 map 操作。
3)mapPartitionsWithIndex
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-mapPartitionsWithIndex
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
// [1,2] [3,4]
// [3,4]
val rdd1: RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => {
if (index == 1) {
iter
} else {
Nil.iterator //Nil返回空集合
}
})
rdd1.collect().foreach(println)
sc.stop()
}
}
3
4
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-mapPartitionsWithIndex
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 1,2,3,4
// (分区号,数字)
val rdd1: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, iter) => {
iter.map(x => (index, x))
})
rdd1.collect().foreach(println)
sc.stop()
}
}
(2,1)
(5,2)
(8,3)
(11,4)
4)flatMap
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-flatMap
val rdd: RDD[List[Int]] = sc.makeRDD(List(
List(1, 2), List(3,4)))
val rdd1: RDD[Int] = rdd.flatMap(x=>x)
rdd.collect().foreach(println)
rdd1.collect().foreach(println)
sc.stop()
}
}
List(1, 2)
List(3, 4)
1
2
3
4
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-flatMap
val rdd: RDD[String] = sc.makeRDD(List(
"hello spark", "hello java"))
val rdd1: RDD[String] = rdd.flatMap(x => {
x.split(" ")
})
rdd1.collect().foreach(println)
sc.stop()
}
}
hello
spark
hello
java
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-flatMap
//数据类型不一致时,使用模式匹配
val rdd: RDD[Any] = sc.makeRDD(List(List(1,2),3,List(4,5)))
val rdd1: RDD[Any] = rdd.flatMap(x => {
x match {
case x: List[Int] => x
case x => List(x)
}
})
rdd1.collect().foreach(println)
sc.stop()
}
}
1
2
3
4
5
5)glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-glom
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
//List => Int
//Int => Array
val glomRdd: RDD[Array[Int]] = rdd.glom()
glomRdd.collect().foreach(x=>println(x.mkString(",")))
sc.stop()
}
}
1,2
3,4
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-glom
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
//[1,2],[3,4] 分区取最大值
//[2] [4] 最大值求和
//[6]
val glomRDD: RDD[Array[Int]] = rdd.glom()
val maxRDD: RDD[Int] = glomRDD.map(
array => {
array.max
}
)
println(maxRDD.collect().sum)
sc.stop()
}
}
6
6)groupBy
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-groupBy
val rdd: RDD[String] = sc.makeRDD(List("hello","spark","hadoop","scala","java"),2)
val groupRDD: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))
groupRDD.collect().foreach(println)
sc.stop()
}
}
`
```scala
(h,CompactBuffer(hello, hadoop))
(j,CompactBuffer(java))
(s,CompactBuffer(spark, scala))
1.txt
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:02:05:03 +000
uu wr erw 17/05/2015:12:05:03 +000
uu wr erw 17/05/2015:03:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:03:05:03 +000
uu wr erw 17/05/2015:11:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:11:05:03 +000
uu wr erw 17/05/2015:11:05:03 +000
uu wr erw 17/05/2015:11:05:03 +000
uu wr erw 17/05/2015:02:05:03 +000
uu wr erw 17/05/2015:02:05:03 +000
uu wr erw 17/05/2015:12:05:03 +000
uu wr erw 17/05/2015:12:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt")
val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(line => {
val datas: Array[String] = line.split(" ")
val time: String = datas(3)
val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
/*
* parse()返回的是一个Date类型数据
* parse方法可以把String型的字符串转换成特定格式的date类型,
* 使用parse时字符串长度要和定义的SimpleDateFormat对象长度一致
*/
val date: Date = sdf.parse(time)
println(date)
println("----------------")
val sdf1 = new SimpleDateFormat("HH")
/*
* format返回的是一个String类型的数据
* format方法可以把Date型字符转换成特定格式的String类型,
* 如果Date类型和定义的SimpleDateFormat长度不一致会自动在后面补0
*/
val hour: String = sdf1.format(date)
println(hour)
(hour, 1)
// 法二:字符串截取
// val str: String = datas(3).substring(11, 13)
//(str, 1)
}).groupBy(_._1)
timeRDD.map {
//模式匹配
case (hour, iter) => (hour, iter.size)
}.collect().foreach(println)
sc.stop()
}
}
Sun May 17 11:05:03 CST 2015
Sun May 17 10:05:03 CST 2015
----------------
----------------
11
10
Sun May 17 02:05:03 CST 2015
----------------
Sun May 17 11:05:03 CST 2015
----------------
02
11
Sun May 17 11:05:03 CST 2015
----------------
11
Sun May 17 02:05:03 CST 2015
----------------
02
Sun May 17 02:05:03 CST 2015
----------------
02
Sun May 17 12:05:03 CST 2015
----------------
12
Sun May 17 12:05:03 CST 2015
----------------
12
Sun May 17 03:05:03 CST 2015
----------------
03
Sun May 17 10:05:03 CST 2015
----------------
10
Sun May 17 10:05:03 CST 2015
----------------
10
Sun May 17 12:05:03 CST 2015
----------------
Sun May 17 10:05:03 CST 2015
----------------
12
10
Sun May 17 03:05:03 CST 2015
----------------
Sun May 17 10:05:03 CST 2015
----------------
03
10
Sun May 17 11:05:03 CST 2015
----------------
11
Sun May 17 10:05:03 CST 2015
----------------
10
(02,3)
(11,4)
(03,2)
(12,3)
(10,6)
7)filter
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-filter
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
rdd.filter(num=>(num%2!=0)).collect().foreach(println)
sc.stop()
}
}
1
3
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-filter
val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt")
//过滤10点的数据
rdd.filter(line=>{
line.split(" ")(3).startsWith("17/05/2015:10")
}).collect().foreach(println)
sc.stop()
}
}
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
uu wr erw 17/05/2015:10:05:03 +000
8)sample
根据指定的规则从数据集中抽取数据
(应用场景:数据倾斜)
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-sample
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
//sample算子需要传递三个参数
//1.第一个参数withReplacement表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
//2.第二个参数fraction表示:如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
// 如果抽取放回的场合:表示数据源中的每条数据别抽取的可能次数
//3.第三个参数seed表示,抽取数据时随机算法的种子
// 如果不传递第三个参数,那么使用的是当前系统时间
println(rdd.sample(
false,
0.4,
1
).collect().mkString(","))
println("-------------------")
println(rdd.sample(
true,
2
).collect().mkString(","))
sc.stop()
}
}
1,2,3,7,9
-------------------
2,2,2,3,3,3,3,4,4,4,5,6,7,7,7,8,8,8,9,9,9,9,10,10,10,10,10,10,10
9)distinct
将数据集中重复的数据去重
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-filter
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,2,3,2))
println(rdd.distinct().collect() mkString (","))
sc.stop()
}
}
1,2,3,4
10)coalesce
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-coalesce
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)
val newRDD: RDD[Int] = rdd.coalesce(2)
newRDD.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")
sc.stop()
}
}
coalesce方法默认情况下不会将分区的数据打乱重新组合
这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
如果想要让数据均衡,可以进行shuffle处理,第二个参数设为true
coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义的,不起作用
如果想要实现扩大分区的效果,需要使用shuffle操作
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-coalesce
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)
val newRDD: RDD[Int] = rdd.coalesce(2,true)
newRDD.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")
sc.stop()
}
}
11)repartition
该操作内部其实执行的是 coalesce 操作,底层是coalesce函数,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的RDD,还是将分区数少的 RDD 转换为分区数多的RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-repartition
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)
val newRDD: RDD[Int] = rdd.repartition(3)
newRDD.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")
sc.stop()
}
}
12)sortBy
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原RDD 的分区数一致。中间存在 shuffle 的过程
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-sortBy
//sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认是升序
//第二个参数可以改变排序的方式,用false
//sortBy默认情况下,不会改变分区,但是中间存在shuffle操作
val rdd: RDD[Int] = sc.makeRDD(List(4,3,2,6,4,1))
println(rdd.sortBy(x => x,false).collect().mkString(","))
println("---------------")
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("1",1),("11",2),("2",3)))
println(rdd1.sortBy(x => x._1).collect().mkString(","))
println("---------------")
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("1",1),("11",2),("2",3)))
println(rdd2.sortBy(x => x._1.toInt).collect().mkString(","))
sc.stop()
}
}
6,4,4,3,2,1
---------------
(1,1),(11,2),(2,3)
---------------
(1,1),(2,3),(11,2)
13)双Value类型{ intersection , union , subtract , zip }
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子-双Value类型
//交集,并集和差集要求两个数据源数据类型保持一致
//拉链操作两个数据源的类型可以不一致
//拉链操作时,两个数据源要求分区数量要保持一致,分区中数据数量保持一致
val rdd1: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val rdd2: RDD[Int] = sc.makeRDD(List(3,4,5,6))
//交集
println(rdd1.intersection(rdd2).collect().mkString(","))
//并集
println(rdd1.union(rdd2).collect().mkString(","))
//差集
println(rdd1.subtract(rdd2).collect().mkString(","))
//拉链
println(rdd1.zip(rdd2).collect().mkString(","))
sc.stop()
}
}
3,4
1,2,3,4,3,4,5,6
1,2
(1,3),(2,4),(3,5),(4,6)
14)Key-Value类型{ partitionBy , reduceByKey , groupByKey , aggregateByKey , foldByKey , combineByKey }
partitionBy
将数据按照指定Partitioner 重新进行分区。Spark 默认的分区器是HashPartitioner
object Test {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5),2)
val mapRDD: RDD[(Int, Int)] = rdd.map((_,1))
//partitionBy根据指定的分区规则对数据进行重分区
mapRDD.partitionBy(new HashPartitioner(2))
.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output")
sc.stop()
reduceByKey
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子- Key-Value类型
//reduceByKey:相同的key的数据进行value数据的聚合操作
//reduceByKey分区内和分区间计算规则是相同的
//Scala语言中一般的聚合操作都是两两聚合,spark基于Scala开发的,所以它的聚合也是两两聚合
//reduceByKey中如果key的数据只有一个,是不会参与运算的
val rdd: RDD[(String, Int)] =sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
val rdd1: RDD[(String, Int)] = rdd.reduceByKey((x,y)=>{
println(s"x=${x},y=${y}")
x+y
})
rdd1.collect().foreach(println)
sc.stop()
}
}
x=1,y=2
x=3,y=3
(a,6)
(b,4)
groupByKey
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子- Key-Value类型
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
//groupByKey: 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
// 元组中的第一个元素就是key
// 元组中的第二个元素就是相同key的value的
val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey()
rdd1.collect().foreach(println)
println("----------------------")
val rdd2: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
rdd2.collect().foreach(println)
sc.stop()
}
}
(a,CompactBuffer(1, 2, 3))
(b,CompactBuffer(4))
----------------------
(a,CompactBuffer((a,1), (a,2), (a,3)))
(b,CompactBuffer((b,4)))
groupByKey和reduceByKey的区别
- 从 shuffle 的角度:
reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。 - 从功能的角度:
reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey
aggregateByKey
将数据根据不同的规则进行分区内计算和分区间计算
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子- Key-Value类型
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",6),("b",4),("a",6),("b",2)),2)
//取出每个分区内相同 key 的最大值然后分区间相加
//aggregateByKey存在函数柯里化,有两个参数列表
//第一个参数列表,需要传递一个参数,表示为初始值
// 主要用于当碰见第一个key的时候,和value进行分区内计算
//第二个参数列表需要传递两个参数:
// 第一个参数表示分区内计算规则
// 第二个参数表示分区间计算规则
val rdd1: RDD[(String, Int)] = rdd.aggregateByKey(5)(
(x, y) => math.max(x, y),
(x, y) => x + y
)
rdd1.collect().foreach(println)
sc.stop()
}
}
(b,11)
(a,11)
foldByKey
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为foldByKey
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子- Key-Value类型
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",4),("a",6),("b",2)),2)
val rdd1: RDD[(String, Int)] = rdd.foldByKey(0)(_+_)
rdd1.collect().foreach(println)
sc.stop()
}
}
(b,6)
(a,9)
combineByKey
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子- Key-Value类型
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",4),("a",6),("b",2)),2)
//combineByKey:方法需要三个参数
//第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
//第二个参数表示:分区内的计算规则
//第三个参数表示:分区间的计算规则
val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1),
(t: (Int, Int), v) => {
(t._1 + v, t._2 + 1)
},
(t1: (Int, Int), t2: (Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
rdd1.collect().foreach(println)
println("---------------------")
val rdd2: RDD[(String, Int)] = rdd1.mapValues {
case (num, cnt) => {
num / cnt
}
}
rdd2.collect().foreach(println)
sc.stop()
}
}
(b,(6,2))
(a,(9,3))
---------------------
(b,3)
(a,3)
reduceByKey、foldByKey、aggregateByKey、combineByKey的区别
- reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
- FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
- AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
- CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子- Key-Value类型
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",4),("a",6),("b",2)),2)
//wordcount
println(rdd.reduceByKey(_ + _).collect().mkString(","))
println(rdd.aggregateByKey(0)(_+_,_+_).collect().mkString(","))
println(rdd.foldByKey(0)(_+_).collect().mkString(","))
println(rdd.combineByKey(v=>v,(x:Int,y)=>(x+y),(x:Int,y:Int)=>(x+y)).collect().mkString(","))
sc.stop()
}
}
15)join
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子- join
//join:两个不同数据源的数据,相同的key的value会连接在一起,形成元组
//如果两个数据源中key没有匹配上,那么数据不会出现在结果中
//如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",4),("c",2)))
val rdd1: RDD[(String, Any)] = sc.makeRDD(List(("a","g"),("b",5),("d",6),("a",2)))
val rdd2: RDD[(String, (Int, Any))] = rdd.join(rdd1)
rdd2.collect().foreach(println)
sc.stop()
}
}
(a,(1,g))
(a,(1,2))
(b,(4,5))
16)leftOuterJoin和rightOuterJoin
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子- 左右连接
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",4),("c",2)))
val rdd1: RDD[(String, Any)] = sc.makeRDD(List(("a","g"),("b",5)))
val rdd2: RDD[(String, (Int, Option[Any]))] = rdd.leftOuterJoin(rdd1)
val rdd3: RDD[(String, (Option[Int], Any))] = rdd.rightOuterJoin(rdd1)
rdd2.collect().foreach(println)
println("---------------------")
rdd3.collect().foreach(println)
sc.stop()
}
}
(a,(1,Some(g)))
(b,(4,Some(5)))
(c,(2,None))
---------------------
(a,(Some(1),g))
(b,(Some(4),5))
17)cogroup
object Spark_rdd_01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 算子- cogroup
//cogroup-connection+group
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",4),("c",2)))
val rdd1: RDD[(String, Any)] = sc.makeRDD(List(("a","g"),("b",5),("b","m")))
val rdd2: RDD[(String, (Iterable[Int], Iterable[Any]))] = rdd.cogroup(rdd1)
rdd2.collect().foreach(println)
sc.stop()
}
}
(a,(CompactBuffer(1),CompactBuffer(g)))
(b,(CompactBuffer(4),CompactBuffer(5, m)))
(c,(CompactBuffer(2),CompactBuffer()))