pyspark 中删除hdfs的文件夹

在pyspark 中保存rdd的内存到文件的时候,会遇到文件夹已经存在而失败,所以如果文件夹已经存在,需要先删除。

搜索了下资料,发现pyspark并没有提供直接管理hdfs文件系统的功能。寻找到一个删除的方法,是通过调用shell命令 hadoop fs -rm -f 来删除,这个方法感觉不怎么好,所以继续找。

后来通过查找hadoop hdfs 的源代码发现 hdfs是通过java 的包 org.appache.hadoop.fs 中的几个类来完成的,也有用java创建 hdfs对象再调用对象的删除方法可删除。

所以下一步思路就是尝试是不是可以把 java的类在 python 中引用,最后得到的答案是肯定的,代码如下:

def jPath(sc, filepath):

    jPathClass = sc._gateway.jvm.org.apache.hadoop.fs.Path

    return jPathClass(filepath)

 

def jFileSystem(sc):

    jFileSystemClass = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem

    hadoop_configuration = sc._jsc.hadoopConfiguration()

    return jFileSystemClass.get(hadoop_configuration)

 

def write(sc, filepath, content, overwite=True):

    try:

        fs = jFileSystem(sc)

        out = fs.create(jPath(sc, filepath), overwrite)

        out.write(bytearray(content, "utf-8"))

        out.flush()

        out.close()

    except Exception as e:

        print(e)

    spark = spark_create()

    sc = spark._sc

    sc.setLogLevel("ERROR")

    sqc = SQLContext(sc)

#

#  删除HDFS目录主程序

#

def __name__ == "__main__":

    del_hdfs = "hdfs://host:port/del_filepath"

    fs = jFileSystem(sc)

    del_path_obj = jPath(sc, del_hdfs)

    if fs.exists(del_path_obj):

        fs.delete(del_path_obj)

        print(" del ok. hdfs=%s"%del_hdfs)

    else:

        print(" path not exists. hdfs=%s"%del_hdfs)

 可顺利在 pyspark 中使用 java 类,核心是 sc._gateway.jvm 来实现 ,代码中用到的

org.apache.hadoop.fs.FileSystem

org.apache.hadoop.fs.Path

是在 hadoop 的 jar包 hadoop-common-2.10.1.jar中,所以SPARK在提交的时候要加上这个jar包

  spark-submit --jar= xxxx/hadoop-common-2.10.1.jar 

即可,如果不加会报 java类找不到的ERROR

  

 

 

 

 

上一篇:解决Ctrl+Alt+Del无限重启的一种方法。


下一篇:python列表(包含列表方法)