python-Spark合并与收集,哪个更快?

我正在使用pyspark通过具有约15 m4.large内核的AWS EMR处理50Gb数据.

数据的每一行都包含一天中特定时间的一些信息.我正在使用以下for循环来每小时提取和汇总信息.最后,我合并数据,因为我希望将结果保存在一个csv文件中.

# daily_df is a empty pyspark DataFrame
for hour in range(24):
    hourly_df = df.filter(hourFilter("Time")).groupby("Animal").agg(mean("weights"), sum("is_male"))
    daily_df = daily_df.union(hourly_df)

据我所知,我必须执行以下操作来强制pyspark.sql.Dataframe对象保存到1个csv文件(大约1Mb)而不是100个文件中:

daily_df.coalesce(1).write.csv("some_local.csv")

似乎花了大约70分钟才能完成此进度,我想知道是否可以通过使用collect()方法来使其更快?

daily_df_pandas = daily_df.collect()
daily_df_pandas.to_csv("some_local.csv")

解决方法:

一般来说,coalcece(1)和collect都相当糟糕,但是预期的输出大小大约为1MB并不重要.它根本不应该成为这里的瓶颈.

一种简单的改进是删除循环->过滤器->合并并执行单个聚合:

df.groupby(hour("Time"), col("Animal")).agg(mean("weights"), sum("is_male"))

如果这还不够,那么这里的问题很可能是配置(开始的好地方可能是调整spark.sql.shuffle.partitions(如果您还没有这样做的话).

上一篇:python-将字符串列转换为矢量列Spark DataFrames


下一篇:python-从PySpark中的几列从groupby获取具有最大值的行