如在Web上的many other locations中所述,向现有DataFrame添加新列并不简单.不幸的是,拥有此功能非常重要(即使它在分布式环境中效率低下),尤其是在尝试使用unionAll连接两个DataFrame时.
将空列添加到DataFrame以便于unionAll的最优雅的解决方法是什么?
我的版本是这样的:
from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
to_none = UserDefinedFunction(lambda x: None, StringType())
new_df = old_df.withColumn('new_column', to_none(df_old['any_col_from_old']))
解决方法:
你需要的只是文字和演员:
from pyspark.sql.functions import lit
new_df = old_df.withColumn('new_column', lit(None).cast(StringType()))
一个完整的例子:
df = sc.parallelize([row(1, "2"), row(2, "3")]).toDF()
df.printSchema()
## root
## |-- foo: long (nullable = true)
## |-- bar: string (nullable = true)
new_df = df.withColumn('new_column', lit(None).cast(StringType()))
new_df.printSchema()
## root
## |-- foo: long (nullable = true)
## |-- bar: string (nullable = true)
## |-- new_column: string (nullable = true)
new_df.show()
## +---+---+----------+
## |foo|bar|new_column|
## +---+---+----------+
## | 1| 2| null|
## | 2| 3| null|
## +---+---+----------+
可以在此处找到Scala等效项:Create new Dataframe with empty/null field values