I am using Apache Spark 1.6.2.
I have a .csv file with about 8 million rows that I want to convert into a DataFrame.
But I have to convert it to an RDD first so I can map it and extract the data (columns) I need.
Mapping the RDD works fine, but when I convert the RDD to a DataFrame, Spark throws an error:
Traceback (most recent call last):
File "C:/Users/Dzaky/Project/TJ-source/source/201512/final1.py", line 38, in <module>
result_iso = input_iso.map(extract_iso).toDF()
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 64, in toDF
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 423, in createDataFrame
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 310, in _createFromRDD
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 254, in _inferSchema
File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1315, in first
File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1297, in take
File "c:\spark\python\lib\pyspark.zip\pyspark\context.py", line 939, in runJob
File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 45, in deco
File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error
Here is my code:
def extract_iso(line):
    # keep only the second column and the second-to-last column
    fields = line.split(',')
    return [fields[-2], fields[1]]

input_iso = sc.textFile("data.csv")
result_iso = input_iso.map(extract_iso).toDF()
data.csv has more than 8 million rows, but when I cut it down to fewer than 500 rows, the program runs fine. I don't know whether Spark has a row limit or something else is going on. Is there any way to convert this RDD? Or is there another way to map a DataFrame the same way I map the RDD?
Additional information:
The data is messy: the number of columns often differs from one row to another, which is why I need to map it first.
However, the data I want is always at exactly the same indices, [1] and [-2] (the second column and the second-to-last column); only the number of columns between them varies from row to row. A small made-up example follows.
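For illustration only (these sample lines are made up, not taken from the real data.csv), this plain-Python snippet shows that the mapping always picks out the same two fields no matter how many columns a row has:

# Made-up sample lines: column counts differ from row to row,
# but the wanted fields are always at index 1 and index -2.
sample_lines = [
    "id1,ISO_A,x,y,STATION_1,flag",
    "id2,ISO_B,STATION_2,flag",
    "id3,ISO_C,x,y,z,w,STATION_3,flag",
]

def extract_iso(line):
    fields = line.split(',')
    return [fields[-2], fields[1]]

for line in sample_lines:
    print(extract_iso(line))
# ['STATION_1', 'ISO_A']
# ['STATION_2', 'ISO_B']
# ['STATION_3', 'ISO_C']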
Thank you very much for your answers :)
Solution:
The most likely cause is that Spark is trying to infer the schema of the newly created DataFrame. Try the other way of mapping the RDD to a DF: specify the schema explicitly and pass it to createDataFrame, for example:
>>> from pyspark.sql.types import *
>>> schema = StructType([StructField('a', StringType()),StructField('b', StringType())])
>>> df = sqlContext.createDataFrame(input_iso.map(extract_iso), schema)
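For reference, the traceback above ends in _inferSchema -> first -> take -> runJob, which fits this explanation: without a schema, toDF() has to run a job over the RDD just to sample rows and infer the column types. Below is a minimal end-to-end sketch using the explicit schema; the column names 'a' and 'b' are the placeholders from the example above, and the optional filter for too-short rows is an assumption, only needed if some of the messy rows lack both wanted columns:

>>> from pyspark.sql.types import StructType, StructField, StringType
>>> def extract_iso(line):
...     # second-to-last column and second column, regardless of row width
...     fields = line.split(',')
...     return [fields[-2], fields[1]]
...
>>> schema = StructType([StructField('a', StringType()),
...                      StructField('b', StringType())])
>>> input_iso = sc.textFile("data.csv")
>>> # optional guard (assumption): drop rows too short to hold both fields
>>> clean_iso = input_iso.filter(lambda line: len(line.split(',')) >= 3)
>>> result_iso = sqlContext.createDataFrame(clean_iso.map(extract_iso), schema)
>>> result_iso.show(5)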