python-如何在Spark中对以DenseVector为键的RDD进行groupByKey?

我创建了一个RDD,每个成员是一个键值对,键是DenseVector,值是int.例如

[(DenseVector([3,4]),10),  (DenseVector([3,4]),20)]

现在,我想按密钥k1分组:DenseVector([3,4]).我希望该行为将键k1的所有值分组为10和20.但是我得到的结果是

[(DenseVector([3,4]), 10), (DenseVector([3,4]), 20)] 

代替

[(DenseVector([3,4]), [10,20])]

如果我缺少什么,请告诉我.

相同的代码是:

#simplified version of code
#rdd1 is an rdd containing [(DenseVector([3,4]),10),  (DenseVector([3,4]),20)]
rdd1.groupByKey().map(lambda x : (x[0], list(x[1])))
print(rdd1.collect())

解决方法:

好吧,那是一个棘手的问题,简短的答案就是你不能.要了解为什么您必须更深入地研究DenseVector实现. DenseVector只是NumPy float64 ndarray的包装

>>> dv1 = DenseVector([3.0, 4.0])
>>> type(dv1.array)
<type 'numpy.ndarray'>
>>> dv1.array.dtype
dtype('float64')

由于NumPy ndarrays与DenseVector不同,因此可变数组不能以有意义的方式进行哈希处理,尽管有趣的是提供了__hash__方法.有一个有趣的问题涵盖了此问题(请参阅:numpy ndarray hashability).

>>> dv1.array.__hash__() is None
False
>>> hash(dv1.array)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'numpy.ndarray'

DenseVector从对象继承__hash__方法,它仅基于id(给定实例的内存地址):

>>> id(d1) / 16 == hash(d1)
True

不幸的是,这意味着具有相同内容的两个DenseVectors具有不同的哈希值:

>>> dv2 = DenseVector([3.0, 4.0])
>>> hash(dv1) == hash(dv2)
False

你能做什么?最简单的方法是使用不可变的数据结构,该结构提供一致的哈希实现,例如元组:

rdd.groupBy(lambda (k, v): tuple(k))

注意:实际上,将数组用作键很可能是个坏主意.拥有大量元素时,使用散列处理程序可能会非常昂贵.不过,如果您确实需要像这样的东西,Scala似乎可以正常工作:

import org.apache.spark.mllib.linalg.Vectors

val rdd = sc.parallelize(
    (Vectors.dense(3, 4), 10) :: (Vectors.dense(3, 4), 20) :: Nil)
rdd.groupByKey.collect
上一篇:python-如何使用PySpark HashPartitioner检测大型json文件中的重复项


下一篇:python-尝试运行Word2Vec示例时PySpark中出现错误