RDD 常用的算子
RDD 中的算子从功能上分为两大类
-
Transformation(转换) 它会在一个已经存在的 RDD 上创建一个新的 RDD, 将旧的 RDD 的数据转换为另外一种形式后放入新的 RDD
-
Action(动作) 执行各个分区的计算任务, 将的到的结果返回到 Driver 中
Transformation(转换)
-
map
作用
把 RDD 中的数据 一对一 的转为另一种形式
签名
def map[U: ClassTag](f: T ⇒ U): RDD[U]
参数
f → Map 算子是 原RDD → 新RDD 的过程, 传入函数的参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据
注意点
Map 是一对一, 如果函数是 String → Array[String] 则新的 RDD 中每条数据就是一个数组 -
flatMap
作用
FlatMap 算子和 Map 算子类似, 但是 FlatMap 是一对多
调用
def flatMap[U: ClassTag](f: T ⇒ List[U]): RDD[U]
参数
f → 参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据, 需要注意的是返回值是一个集合, 集合中的数据会被展平后再放入新的 RDD
注意点
flatMap 其实是两个操作, 是 map + flatten, 也就是先转换, 后把转换而来的 List 展开
Spark 中并没有直接展平 RDD 中数组的算子, 可以使用 flatMap 做这件事 -
filter
作用
Filter 算子的主要作用是过滤掉不需要的内容 -
mapPartitions(List[T] ⇒ List[U])
RDD[T] ⇒ RDD[U] 和 map 类似, 但是针对整个分区的数据转换 -
mapPartitionsWithIndex
和 mapPartitions 类似, 只是在函数中增加了分区的 Index -
mapValues
MapValues 只能作用于 Key-Value 型数据, 和 Map 类似, 也是使用函数按照转换数据, 不同点是 MapValues 只转换 Key-Value 中的 Value -
sample(withReplacement, fraction, seed)
作用
Sample 算子可以从一个数据集中抽样出来一部分, 常用作于减小数据集以保证运行速度, 并且尽可能少规律的损失
参数
Sample 接受第一个参数为 withReplacement, 意为是否取样以后是否还放回原数据集供下次使用, 简单的说, 如果这个参数的值为 true, 则抽样出来的数据集中可能会有重复
Sample 接受第二个参数为 fraction, 意为抽样的比例
Sample 接受第三个参数为 seed, 随机数种子, 用于 Sample 内部随机生成下标, 一般不指定, 使用默认值 -
union(other)、intersection(other)、subtract(other, numPartitions)
合集、交集、差集 -
distinct(numPartitions)
作用
Distinct 算子用于去重
注意点
Distinct 是一个需要 Shuffled 的操作
本质上 Distinct 就是一个 reductByKey, 把重复的合并为一个 -
reduceByKey((V, V) ⇒ V, numPartition)
作用
首先按照 Key 分组生成一个 Tuple(元组), 然后针对每个组执行 reduce 算子
调用
def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
参数
func → 执行数据处理的函数, 传入两个参数, 一个是当前值, 一个是局部汇总, 这个函数需要有一个输出, 输出就是这个 Key 的汇总结果
注意点 -
ReduceByKey 只能作用于 Key-Value 型数- 据, Key-Value 型数据在当前语境中特指 Tuple2
-
ReduceByKey 是一个需要 Shuffled 的操作
和其它的 Shuffled 相比, ReduceByKey是高效的, 因为类似 MapReduce 的, 在 Map 端有一个 Cominer, 这样 I/O 的数据便会减少 -
groupByKey()
作用
GroupByKey 算子的主要作用是按照 Key 分组, 和 ReduceByKey 有点类似, 但是 GroupByKey 并不求聚合, 只是列举 Key 对应的所有 Value
注意点
GroupByKey 是一个 Shuffled
GroupByKey 和 ReduceByKey 不同, 因为需要列举 Key 对应的所有数据, 所以无法在 Map 端做 Combine, 所以 GroupByKey 的性能并没有 ReduceByKey 好 -
combineByKey()
作用
对数据集按照 Key 进行聚合
调用
combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner], [serializer])
参数
createCombiner 将 Value 进行初步转换
mergeValue 在每个分区把上一步转换的结果聚合
mergeCombiners 在所有分区上把每个分区的聚合结果聚合
partitioner 可选, 分区函数
mapSideCombiner 可选, 是否在 Map 端 Combine
serializer 序列化器
注意点
combineByKey 的要点就是三个函数的意义要理解
groupByKey, reduceByKey 的底层都是 combineByKey -
aggregateByKey()
作用
聚合所有 Key 相同的 Value, 换句话说, 按照 Key 聚合 Value
调用
rdd.aggregateByKey(zeroValue)(seqOp, combOp)
参数
zeroValue 初始值
seqOp 转换每一个值的函数
comboOp 将转换过的值聚合的函数
注意点 * 为什么需要两个函数? aggregateByKey 运行将一个 RDD[(K, V)] 聚合为 RDD[(K, U)], 如果要做到这件事的话, 就需要先对数据做一次转换, 将每条数据从 V 转为 U, seqOp 就是干这件事的 ** 当 seqOp 的事情结束以后, comboOp 把其结果聚合
和 reduceByKey 的区别::
aggregateByKey 最终聚合结果的类型和传入的初始值类型保持一致
reduceByKey 在集合中选取第一个值作为初始值, 并且聚合过的数据类型不能改变 -
foldByKey()
作用
和 ReduceByKey 是一样的, 都是按照 Key 做分组去求聚合, 但是 FoldByKey 的不同点在于可以指定初始值
调用
foldByKey(zeroValue)(func)
参数
zeroValue 初始值
func seqOp 和 combOp 相同, 都是这个参数
注意点
FoldByKey 是 AggregateByKey 的简化版本, seqOp 和 combOp 是同一个函数
FoldByKey 指定的初始值作用于每一个 Value -
join(other, numPartitions)
作用
将两个 RDD 按照相同的 Key 进行连接
调用
join(other, [partitioner or numPartitions])
参数
other 其它 RDD
partitioner or numPartitions 可选, 可以通过传递分区函数或者分区数量来改变分区
注意点
Join 有点类似于 SQL 中的内连接, 只会再结果中包含能够连接到的 Key
Join 的结果是一个笛卡尔积形式, 例如 “a”, 1), (“a”, 2 和 “a”, 10), (“a”, 11 的 Join 结果集是 “a”, 1, 10), (“a”, 1, 11), (“a”, 2, 10), (“a”, 2, 11 -
cogroup(other, numPartitions)
作用
多个 RDD 协同分组, 将多个 RDD 中 Key 相同的 Value 分组
调用
cogroup(rdd1, rdd2, rdd3, [partitioner or numPartitions])
参数
rdd… 最多可以传三个 RDD 进去, 加上调用者, 可以为四个 RDD 协同分组
partitioner or numPartitions 可选, 可以通过传递分区函数或者分区数来改变分区
注意点
对 RDD1, RDD2, RDD3 进行 cogroup, 结果中就一定会有三个 List, 如果没有 Value 则是空 List, 这一点类似于 SQL 的全连接, 返回所有结果, 即使没有关联上
CoGroup 是一个需要 Shuffled 的操作 -
sortBy(ascending, numPartitions)
作用
排序相关相关的算子有两个, 一个是 sortBy, 另外一个是 sortByKey
调用
sortBy(func, ascending, numPartitions)
参数
func 通过这个函数返回要排序的字段
ascending 是否升序
numPartitions 分区数
注意点
普通的 RDD 没有 sortByKey, 只有 Key-Value 的 RDD 才有
sortBy 可以指定按照哪个字段来排序, sortByKey 直接按照 Key 来排序 -
coalesce(numPartitions)
作用
一般涉及到分区操作的算子常见的有两个, repartitioin 和 coalesce, 两个算子都可以调大或者调小分区数量
调用
repartitioin(numPartitions)
coalesce(numPartitions, shuffle)
参数
numPartitions 新的分区数
shuffle 是否 shuffle, 如果新的分区数量比原分区数大, 必须 Shuffled, 否则重分区无效
注意点
repartition 和 coalesce 的不同就在于 coalesce 可以控制是否 Shuffle
repartition 是一个 Shuffled 操作
Action 算子
reduce( (T, T) ⇒ U )
作用
对整个结果集规约, 最终生成一条数据, 是整个数据集的汇总
调用
reduce( (currValue[T], agg[T]) ⇒ T )
注意点
reduce 和 reduceByKey 是完全不同的, reduce 是一个 action, 并不是 Shuffled 操作
本质上 reduce 就是现在每个 partition 上求值, 最终把每个 partition 的结果再汇总
collect()
以数组的形式返回数据集中所有元素
count()
返回元素个数
first()
返回第一个元素
take( N )
返回前 N 个元素
takeSample(withReplacement, fract)
类似于 sample, 区别在这是一个Action, 直接返回结果
fold(zeroValue)( (T, T) ⇒ U )
指定初始值和计算函数, 折叠聚合整个数据集
saveAsTextFile(path)
将结果存入 path 对应的文件中
saveAsSequenceFile(path)
将结果存入 path 对应的 Sequence 文件中
countByKey()
求得整个数据集中 Key 以及对应 Key 出现的次数
foreach( T ⇒ … )
遍历每一个元素
总结
RDD 的算子大部分都会生成一些专用的 RDD
-
map, flatMap, filter 等算子会生成 --MapPartitionsRDD
-
coalesce, repartition 等算子会生成 CoalescedRDD
常见的 RDD 有两种类型
转换型的 RDD, Transformation
- 动作型的 RDD, Action
常见的 Transformation 类型的 RDD
map
-
flatMap
-
filter
-
groupBy
-
reduceByKey
常见的 Action 类型的 RDD
collect
-
countByKey
-
reduce