groupBy(f, numPartitions=None, partitionFunc=<function portable_hash>):根据 条件 分组,这个条件是一个函数;输出 (key,迭代器)
## 条件是分组依据,条件不影响最后的输出格式,输出格式仍和原数据相同 ## 如 原来是 [1, 2],经过分组后分到了 第 1 组,输出是 [1, [1, 2]], [1, 2] 完全保留 # 这个例子相当于求 奇偶数 rdd = sc.parallelize([1, 1, 2, 3, 5, 8]) result = rdd.groupBy(lambda x: x % 2).collect() # [(0, <pyspark.resultiterable.ResultIterable object at 0x7f2f76096890>), # (1, <pyspark.resultiterable.ResultIterable object at 0x7f2f760965d0>)] # 解析迭代器并排序 sorted([(x, sorted(y)) for (x, y) in result]) # [(0, [2, 8]), (1, [1, 1, 3, 5])]
groupByKey(numPartitions=None, partitionFunc=<function portable_hash>):按 key 进行分组;输出 (key,迭代器)
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 3)]) rdd.groupByKey().collect() # [('a', <pyspark.resultiterable.ResultIterable object at 0x7f2f760a5a90>), # ('b', <pyspark.resultiterable.ResultIterable object at 0x7f2f760a5b10>)] rdd.groupByKey().mapValues(len).collect() # [('a', 2), ('b', 1)] sorted(rdd.groupByKey().mapValues(list).collect()) # [('a', [1, 3]), ('b', [1])] 把迭代器转成 list
groupWith(other, *others):把多个 RDD 的 key 进行分组;输出 (key,迭代器)
分组后的数据是有顺序的,每个 key 对应的 value 是按 原本 RDD 的顺序的,如果原本 RDD 没有这个 key,留空
w = sc.parallelize([("a", 5), ("b", 6)]) x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2)]) z = sc.parallelize([("b", 42)]) w.groupWith(x, y, z).collect() [(x, tuple(map(list, y))) for x, y in list(w.groupWith(x, y, z).collect())] # [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
未完待续...