我正在使用pyspark通过具有约15 m4.large内核的AWS EMR处理50Gb数据.
数据的每一行都包含一天中特定时间的一些信息.我正在使用以下for循环来每小时提取和汇总信息.最后,我合并数据,因为我希望将结果保存在一个csv文件中.
# daily_df is a empty pyspark DataFrame
for hour in range(24):
hourly_df = df.filter(hourFilter("Time")).groupby("Animal").agg(mean("weights"), sum("is_male"))
daily_df = daily_df.union(hourly_df)
据我所知,我必须执行以下操作来强制pyspark.sql.Dataframe对象保存到1个csv文件(大约1Mb)而不是100个文件中:
daily_df.coalesce(1).write.csv("some_local.csv")
似乎花了大约70分钟才能完成此进度,我想知道是否可以通过使用collect()方法来使其更快?
daily_df_pandas = daily_df.collect()
daily_df_pandas.to_csv("some_local.csv")
解决方法:
一般来说,coalcece(1)和collect都相当糟糕,但是预期的输出大小大约为1MB并不重要.它根本不应该成为这里的瓶颈.
一种简单的改进是删除循环->过滤器->合并并执行单个聚合:
df.groupby(hour("Time"), col("Animal")).agg(mean("weights"), sum("is_male"))
如果这还不够,那么这里的问题很可能是配置(开始的好地方可能是调整spark.sql.shuffle.partitions(如果您还没有这样做的话).