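Scenario
Download a file from a given URL and transfer it into a specified OSS bucket.
Challenges
- The file is very large, possibly several GB or more than ten GB.
- The transfer must be fast, with total transfer time on the order of seconds.
- The traffic has to go over the public network.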
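Previous approaches
1. Transfer through a server
Run a multi-process streaming download and streaming upload on an ECS instance.
Drawbacks:
- It requires manual operation and manual monitoring, which costs staff time.
- The work is repetitive, it cannot handle a large number of simultaneous requests, and it cannot be integrated with event sources.
- It is expensive.
- Bandwidth is capped, and raising the cap costs more.
- It is hard to run many instances concurrently.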
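2. Transfer through the OSS API
Use the CopyObject API provided by OSS to copy the object.
Drawbacks:
- It only works when the source and the destination are in the same region.
- It cannot handle a source that is given as a URL.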
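Current approach
Write a function on Function Compute (FC).
Advantages:
- It can be integrated with event sources; for example, OSS can trigger the function automatically after a file is uploaded.
- No operations work and no environment setup are needed; you only provide the function code.
- It is cheap, close to free.
Limitations and challenges:
- Cross-region transfers have to go over the public network, where bandwidth is limited.
- A single function invocation can run for at most $600$ seconds. The challenge is to split the transfer into fine-grained subtasks that each finish within the FC time limit and can also run massively in parallel, which speeds up the transfer significantly.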
1. Streaming
The file is too large to hold entirely in memory, so it has to be read and uploaded as a stream.
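As a minimal sketch of the streaming idea, the loop below copies data chunk by chunk so that only one chunk is in memory at a time. The in-memory `io.BytesIO` objects stand in for the HTTP response body and the upload target, and the helper name `stream_copy` and the 8 KB chunk size are assumptions chosen for illustration.

```python
import io

CHUNK_SIZE = 8 * 1024  # bytes read per iteration (illustrative value)


def stream_copy(src, dst, chunk_size=CHUNK_SIZE):
    """Copy src to dst one chunk at a time and return the number of bytes copied."""
    total = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:  # an empty read means the stream is exhausted
            break
        dst.write(chunk)
        total += len(chunk)
    return total


# Stand-ins: a real transfer would read from an HTTP response and write to an upload.
source = io.BytesIO(b"x" * (3 * CHUNK_SIZE + 100))
sink = io.BytesIO()
copied = stream_copy(source, sink)
```

Peak memory use depends on the chunk size rather than on the file size, which is what makes multi-gigabyte inputs feasible.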
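2. Splitting the file into parts
Split the large file into equally sized parts (the last part may be smaller) and let multiple function instances process them in parallel, which speeds up the transfer considerably. In addition, when a network problem occurs, only the failed parts need to be retried, which avoids redundant work.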
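The splitting step can be sketched as follows. This is a small illustration rather than the full implementation: the helper names `split_ranges` and `range_header` are hypothetical, and the byte ranges are inclusive so that they can be used directly in an HTTP `Range` header.

```python
def split_ranges(total_size, block_size):
    """Return a list of (part_number, start, end) with inclusive byte ranges."""
    ranges = []
    start = 0
    part_number = 1  # multipart part numbers start at 1
    while start < total_size:
        end = min(total_size - 1, start + block_size - 1)
        ranges.append((part_number, start, end))
        part_number += 1
        start = end + 1
    return ranges


def range_header(start, end):
    """Build the HTTP Range header for one inclusive byte range."""
    return {"Range": "bytes=%d-%d" % (start, end)}


# A 20-byte "file" split into 6-byte parts; the last part holds the remainder.
parts = split_ranges(total_size=20, block_size=6)
first_header = range_header(parts[0][1], parts[0][2])
```

Because every part is described by an independent byte range, a failed part can be downloaded and uploaded again without touching the others.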
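3. Multi-instance concurrency
This is the most important point. After receiving a transfer task, the function splits the large file into parts and starts a child process for each part. The child process synchronously invokes the same function through the Function Compute SDK, and the invoked function passes the information needed for the final merge back to the main function as its return value. Once all child processes have finished, the main function merges the parts and completes the transfer.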
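The fan-out and fan-in pattern described above can be sketched without any cloud dependency. In this illustration `fake_invoke` is a hypothetical stand-in for the synchronous self-invocation, the ETag values are made up, and a thread pool is used instead of child processes purely to keep the sketch short.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def fake_invoke(payload):
    """Stand-in for a synchronous self-invocation; replies with JSON like a child would."""
    request = json.loads(payload)
    etag = "etag-%d" % request["part_number"]  # a real child returns the uploaded part's ETag
    return json.dumps([request["part_number"], etag])


def fan_out(ranges, max_workers=4):
    """Send one payload per part, wait for every reply, and collect (part_number, etag)."""
    payloads = [
        json.dumps({"part_number": n, "st": st, "en": en, "is_children": True})
        for n, st, en in ranges
    ]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        replies = list(pool.map(fake_invoke, payloads))  # map preserves input order
    return [tuple(json.loads(reply)) for reply in replies]


merged_input = fan_out([(1, 0, 5), (2, 6, 11), (3, 12, 17)])
```

The list returned by `fan_out` is ordered by part number, which is the form a final merge step needs.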
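4. Optimizing single-instance efficiency
Tune the parameters and pick the best values so that each single instance transfers data as efficiently as possible.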
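One way to reason about such parameters is to estimate how many parts a file produces and how many rounds ("waves") of parallel work that implies. The sketch below is only back-of-the-envelope arithmetic: the helper name `plan` is hypothetical, 1 GB is taken to be 1024^3 bytes, and the 6 MB part size and 99 workers are example values.

```python
def ceil_div(a, b):
    """Integer ceiling division."""
    return (a + b - 1) // b


def plan(total_bytes, block_size, workers):
    """Return (number of parts, number of waves needed with the given worker count)."""
    parts = ceil_div(total_bytes, block_size)
    waves = ceil_div(parts, workers)
    return parts, waves


MB = 1024 ** 2
GB = 1024 ** 3

small = plan(200 * MB, 6 * MB, 99)  # fits in a single wave
large = plan(2 * GB, 6 * MB, 99)    # needs several waves
```

A larger part size reduces the number of invocations but makes each one slower and each retry more expensive, so the best value is something to measure rather than assume.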
Results
File size | $200$ MB | $2$ GB | $10.774$ GB | $14.68$ GB |
---|---|---|---|---|
Single instance | $15$ s | $155$ s | $>600$ s | $>600$ s |
$100$ concurrent instances | $1$ s | $4$ s | $11$ s | $14$ s |
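For the two file sizes where both measurements finished, the speedup and the effective throughput follow directly from the table. The short calculation below uses only the table's numbers and assumes 1 GB = 1024 MB; the helper name `summarize` is hypothetical.

```python
# size in MB -> (single-instance seconds, 100-instance seconds), taken from the table
results = {
    200: (15, 1),
    2 * 1024: (155, 4),
}


def summarize(size_mb, single_s, parallel_s):
    """Compute the speedup factor and the parallel throughput in MB/s."""
    return {
        "speedup": single_s / parallel_s,
        "throughput_mb_s": size_mb / parallel_s,
    }


summary = {size: summarize(size, *times) for size, times in results.items()}
```

The two larger files cannot be compared this way because the single-instance run exceeded the 600-second limit.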
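Example
1. Create a service
Create a service in the Function Compute console and grant it the AliyunOSSFullAccess and AliyunFCInvocationAccess permissions.
2. Create a function
In the service created in step 1, create a function with the runtime set to python3, the memory set to 1536MB, and the timeout set to 600 seconds. The code is as follows: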
# coding=utf-8
import time, json, requests, traceback, oss2, fc2
from requests.exceptions import *
from fc2.fc_exceptions import *
from oss2.models import PartInfo
from oss2.exceptions import *
from multiprocessing import Pool
from contextlib import closing

MAX_SUBTASKS = 99             # The number of worker processes to do subtasks
BLOCK_SIZE = 6 * 1024 * 1024  # The size of each part, in bytes
CHUNK_SIZE = 8 * 1024         # The size of each chunk, in bytes
SLEEP_TIME = 0.1              # The initial seconds to wait for retrying
MAX_RETRY_TIME = 10           # The maximum retry times


def retry(func):
    """
    Return the result of the lambda function func with retry.
    :param func: (required, lambda) the function.
    :return: The result of func.
    """
    wait_time = SLEEP_TIME
    retry_cnt = 1
    while True:
        # Last attempt: call without catching, so any error propagates.
        if retry_cnt > MAX_RETRY_TIME:
            return func()
        try:
            return func()
        except (ConnectionError, SSLError, ConnectTimeout, Timeout) as e:
            print(traceback.format_exc())
        except (OssError) as e:
            # Only server-side OSS errors are retried.
            if 500 <= e.status < 600:
                print(traceback.format_exc())
            else:
                raise Exception(e)
        except (FcError) as e:
            # Retry FC server errors and throttling (429).
            if (500 <= e.status_code < 600) or (e.status_code == 429):
                print(traceback.format_exc())
            else:
                raise Exception(e)
        print('Retry %d times...' % retry_cnt)
        time.sleep(wait_time)
        wait_time *= 2
        retry_cnt += 1


def get_info(url):
    """
    Get the CRC64 and total length of the file.
    :param url: (required, string) the url address of the file.
    :return: CRC64, length
    """
    with retry(lambda : requests.get(url, {}, stream = True)) as r:
        return r.headers['x-oss-hash-crc64ecma'], int(r.headers['content-length'])


class Response(object):
    """
    The response class to support reading by chunks.
    """
    def __init__(self, response):
        self.response = response
        self.status = response.status_code
        self.headers = response.headers

    def read(self, amt = None):
        if amt is None:
            content = b''
            for chunk in self.response.iter_content(CHUNK_SIZE):
                content += chunk
            return content
        else:
            try:
                return next(self.response.iter_content(amt))
            except StopIteration:
                return b''

    def __iter__(self):
        return self.response.iter_content(CHUNK_SIZE)


def do_subtask(event, context):
    """
    Download a range of the file from url and then upload it to OSS.
    :param event: (required, json) the json format of event.
    :param context: (required, FCContext) the context of handler.
    :return: ([integer, string]) the part_number and etag of this part.
    """
    oss_endpoint = event['target_endpoint']
    oss_bucket_name = event['target_bucket_name']
    access_key_id = context.credentials.access_key_id
    access_key_secret = context.credentials.access_key_secret
    security_token = context.credentials.security_token
    auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
    bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name)
    object_name = event['target_object_name']
    upload_id = event['upload_id']
    part_number = event['part_number']
    url = event['source_url']
    st = event['st']
    en = event['en']
    try:
        # Request only the inclusive byte range [st, en] of the source file.
        headers = {'Range' : 'bytes=%d-%d' % (st, en)}
        resp = Response(retry(lambda : requests.get(url, headers = headers, stream = True)))
        result = retry(lambda : bucket.upload_part(object_name, upload_id, part_number, resp))
        return [part_number, result.etag]
    except Exception as e:
        print(traceback.format_exc())
        raise Exception(e)


def invoke_subtask(args):
    """
    Invoke the same function synchronously to start a subtask.
    :param args: (object_name, oss_endpoint, oss_bucket_name, upload_id, part_number, url, st, en, context)
    :object_name: (required, string) the goal object_name.
    :oss_endpoint: (required, string) the goal oss endpoint.
    :oss_bucket_name: (required, string) the goal bucket_name.
    :upload_id: (required, string) the upload_id of this upload task.
    :part_number: (required, integer) the part_number of the part in this subtask.
    :url: (required, string) the url address of the file.
    :st, en: (required, integer) the byte range of this subtask, denoting [st, en].
    :context: (required, FCContext) the context of handler.
    :return: the return of the invoked function.
    """
    (object_name, oss_endpoint, oss_bucket_name, upload_id,
     part_number, url, st, en, context) = args
    account_id = context.account_id
    access_key_id = context.credentials.access_key_id
    access_key_secret = context.credentials.access_key_secret
    security_token = context.credentials.security_token
    region = context.region
    service_name = context.service.name
    function_name = context.function.name
    endpoint = 'http://%s.%s-internal.fc.aliyuncs.com' % (account_id, region)
    client = fc2.Client(
        endpoint = endpoint,
        accessKeyID = access_key_id,
        accessKeySecret = access_key_secret,
        securityToken = security_token
    )
    # 'is_children' marks the invocation as a subtask for handler().
    payload = {
        'target_object_name' : object_name,
        'target_endpoint' : oss_endpoint,
        'target_bucket_name' : oss_bucket_name,
        'upload_id' : upload_id,
        'part_number' : part_number,
        'source_url' : url,
        'st' : st,
        'en' : en,
        'is_children' : True
    }
    ret = retry(lambda : client.invoke_function(service_name, function_name, payload = json.dumps(payload)))
    return ret.data


def migrate_file(url, oss_object_name, oss_endpoint, oss_bucket_name, context):
    """
    Download the file from url and then upload it to OSS.
    :param url: (required, string) the url address of the file.
    :param oss_object_name: (required, string) the goal object_name.
    :param oss_endpoint: (required, string) the goal oss endpoint.
    :param oss_bucket_name: (required, string) the goal bucket_name.
    :param context: (required, FCContext) the context of handler.
    :return: actual_crc64, expect_crc64
    :actual_crc64: (string) the CRC64 of upload.
    :expect_crc64: (string) the CRC64 of source file.
    """
    crc64, total_size = get_info(url)
    access_key_id = context.credentials.access_key_id
    access_key_secret = context.credentials.access_key_secret
    security_token = context.credentials.security_token
    auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
    bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name)
    upload_id = retry(lambda : bucket.init_multipart_upload(oss_object_name)).upload_id
    pool = Pool(MAX_SUBTASKS)
    st = 0
    part_number = 1
    tasks = []
    # Split [0, total_size) into inclusive ranges of at most BLOCK_SIZE bytes.
    while st < total_size:
        en = min(total_size - 1, st + BLOCK_SIZE - 1)
        tasks.append((oss_object_name, oss_endpoint, oss_bucket_name, upload_id, part_number, url, st, en, context))
        part_number += 1
        st = en + 1
    subtasks = pool.map(invoke_subtask, tasks)
    pool.close()
    pool.join()
    # Each subtask returns a JSON list: [part_number, etag].
    parts = []
    for it in subtasks:
        tmp = json.loads(it)
        parts.append(PartInfo(tmp[0], tmp[1]))
    res = retry(lambda : bucket.complete_multipart_upload(oss_object_name, upload_id, parts))
    return str(res.crc), str(crc64)


def get_oss_endpoint(oss_region, fc_region):
    """
    Get the oss endpoint.
    :param oss_region: (required, string) the region of the target oss.
    :param fc_region: (required, string) the region of the fc function.
    :return: (string) the best oss endpoint.
    """
    endpoint = 'http://oss-' + oss_region
    # Use the internal endpoint when the bucket and the function share a region.
    if oss_region == fc_region:
        endpoint += '-internal'
    return endpoint + '.aliyuncs.com'


def handler(event, context):
    evt = json.loads(event)
    # Subtask invocations carry the 'is_children' key; handle one part and return.
    if 'is_children' in evt:
        return json.dumps(do_subtask(evt, context))
    url = evt['source_url']
    oss_object_name = evt['target_object_name']
    oss_endpoint = get_oss_endpoint(evt['target_region'], context.region)
    oss_bucket_name = evt['target_bucket_name']
    st_time = int(time.time())
    wait_time = SLEEP_TIME
    retry_cnt = 1
    # Repeat the whole migration until the uploaded CRC64 matches the source CRC64.
    while True:
        actual_crc64, expect_crc64 = migrate_file(url, oss_object_name, oss_endpoint, oss_bucket_name, context)
        if actual_crc64 == expect_crc64:
            break
        print('Migration object CRC64 not matched, expected: %s, actual: %s' % (expect_crc64, actual_crc64))
        if retry_cnt > MAX_RETRY_TIME:
            raise Exception('Maximum retry time exceeded.')
        print('Retry %d times...' % retry_cnt)
        time.sleep(wait_time)
        wait_time *= 2
        retry_cnt += 1
    print('Success! Total time: %d s.' % (int(time.time()) - st_time))
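The retry logic in the listing above follows an exponential backoff pattern: wait, double the wait, try again, and let the last attempt fail loudly. The standalone sketch below isolates that pattern so it can be run without OSS or FC. The names `retry_with_backoff` and `flaky` are hypothetical, and the `sleep` parameter is injected only so the example can record the waits instead of actually sleeping.

```python
import time


def retry_with_backoff(func, retriable, max_retries=10, first_wait=0.1, sleep=time.sleep):
    """Call func, retrying on the given exception types with a doubling wait."""
    wait = first_wait
    for _ in range(max_retries):
        try:
            return func()
        except retriable:
            sleep(wait)
            wait *= 2
    return func()  # final attempt: any exception propagates to the caller


calls = {"n": 0}


def flaky():
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("transient")
    return "ok"


waits = []  # records the requested waits instead of sleeping
result = retry_with_backoff(flaky, (TimeoutError,), sleep=waits.append)
```

With an initial wait of 0.1 seconds and ten retries, the waits add up to roughly 102 seconds in the worst case, which is worth keeping in mind against the 600-second execution limit.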
3. Write the event
Click Trigger Event (触发事件) and enter JSON in the following format:
{
"source_url" : "<source_url>",
"target_object_name" : "<target_object_name>",
"target_region" : "<target_region>",
"target_bucket_name" : "<target_bucket_name>"
}
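If the event is generated by a script rather than typed into the console, a small helper can build it and reject empty fields before the function is invoked. This is only a sketch: the helper name `build_event` and all example values (URL, object name, region, bucket) are hypothetical placeholders.

```python
import json

REQUIRED_KEYS = ("source_url", "target_object_name", "target_region", "target_bucket_name")


def build_event(source_url, target_object_name, target_region, target_bucket_name):
    """Return the event as a JSON string, raising ValueError if any field is empty."""
    event = {
        "source_url": source_url,
        "target_object_name": target_object_name,
        "target_region": target_region,
        "target_bucket_name": target_bucket_name,
    }
    missing = [key for key in REQUIRED_KEYS if not event[key]]
    if missing:
        raise ValueError("missing event fields: %s" % ", ".join(missing))
    return json.dumps(event)


# Placeholder values for illustration only.
payload = build_event("https://example.com/big.bin", "backup/big.bin", "cn-hangzhou", "my-bucket")
```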
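4. Run the function
Click Execute (执行) and wait for the function to finish; the transfer is then complete.
5. Check the transferred file
Go to the OSS console and confirm under the corresponding bucket that the transfer succeeded.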