I've been Googling and experimenting for hours without luck, and I'm hoping someone can offer advice.
I'm reading a large number of files into a Spark RDD and want to append the filename, which contains a timestamp, to each row of the RDD. This is what I have so far.
def append_name(x):
    filename = x[0].split('\n')  # take the filename
    content = x[1].split('\n')   # take the content
    output = [(row.split(','), filename) for row in content]
    flattened = [item for sublist in output for item in sublist]
    return flattened

data_file = sc.wholeTextFiles("data/").flatMap(lambda l: append_name(l))
data_file.take(1)
My output looks like this:
[[u'25689119206459',
  u'True',
  u'3',
  u'main',
  u'used_car',
  u'Huzhou',
  u'False',
  u'False',
  u'21824706815494',
  u'0',
  u'http://jump.zhineng.58.com/clk?target=mv7V0A-b5HTh',
  u'0\r'],
 [u'file:/home/user/data/light_2016-06-01T04-02-27_node1.csv']]
Still a list of lists… even though I've flattened it. Any idea what I'm missing? I also tried the solution suggested here and got the same result.
Thanks.
Solution:
Your `flattened` comprehension iterates over each `(fields, filename)` tuple and yields its two members, the list of fields and the single-element filename list, so `flatMap` emits the fields and the filename as separate list records. Build one flat list per line instead, by concatenating the split fields with the filename. Try this:
>>> def flatten(pair):
...     f, text = pair
...     return [line.split(",") + [f] for line in text.splitlines()]
...
>>> rdd = sc.wholeTextFiles("README.md").flatMap(flatten)
>>> for x in rdd.take(3):
...     print(x)
...
['# Apache Spark', 'file:/some-path/README.md']
['', 'file:/some-path/README.md']
['Spark is a fast and general cluster computing system for Big Data. It provides', 'file:/some-path/README.md']
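Since the stated goal was to attach the timestamp embedded in the filename rather than the whole path, a possible refinement is to extract it first. This is only a sketch: the timestamp pattern is assumed from the example filename shown above (light_2016-06-01T04-02-27_node1.csv), and TS_PATTERN and flatten_with_timestamp are hypothetical names, not part of the answer.

import re

# Hypothetical helper: pull the timestamp out of paths like
# "file:/home/user/data/light_2016-06-01T04-02-27_node1.csv"
# (pattern assumed from the example output above).
TS_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}")

def flatten_with_timestamp(pair):
    f, text = pair
    m = TS_PATTERN.search(f)
    ts = m.group(0) if m else f  # fall back to the full path if no match
    return [line.split(",") + [ts] for line in text.splitlines()]

rdd = sc.wholeTextFiles("data/").flatMap(flatten_with_timestamp)

With this variant each record ends with just the timestamp string, e.g. u'2016-06-01T04-02-27', instead of the full file: URI.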