如何在Spark SQL中的多个列上进行数据透视?

我需要在pyspark数据帧中转动多个列.示例数据框,

 >>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),(101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),(102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)]
>>> mydf = spark.createDataFrame(d,['id','day','price','units'])
>>> mydf.show()
+---+---+-----+-----+
| id|day|price|units|
+---+---+-----+-----+
|100|  1|   23|   10|
|100|  2|   45|   11|
|100|  3|   67|   12|
|100|  4|   78|   13|
|101|  1|   23|   10|
|101|  2|   45|   13|
|101|  3|   67|   14|
|101|  4|   78|   15|
|102|  1|   23|   10|
|102|  2|   45|   11|
|102|  3|   67|   16|
|102|  4|   78|   18|
+---+---+-----+-----+

现在,如果我需要根据日期将每个id的价格列放到一行,那么我可以使用pivot方法,

>>> pvtdf = mydf.withColumn('combcol',F.concat(F.lit('price_'),mydf['day'])).groupby('id').pivot('combcol').agg(F.first('price'))
>>> pvtdf.show()
+---+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|
+---+-------+-------+-------+-------+
|100|     23|     45|     67|     78|
|101|     23|     45|     67|     78|
|102|     23|     45|     67|     78|
+---+-------+-------+-------+-------+

因此,当我需要将单位列作为价格进行转置时,要么我需要为单位创建一个以上的数据帧,然后使用id.But加入两者,当我有更多列时,我尝试了一个函数来执行它,

>>> def pivot_udf(df,*cols):
...     mydf = df.select('id').drop_duplicates()
...     for c in cols:
...        mydf = mydf.join(df.withColumn('combcol',F.concat(F.lit('{}_'.format(c)),df['day'])).groupby('id').pivot('combcol').agg(F.first(c)),'id')
...     return mydf
...
>>> pivot_udf(mydf,'price','units').show()
+---+-------+-------+-------+-------+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|units_1|units_2|units_3|units_4|
+---+-------+-------+-------+-------+-------+-------+-------+-------+
|100|     23|     45|     67|     78|     10|     11|     12|     13|
|101|     23|     45|     67|     78|     10|     13|     14|     15|
|102|     23|     45|     67|     78|     10|     11|     16|     18|
+---+-------+-------+-------+-------+-------+-------+-------+-------+

需要建议,如果这是一个好的做法,如果有任何其他更好的方法.提前致谢!

解决方法:

问题的解决方案是我能得到的最好的解决方案.唯一的改进是缓存输入数据集以避免双重扫描,即

mydf.cache
pivot_udf(mydf,'price','units').show()
上一篇:如何使用PySpark加载IPython shell


下一篇:python – 可以在Spark中按组扩展数据吗?