如何将文件传递到主节点?

我已经在python中编写代码来实现二进制分类,我想使用Apache-Spark基于本地计算机中的不同数据文件并行化这个分类过程.我已经完成了以下步骤:

>我编写了包含4个python文件的整个项目:“run_classifer.py”(用于运行我的分类应用程序),“classifer.py”(用于二进制分类),“load_params.py”(用于加载学习)分类参数)和“preprocessing.py”(用于预处理数据).该项目还使用依赖项文件:“tokenizer.perl”(用于预处理部分)和“nonbreaking_prefixes / nonbreaking_prefix.en”(也用于预处理部分).
>我的脚本文件“run_classifer.py”的主要部分定义如下,

### Initialize the Spark
conf = SparkConf().setAppName("ruofan").setMaster("local")
sc = SparkContext(conf = conf,
    pyFiles=['''All python files in my project as
             well as "nonbreaking_prefix.en" and "tokenizer.perl"'''])

### Read data directory from S3 storage, and create RDD
datafile = sc.wholeTextFiles("s3n://bucket/data_dir") 

### Sent the application on each of the slave node
datafile.foreach(lambda (path, content): classifier(path, content)) 

但是,当我运行我的脚本“run_classifier.py”时,似乎无法找到文件“nonbreaking_prefix.en”.以下是我得到的错误:

ERROR: No abbreviations files found in /tmp/spark-f035270e-e267-4d71-9bf1-8c42ca2097ee/userFiles-88093e1a-6096-4592-8a71-be5548a4f8ae/nonbreaking_prefixes

但我实际上将文件“nonbreaking_prefix.en”传递给主节点,我对错误没有任何想法.如果有人帮助我解决问题,我将非常感激.

解决方法:

您可以使用sc.addFile上传文件,并使用SparkFiles.get获取worker的路径:

from pyspark import SparkFiles

sc = (SparkContext(conf = conf,
    pyFiles=["All",  "Python", "Files",  "in",  "your", "project"])

# Assuming both files are in your working directory
sc.addFile("nonbreaking_prefix.en")
sc.addFile("tokenizer.perl")

def classifier(path, content):
   # Get path for uploaded files
   print SparkFiles.get("tokenizer.perl")

   with open(SparkFiles.get("nonbreaking_prefix.en")) as fr:
       lines = [line for line in fr]
上一篇:python – 从数据帧中获取值


下一篇:python – PySpark.将Dataframe传递给pandas_udf并返回一个系列