python-如何使用来自另一个数据框的新值更新pyspark数据框?

我有两个Spark数据框:

数据框A:

|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |

和数据框B:

|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |

数据框B可以包含来自数据框A的重复行,更新行和新行.我想在spark中编写操作,在其中可以创建一个新数据框,其中包含数据框A的行以及数据框B的更新行和新行.

我从创建仅包含不可更新列的哈希列开始.这是唯一的ID.因此,假设col1和col2可以更改值(可以更新),但是col3,..,coln是唯一的.我创建了一个哈希函数作为hash(col3,.. coln):

A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

现在,我想编写一些火花代码,基本上从B中选择哈希值不在A中的行(因此,新行和更新后的行),并将它们与A中的行一起加入新的数据帧中. pyspark?

编辑:
数据框B可以有来自数据框A的额外列,因此无法进行联合.

样例

数据框A:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|  www|
|    b|  eee|
|    c|  rrr|
+-----+-----+

数据框B:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    d|  yyy|    2|
|    c|  rer|    3|
+-----+-----+-----+

结果:
数据框C:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    b|  eee| null|
|    c|  rer|    3|
|    d|  yyy|    2|
+-----+-----+-----+

解决方法:

这与update a dataframe column with new values密切相关,除了您还想添加DataFrame B中的行.一种方法是首先执行链接的问题中概述的操作,然后将结果与DataFrame B合并并删除重复项.

例如:

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+-----+-----+-----+

如果您有很多要替换的列并且不想对所有这些都进行硬编码,则可以更一般地使用列表推导.

cols_to_update = ['col_2']

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        *[
            ['col_1'] + 
            [
                f.when(
                    ~f.isnull(f.col('b.{}'.format(c))),
                    f.col('b.{}'.format(c))
                ).otherwise(f.col('a.{}'.format(c))).alias(c)
                for c in cols_to_update
            ] + 
            ['b.col_3']
        ]
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
上一篇:19.2.7 [LeetCode 49] Group Anagrams


下一篇:[CF461B] Appleman and Tree - 树形dp