RDD 的算子总结

RDD 常用的算子

RDD 中的算子从功能上分为两大类

  • Transformation(转换) 它会在一个已经存在的 RDD 上创建一个新的 RDD, 将旧的 RDD 的数据转换为另外一种形式后放入新的 RDD

  • Action(动作) 执行各个分区的计算任务, 将的到的结果返回到 Driver 中

Transformation(转换)

  1. map
    作用
    把 RDD 中的数据 一对一 的转为另一种形式
    签名
    def map[U: ClassTag](f: T ⇒ U): RDD[U]
    参数
    f → Map 算子是 原RDD → 新RDD 的过程, 传入函数的参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据
    注意点
    Map 是一对一, 如果函数是 String → Array[String] 则新的 RDD 中每条数据就是一个数组

  2. flatMap
    作用
    FlatMap 算子和 Map 算子类似, 但是 FlatMap 是一对多
    调用
    def flatMap[U: ClassTag](f: T ⇒ List[U]): RDD[U]
    参数
    f → 参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据, 需要注意的是返回值是一个集合, 集合中的数据会被展平后再放入新的 RDD
    注意点
    flatMap 其实是两个操作, 是 map + flatten, 也就是先转换, 后把转换而来的 List 展开
    Spark 中并没有直接展平 RDD 中数组的算子, 可以使用 flatMap 做这件事

  3. filter
    作用
    Filter 算子的主要作用是过滤掉不需要的内容

  4. mapPartitions(List[T] ⇒ List[U])
    RDD[T] ⇒ RDD[U] 和 map 类似, 但是针对整个分区的数据转换

  5. mapPartitionsWithIndex
    和 mapPartitions 类似, 只是在函数中增加了分区的 Index

  6. mapValues
    MapValues 只能作用于 Key-Value 型数据, 和 Map 类似, 也是使用函数按照转换数据, 不同点是 MapValues 只转换 Key-Value 中的 Value

  7. sample(withReplacement, fraction, seed)
    作用
    Sample 算子可以从一个数据集中抽样出来一部分, 常用作于减小数据集以保证运行速度, 并且尽可能少规律的损失
    参数
    Sample 接受第一个参数为 withReplacement, 意为是否取样以后是否还放回原数据集供下次使用, 简单的说, 如果这个参数的值为 true, 则抽样出来的数据集中可能会有重复
    Sample 接受第二个参数为 fraction, 意为抽样的比例
    Sample 接受第三个参数为 seed, 随机数种子, 用于 Sample 内部随机生成下标, 一般不指定, 使用默认值

  8. union(other)、intersection(other)、subtract(other, numPartitions)
    合集、交集、差集

  9. distinct(numPartitions)
    作用
    Distinct 算子用于去重
    注意点
    Distinct 是一个需要 Shuffled 的操作
    本质上 Distinct 就是一个 reductByKey, 把重复的合并为一个

  10. reduceByKey((V, V) ⇒ V, numPartition)
    作用
    首先按照 Key 分组生成一个 Tuple(元组), 然后针对每个组执行 reduce 算子
    调用
    def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
    参数
    func → 执行数据处理的函数, 传入两个参数, 一个是当前值, 一个是局部汇总, 这个函数需要有一个输出, 输出就是这个 Key 的汇总结果
    注意点

  11. ReduceByKey 只能作用于 Key-Value 型数- 据, Key-Value 型数据在当前语境中特指 Tuple2

  12. ReduceByKey 是一个需要 Shuffled 的操作
    和其它的 Shuffled 相比, ReduceByKey是高效的, 因为类似 MapReduce 的, 在 Map 端有一个 Cominer, 这样 I/O 的数据便会减少

  13. groupByKey()
    作用
    GroupByKey 算子的主要作用是按照 Key 分组, 和 ReduceByKey 有点类似, 但是 GroupByKey 并不求聚合, 只是列举 Key 对应的所有 Value
    注意点
    GroupByKey 是一个 Shuffled
    GroupByKey 和 ReduceByKey 不同, 因为需要列举 Key 对应的所有数据, 所以无法在 Map 端做 Combine, 所以 GroupByKey 的性能并没有 ReduceByKey 好

  14. combineByKey()
    作用
    对数据集按照 Key 进行聚合
    调用
    combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner], [serializer])
    参数
    createCombiner 将 Value 进行初步转换
    mergeValue 在每个分区把上一步转换的结果聚合
    mergeCombiners 在所有分区上把每个分区的聚合结果聚合
    partitioner 可选, 分区函数
    mapSideCombiner 可选, 是否在 Map 端 Combine
    serializer 序列化器
    注意点
    combineByKey 的要点就是三个函数的意义要理解
    groupByKey, reduceByKey 的底层都是 combineByKey

  15. aggregateByKey()
    作用
    聚合所有 Key 相同的 Value, 换句话说, 按照 Key 聚合 Value
    调用
    rdd.aggregateByKey(zeroValue)(seqOp, combOp)
    参数
    zeroValue 初始值
    seqOp 转换每一个值的函数
    comboOp 将转换过的值聚合的函数
    注意点 * 为什么需要两个函数? aggregateByKey 运行将一个 RDD[(K, V)] 聚合为 RDD[(K, U)], 如果要做到这件事的话, 就需要先对数据做一次转换, 将每条数据从 V 转为 U, seqOp 就是干这件事的 ** 当 seqOp 的事情结束以后, comboOp 把其结果聚合
    和 reduceByKey 的区别::
    aggregateByKey 最终聚合结果的类型和传入的初始值类型保持一致
    reduceByKey 在集合中选取第一个值作为初始值, 并且聚合过的数据类型不能改变

  16. foldByKey()
    作用
    和 ReduceByKey 是一样的, 都是按照 Key 做分组去求聚合, 但是 FoldByKey 的不同点在于可以指定初始值
    调用
    foldByKey(zeroValue)(func)
    参数
    zeroValue 初始值
    func seqOp 和 combOp 相同, 都是这个参数
    注意点
    FoldByKey 是 AggregateByKey 的简化版本, seqOp 和 combOp 是同一个函数
    FoldByKey 指定的初始值作用于每一个 Value

  17. join(other, numPartitions)
    作用
    将两个 RDD 按照相同的 Key 进行连接
    调用
    join(other, [partitioner or numPartitions])
    参数
    other 其它 RDD
    partitioner or numPartitions 可选, 可以通过传递分区函数或者分区数量来改变分区
    注意点
    Join 有点类似于 SQL 中的内连接, 只会再结果中包含能够连接到的 Key
    Join 的结果是一个笛卡尔积形式, 例如 “a”, 1), (“a”, 2 和 “a”, 10), (“a”, 11 的 Join 结果集是 “a”, 1, 10), (“a”, 1, 11), (“a”, 2, 10), (“a”, 2, 11

  18. cogroup(other, numPartitions)
    作用
    多个 RDD 协同分组, 将多个 RDD 中 Key 相同的 Value 分组
    调用
    cogroup(rdd1, rdd2, rdd3, [partitioner or numPartitions])
    参数
    rdd…​ 最多可以传三个 RDD 进去, 加上调用者, 可以为四个 RDD 协同分组
    partitioner or numPartitions 可选, 可以通过传递分区函数或者分区数来改变分区
    注意点
    对 RDD1, RDD2, RDD3 进行 cogroup, 结果中就一定会有三个 List, 如果没有 Value 则是空 List, 这一点类似于 SQL 的全连接, 返回所有结果, 即使没有关联上
    CoGroup 是一个需要 Shuffled 的操作

  19. sortBy(ascending, numPartitions)
    作用
    排序相关相关的算子有两个, 一个是 sortBy, 另外一个是 sortByKey
    调用
    sortBy(func, ascending, numPartitions)
    参数
    func 通过这个函数返回要排序的字段
    ascending 是否升序
    numPartitions 分区数
    注意点
    普通的 RDD 没有 sortByKey, 只有 Key-Value 的 RDD 才有
    sortBy 可以指定按照哪个字段来排序, sortByKey 直接按照 Key 来排序

  20. coalesce(numPartitions)
    作用
    一般涉及到分区操作的算子常见的有两个, repartitioin 和 coalesce, 两个算子都可以调大或者调小分区数量
    调用
    repartitioin(numPartitions)
    coalesce(numPartitions, shuffle)
    参数
    numPartitions 新的分区数
    shuffle 是否 shuffle, 如果新的分区数量比原分区数大, 必须 Shuffled, 否则重分区无效
    注意点
    repartition 和 coalesce 的不同就在于 coalesce 可以控制是否 Shuffle
    repartition 是一个 Shuffled 操作

Action 算子

reduce( (T, T) ⇒ U )
作用
对整个结果集规约, 最终生成一条数据, 是整个数据集的汇总
调用
reduce( (currValue[T], agg[T]) ⇒ T )
注意点
reduce 和 reduceByKey 是完全不同的, reduce 是一个 action, 并不是 Shuffled 操作
本质上 reduce 就是现在每个 partition 上求值, 最终把每个 partition 的结果再汇总
collect()
以数组的形式返回数据集中所有元素
count()
返回元素个数
first()
返回第一个元素
take( N )
返回前 N 个元素
takeSample(withReplacement, fract)
类似于 sample, 区别在这是一个Action, 直接返回结果
fold(zeroValue)( (T, T) ⇒ U )
指定初始值和计算函数, 折叠聚合整个数据集
saveAsTextFile(path)
将结果存入 path 对应的文件中
saveAsSequenceFile(path)
将结果存入 path 对应的 Sequence 文件中
countByKey()
求得整个数据集中 Key 以及对应 Key 出现的次数
foreach( T ⇒ …​ )
遍历每一个元素

总结

RDD 的算子大部分都会生成一些专用的 RDD

  • map, flatMap, filter 等算子会生成 --MapPartitionsRDD

  • coalesce, repartition 等算子会生成 CoalescedRDD

常见的 RDD 有两种类型
转换型的 RDD, Transformation

  • 动作型的 RDD, Action

常见的 Transformation 类型的 RDD
map

  • flatMap

  • filter

  • groupBy

  • reduceByKey

常见的 Action 类型的 RDD
collect

  • countByKey

  • reduce

上一篇:Spark RDD 分区


下一篇:利用JQuery jsonp实现Ajax跨域请求 .Net 的*.handler 和 WebService,返回json数据