python中的spark自定义排序

我在Spark中有一个RDD(下面的python代码):

list1 = [(1,1),(10,100)]
df1 = sc.parallelize(list1)
df1.take(2)
## [(1, 1), (10, 100)]

我想做一个自定义排序,根据元组中的两个条目比较这些元组.在python中,此比较的逻辑类似于:

# THRESH is some constant
def compare_tuple(a, b):
    center = a[0] - b[0]
    dev = a[1] + b[1]
    r = center / dev
    if r < THRESH:
        return -1
    else if r == THRESH:
        return 0
    else:
        return 1

我会在python中做一个自定义排序:

list1.sort(compare_tuple)

如何在pyspark中做到这一点?根据rdd文档:

https://spark.apache.org/docs/1.4.1/api/python/pyspark.html#pyspark.RDD

sortBy方法没有自定义排序参数.

我看到scala接口sortBy支持此功能:

https://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.rdd.RDD

但是我想要在python spark中使用.也欢迎任何解决方法类型的解决方案,谢谢!

解决方法:

您总是可以创建一个自定义类并实现丰富的丰富比较方法:

>对

class Pair(tuple):
    def _cmp(self, other):
        center = self[0] - other[0]
        dev = self[1] + other[1]
        r = center / dev if dev != 0 else center
        if r < 0:
            return -1
        if r >  0:
            return 1
        return 0

    def __lt__(self, other):
        return self._cmp(other) < 0

    def __lte__(self, other):
        return self._cmp(other) <= 0

    def __eq__(self, other):
        return self._cmp(other) == 0

    def __ge__(self, other):
        return self._cmp(other) >= 0

    def __gt__(self, other):
        return self._cmp(other) > 0

>主脚本

from pair import Pair

sc.addPyFile("pair.py")

rdd = sc.parallelize([(1, 1),(10, 100), (-1, 1), (-1, -0.5)]).map(Pair)
rdd.sortBy(lambda x: x).collect()
## [(-1, 1), (-1, -0.5), (1, 1), (10, 100)]

但是,如果dev是标准偏差,那么它不会影响结果,您可以使用提取元数据中心(lambda x x:[0])的简单元组或keyfunc安全地按标识排序.

上一篇:python-尝试运行Word2Vec示例时PySpark中出现错误


下一篇:python-集群上的pyspark,确保使用了所有节点