python-PySpark groupby和最大值选择

我有一个PySpark数据框

 name   city     date
 satya  Mumbai  13/10/2016
 satya  Pune    02/11/2016
 satya  Mumbai  22/11/2016
 satya  Pune    29/11/2016
 satya  Delhi   30/11/2016
 panda  Delhi   29/11/2016
 brata  BBSR    28/11/2016
 brata  Goa     30/10/2016
 brata  Goa     30/10/2016

我需要为每个名称找出最喜欢的CITY,逻辑是“如果在“名称”“城市”对上具有最大城市出现次数的城市,则将城市作为fav_city.如果发现多个相同的事件,则考虑具有最新日期的城市.会解释:

d = df.groupby('name','city').count()
#name  city  count
brata Goa    2  #clear favourite
brata BBSR   1
panda Delhi  1  #as single so clear favourite
satya Pune   2  ##Confusion
satya Mumbai 2  ##confusion
satya Delhi  1   ##shd be discard as other cities having higher count than this city

#So get cities having max count
dd = d.groupby('name').agg(F.max('count').alias('count'))
ddd = dd.join(d,['name','count'],'left')
#name  count  city
 brata    2   Goa    #fav found
 panda    1   Delhi  #fav found
 satya    2   Mumbai #can't say
 satya    2   Pune   #can't say

对于用户“ satya”,我需要返回trx_history并从上次交易的“孟买”或“浦那”获得具有等于_最大计数I:e的城市的最新日期(最大日期),将该城市视为fav_city.在这种情况下,“ Pune”作为“ 29/11/2016”是最新/最晚日期.

但是我无法继续进行该工作.

请提供逻辑上的帮助,或者提供更好的解决方案(更快/更紧凑的方式),请提出建议.谢谢.

解决方法:

首先将日期转换为DateType:

df_with_date = df.withColumn(
    "date",
    F.unix_timestamp("date", "dd/MM/yyyy").cast("timestamp").cast("date")
)

下一个组按用户和城市划分,但扩展聚合如下:

df_agg = (df_with_date
    .groupBy("name", "city")
    .agg(F.count("city").alias("count"), F.max("date").alias("max_date")))

定义一个窗口:

from pyspark.sql.window import Window

w = Window().partitionBy("name").orderBy(F.desc("count"), F.desc("max_date"))

添加等级:

df_with_rank = (df_agg
    .withColumn("rank", F.dense_rank().over(w)))

和过滤器:

result = df_with_rank.where(F.col("rank") == 1)

您可以使用以下代码检测剩余的重复项:

import sys

final_w = Window().partitionBy("name").rowsBetween(-sys.maxsize, sys.maxsize)
result.withColumn("tie", F.count("*").over(final_w) != 1)
上一篇:如何将Python连接到Spark会话并使RDD保持活动状态


下一篇:python-将PySpark数据框列类型转换为字符串并替换方括号