在pyspark 中保存rdd的内存到文件的时候,会遇到文件夹已经存在而失败,所以如果文件夹已经存在,需要先删除。
搜索了下资料,发现pyspark并没有提供直接管理hdfs文件系统的功能。寻找到一个删除的方法,是通过调用shell命令 hadoop fs -rm -f 来删除,这个方法感觉不怎么好,所以继续找。
后来通过查找hadoop hdfs 的源代码发现 hdfs是通过java 的包 org.appache.hadoop.fs 中的几个类来完成的,也有用java创建 hdfs对象再调用对象的删除方法可删除。
所以下一步思路就是尝试是不是可以把 java的类在 python 中引用,最后得到的答案是肯定的,代码如下:
def jPath(sc, filepath):
jPathClass = sc._gateway.jvm.org.apache.hadoop.fs.Path
return jPathClass(filepath)
def jFileSystem(sc):
jFileSystemClass = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
hadoop_configuration = sc._jsc.hadoopConfiguration()
return jFileSystemClass.get(hadoop_configuration)
def write(sc, filepath, content, overwite=True):
try:
fs = jFileSystem(sc)
out = fs.create(jPath(sc, filepath), overwrite)
out.write(bytearray(content, "utf-8"))
out.flush()
out.close()
except Exception as e:
print(e)
spark = spark_create()
sc = spark._sc
sc.setLogLevel("ERROR")
sqc = SQLContext(sc)
#
# 删除HDFS目录主程序
#
def __name__ == "__main__":
del_hdfs = "hdfs://host:port/del_filepath"
fs = jFileSystem(sc)
del_path_obj = jPath(sc, del_hdfs)
if fs.exists(del_path_obj):
fs.delete(del_path_obj)
print(" del ok. hdfs=%s"%del_hdfs)
else:
print(" path not exists. hdfs=%s"%del_hdfs)
可顺利在 pyspark 中使用 java 类,核心是 sc._gateway.jvm 来实现 ,代码中用到的
org.apache.hadoop.fs.FileSystem
org.apache.hadoop.fs.Path
是在 hadoop 的 jar包 hadoop-common-2.10.1.jar中,所以SPARK在提交的时候要加上这个jar包
spark-submit --jar= xxxx/hadoop-common-2.10.1.jar
即可,如果不加会报 java类找不到的ERROR