SparkCore-常用转换算子总结

主要是分为三个类型:Value 类型、双 Value 类型和 Key-Value 类型

1.Value类型

1.1map

SparkCore-常用转换算子总结

 传递一个对象,返回一个对象

源码中给的解释机翻如下:

通过对这个RDD的所有元素应用一个函数,返回一个新的RDD。

说人话就是:

将处理的数据逐条进行映射转换,可以是类型的转换,也可以是值的转换

值的转换,即里面每个数据*2

val mapRDD: RDD[Int] = rdd.map(
   _ * 2
)

类型转换,转为k-v结构

val wordToOne: RDD[(String, Int)] = words.map(
   word => (word, 1)
)

1.1.1mapPartitions

SparkCore-常用转换算子总结

传递一个迭代器,返回一个迭代器

源码中给的解释机翻如下:

通过对这个RDD的每个分区应用一个函数,返回一个新的RDD。

说人话就是:

将待处理的数据以分区为单位发送到计算节点进行处理,可以是值的转换,可以是类型的转换,也可以过滤数据

 一次是处理一个分区的数据,所以说我们发现下面的代码中println只执行两次。

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val mapRDD: RDD[Int] = rdd.mapPartitions(
      iter => {
        //一次把一个分区的数据拿过来,所以只执行两次
        println(">>>>>>>>>>>>>>")
        iter.map(_ * 2)
      }
    )

1.1.2map与mapPartitions的区别

数据处理上:map是分区内数据逐个执行,类似于串行操作;而mapPartitions是以分区为单位进行批处理操作。

功能上:map对于源数据进行值改变或者类型转换,但不能减少或者增加数据;mapPartitions传递一个迭代器返回一个迭代器,可以对里面的元素进行过滤,即减少和增加数据。

性能上:map类似于串行操作,所以性能较低;mapPartitions算子类似于批处理,性能较高,但是mapPartitions会长时间占用内存。

1.1.3mapPartitionsWithIndex

SparkCore-常用转换算子总结

源码中给的解释机翻如下:

通过对这个RDD的每个分区应用一个函数,返回一个新的RDD,同时跟踪原始分区的索引。

说人话就是:

其实就在mapPartitions的基础上多了一个可以返回分区的索引的功能,不多赘述。

1.2flatMap

SparkCore-常用转换算子总结

传递一个对象,返回一个“序列”

源码中给的解释机翻如下:

返回一个新的RDD,首先对这个RDD的所有元素应用一个函数,然后将结果扁平化。

说人话就是:

在map的基础上做了扁平化处理

下面这张网上找的图可以很形象的说明flatMap:

SparkCore-常用转换算子总结

 就是先map再flatten

1.3glom

SparkCore-常用转换算子总结

源码中的解释机翻如下:

返回一个RDD,将每个分区中的所有元素合并到一个数组中。

说人话就是:

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不会改变。

val rdd: RDD[Any] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val glomRDD: RDD[Array[Any]] = rdd.glom()

    glomRDD.collect().foreach(data => {
      println(data.mkString(","))
    })

有两个分区,那么就被转换为数组1,2和数组3,4。

1.4groupBy

SparkCore-常用转换算子总结

源码中的解释机翻如下:

返回一个分组项目的RDD。每个组由一个键和一个映射到该键的元素序列组成。每个组中元素的顺序无法保证,甚至可能在每次评估结果RDD时都有所不同。

说人话就是:

将数据根据指定的规则进行分组分区默认不变,但是数据会被打乱重组(shuffle)。

按奇偶分区

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)


val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)

结果:

SparkCore-常用转换算子总结

很明显这里是传入列表中的元素,返回由运算结果为key,映射到该key的元素组成的序列为value的结果。 

1.5filter

SparkCore-常用转换算子总结

传递的是一个对象,返回的是一个布尔类型

源码中的解释机翻如下:

返回一个新的RDD,其中只包含满足谓词的元素。

说人话就是:

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合的丢弃。

进行筛选后,分区不变,但各个分区的数据可能不均衡,就会出现数据倾斜

筛选掉所有偶数,保留奇数。

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

val filterRDD: RDD[Int] = rdd.filter(_ % 2 != 0)

1.6sample

SparkCore-常用转换算子总结

源码中的解释机翻如下:

返回这个RDD的抽样子集。

说人话就是:

根据指定的规则从数据集中抽取数据。

我个人觉得这个了解即可,没必要特意去记,这里就举例分析这个转换算子了。

1.7distinct

SparkCore-常用转换算子总结

源码中的解释机翻如下:

返回一个新的RDD,其中包含这个RDD中不同的元素

说人话就是:

去重,比如对下面的列表进行去重

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))

val rdd1: RDD[Int] = rdd.distinct()

1.8coalesce

SparkCore-常用转换算子总结

源码中的解释机翻如下:

返回一个新的RDD,它被简化为' numPartitions '分区。

说人话就是:

根据数据量缩小分区。比如当spark程序存在过多的小任务时,可以通过coalesce方法收缩合并分区,减少分区的个数,减少任务调度成本

有可能缩小分区的时候导致数据分配不均匀,可以通过启用shuffle使数据均匀。

SparkCore-常用转换算子总结

1.9repartition

SparkCore-常用转换算子总结

源码中的解释机翻如下:

返回一个具有numPartitions分区的新RDD。

说人话就是:

可以扩大也可以缩小分区,但是其实其底层就是:

SparkCore-常用转换算子总结

 也就是说上面提到的 coalesce方法如果启用shuffle是可以扩大分区的。

1.10sortBy

SparkCore-常用转换算子总结

源码中的解释机翻如下:

返回按给定键函数排序的RDD。

说人话就是:

用于排序数据,使用比较简单。其中ascending默认为true表示升序排列,指定为false可以改为降序。

2.双value类型

2.1intersection

求两个集合的交集

2.2union

求两个集合的并集

2.3subtract

求差集,也就是说以一个RDD的元素为主,去除两个RDD中重复的元素,其他保留下来。

比如(1,2,3,4)和(3,4,5,6),最后只保留1和2。

2.4zip

俗称拉链,将两个RDD中的元素,以键值对的形式进行合并。其中key和value分别为第一个、第二个RDD中的相同位置的元素。

3.key-value类型

3.1partitionBy

SparkCore-常用转换算子总结

源码中的解释机翻如下: 

返回使用指定分区程序分区的RDD的副本

说人话:

将数据按照指定的分区器Partitioner重新进行分区。默认的分区器是HashPartitioner。

3.2reduceByKey

SparkCore-常用转换算子总结

 源码中的解释机翻如下:

使用关联和交换reduce函数合并每个键的值。这也会在将结果发送给reducer之前,在每个mapper上执行本地合并,类似于MapReduce中的“combiner”。输出将使用现有分区并行级别进行散列分区。

说人话:

将数据按照相同的k对v进行聚合,至于翻译后半段在groupByKey后面再解释。典型应用就是wordCount。

3.3groupByKey 

SparkCore-常用转换算子总结

 源码中的解释机翻如下:

将RDD中每个键的值分组到单个序列中。使用现有的分区并行级别对产生的RDD进行散列分区。每个组中元素的顺序无法保证,甚至可能在每次评估结果RDD时都有所不同。

说人话:

将数据根据k对v进行分组。我们发现和reduceByKey差不多,不过groupByKey只包含分组的功能,而reduceByKey包含分组和聚合的功能。

这里提到reduceByKey说到的机翻:reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合处理,即combine,这样可以减少落盘的数据量,所以性能较高,而groupByKey只是进行分组,不存在数据量减少的问题。

3.4aggregateByKey

SparkCore-常用转换算子总结

源码中的解释机翻如下:

使用给定的组合函数和中性的“零值”聚合每个键的值。这个函数可以返回一个不同的结果类型U,而不是这个RDD中值的类型V。因此,我们需要一个操作将V合并为U,一个操作将两个U合并,如scala.TraversableOnce。前一个操作用于合并分区内的值,后一个操作用于合并分区之间的值。为了避免内存分配,这两个函数都允许修改并返回它们的第一个参数,而不是创建一个新的U。

说人话:

将数据根据不同的规则进行分区内计算和分区间计算

比如下面的先是分区内求最大,然后分区间求和。

//第一个参数需要传递一个参数表示初始值,主要用于当碰见第一个key的时候和value进行分区计算
    //第二个参数中的第一个参数是分区内计算规则(选最大),第二个参数是分区间计算规则(相加聚合)
    val abkRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y)

3.5foldByKey

SparkCore-常用转换算子总结

 源码中的解释机翻如下:

使用一个关联函数和一个中立的“零值”来合并每个键的值,它可以被添加到结果中任意次数,并且不能改变结果(例如,Nil用于列表拼接,0用于加法,或1用于乘法)。

说人话:

相当于分区内分区间计算规则相同版的aggregateByKey。

3.6combineByKey

SparkCore-常用转换算子总结

 源码中的解释机翻如下:

简化版的combineByKeyWithClassTag,哈希分区的结果RDD使用现有的分区并行级别。这里的方法是为了向后兼容。它不向洗牌提供组合器类信息。

说人话:

是对k-v类型的rdd进行聚合操作的聚集函数,其实和aggregateByKey挺像,但是它允许用户返回值的类型与输入的不同

如下是求相同key的v的总和与数量,当然得到这个总和与数量,后面就可以求某个key对应值的平均值了。

val abkRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
      v => (v, 1),
      //t和默认值类型一样,v是指k相同的待聚合的value
      (t: (Int, Int), v) => {
        (t._1 + v, t._2 + 1)
      },
      //t1和t2都和默认值类型一样
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )

3.6.1reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别

第一个数据 分区内与分区间计算规则
reduceByKey 不进行任何计算 相同
foldByKey 与初始值进行分区内计算 相同
aggregateByKey 与初始值进行分区内计算 不同
combineByKey 当数据结构不满足要求时,可以让第一个数据转换结构 不同

3.7sortByKey

SparkCore-常用转换算子总结

 源码中的解释机翻如下:

按键对RDD排序,这样每个分区都包含一个排序过的元素范围。在生成的RDD上调用' collect '或' save '将返回或输出一个有序的记录列表(在' save '的情况下,它们将按照键的顺序写入文件系统中的多个' part-X '文件)。

说人话:

根据key进行排序。这个函数比较简单

3.8join

SparkCore-常用转换算子总结

 源码中的解释机翻如下:

返回一个RDD,包含所有在' this '和' other '中具有匹配键的元素对。每对元素将以a (k, (v1, v2))元组的形式返回,其中(k, v1)在' this '中,(k, v2)在' other '中。执行跨集群的散列连接。

说人话:

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD。且key的类型必须相同

3.9leftOuterJoin

有左当然也有右,这个比较基础,和sql中的左外连接以及右外连接比较像。

3.10cogroup

SparkCore-常用转换算子总结

源码中的解释机翻如下:

 对于' this '或' other '中的每个键k,返回一个结果RDD,该RDD包含一个元组,其中包含' this '和' other '中该键的值列表。

说人话:

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD 。

 

如下对rdd1和rdd2使用

val rdd1: RDD[(String, Int)] = sc.makeRDD(List(
      ("a", 1), ("b", 2) //, ("c", 3)
    ))

    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(
      ("a", 4), ("b", 5), ("c", 6), ("c", 7)
    ))

    //cogroup : connect + group
    val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)

得到结果如下:

SparkCore-常用转换算子总结

上一篇:redis 和 mysql 的主从复制


下一篇:RDD中的依赖关系