python-将文件名添加到WholeTextFiles上的RDD行

我一直在使用Google搜索,并尝试了几个小时而没有运气,希望大家能提供建议.

我正在将大量文件读取到Spark RDD中,并且想要将包含时间戳的文件名附加到RDD的每一行中.到目前为止,这就是我所得到的.

def append_name(x):
    filename = x[0].split('\n') #take the filename
    content = x[1].split('\n') #take the content
    output = [(row.split(','), filename) for row in content]
    flattened = [item for sublist in output for item in sublist]
    return flattened

data_file = sc.wholeTextFiles("data/").flatMap(lambda l: append_name(l))

data_file.take(1)

我的输出如下所示:

[[u'25689119206459',
  u'True',
  u'3',
  u'main',
  u'used_car',
  u'Huzhou',
  u'False',
  u'False',
  u'21824706815494',
  u'0',
  u'http://jump.zhineng.58.com/clk?target=mv7V0A-b5HTh',
  u'0\r'],
 [u'file:/home/user/data/light_2016-06-01T04-02-27_node1.csv']]

仍然是列表的清单…即使我已将其展平.有什么我想念的想法吗?我还尝试使用建议的here解决方案,并获得了相同的结果.

谢谢.

解决方法:

尝试这个:

>>> def flatten(pair):
...     f, text = pair
...     return [line.split(",") + [f] for line in text.splitlines()]
... 
>>> rdd = sc.wholeTextFiles("README.md").flatMap(flatten)
>>> for x in rdd.take(3):
...     print(x)
...     
['# Apache Spark', 'file:/some-path/README.md']
['', 'file:/some-path/README.md']
['Spark is a fast and general cluster computing system for Big Data. It provides', 'file:/some-path/README.md']
上一篇:python-read_csv()中的S3阅读器是先将文件下载到磁盘还是使用流式传输?


下一篇:我们可以在Spark DataFrame列中使用Pandas函数吗?如果是这样,怎么办?