流式传输/分块csv从S3到Python

我打算使用Python对存储在S3中的非常大的csv文件执行一些内存密集型操作,目的是将脚本移动到AWS Lambda.我知道我可以在整个csv nto内存中读取,但我肯定会遇到Lambda的内存和存储限制,如此大的文件有没有任何方法可以使用boto3一次流入或只读取csv的块/ botocore,理想情况下通过指定行号来读入?

以下是我已经尝试过的一些事情:

1)使用S3.get_object中的range参数来指定要读入的字节范围.不幸的是,这意味着最后的行在中间被截断,因为没有办法指定要读入的行数.有些杂乱解决方法,如扫描最后一个换行符,记录索引,然后使用它作为下一个字节范围的起点,但我想尽可能避免这个笨重的解决方案.

2)使用S3 select编写sql查询以有选择地从S3存储桶中检索数据.不幸的是,不支持row_numbers SQL函数,它看起来不像是读取行子集的方法.

解决方法:

假设您的文件未压缩,则应包括从流中读取并拆分换行符.读取一大块数据,找到该块中的换行符的最后一个实例,拆分并处理.

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# number of bytes to read per chunk
chunk_size = 1000000

# the character that we'll split the data with (bytes, not string)
newline = '\n'.encode()   
partial_chunk = b''

while (True):
    chunk = partial_chunk + body.read(chunk_size)

    # If nothing was read there is nothing to process
    if chunk == b'':
        break

    last_newline = chunk.rfind(newline)

    # write to a smaller file, or work against some piece of data
    result = chunk[0:last_newline+1].decode('utf-8')

    # keep the partial line you've read here
    partial_chunk = chunk[last_newline+1:]

如果你有gzip文件,那么你需要在循环中使用BytesIO和GzipFile类;这是一个更难的问题,因为你需要保留Gzip压缩细节.

上一篇:python – 无法安装boto3


下一篇:基于SAE的免运维、高资源利用率微服务