spark教程(五)-action 操作 group 系列

groupBy(f, numPartitions=None, partitionFunc=<function portable_hash>):根据 条件 分组,这个条件是一个函数;输出 (key,迭代器)

## 条件是分组依据,条件不影响最后的输出格式,输出格式仍和原数据相同
## 如 原来是 [1, 2],经过分组后分到了 第 1 组,输出是 [1, [1, 2]], [1, 2] 完全保留

# 这个例子相当于求 奇偶数
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()     # [(0, <pyspark.resultiterable.ResultIterable object at 0x7f2f76096890>),
                                                    # (1, <pyspark.resultiterable.ResultIterable object at 0x7f2f760965d0>)]
# 解析迭代器并排序
sorted([(x, sorted(y)) for (x, y) in result])       # [(0, [2, 8]), (1, [1, 1, 3, 5])]

 

groupByKey(numPartitions=None, partitionFunc=<function portable_hash>):按 key 进行分组;输出 (key,迭代器)

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 3)])
rdd.groupByKey().collect()      # [('a', <pyspark.resultiterable.ResultIterable object at 0x7f2f760a5a90>),
                                # ('b', <pyspark.resultiterable.ResultIterable object at 0x7f2f760a5b10>)]

rdd.groupByKey().mapValues(len).collect()       # [('a', 2), ('b', 1)]

sorted(rdd.groupByKey().mapValues(list).collect())  # [('a', [1, 3]), ('b', [1])] 把迭代器转成 list

 

groupWith(other, *others):把多个 RDD 的 key 进行分组;输出 (key,迭代器)

分组后的数据是有顺序的,每个 key 对应的 value 是按 原本 RDD 的顺序的,如果原本 RDD 没有这个 key,留空

w = sc.parallelize([("a", 5), ("b", 6)])
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
z = sc.parallelize([("b", 42)])
w.groupWith(x, y, z).collect()

[(x, tuple(map(list, y))) for x, y in list(w.groupWith(x, y, z).collect())]     # [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]

 

 

未完待续...

 

上一篇:spark教程(四)-python基础编程


下一篇:Scala当中parallelize并行化的用法