前言
Serverless 工作流(Serverless Workflow,原函数工作流)是一个用来协调多个分布式任务执行的全托管 Serverless 云服务,致力于简化开发和运行业务流程所需要的任务协调、状态管理以及错误处理等繁琐工作,让用户聚焦业务逻辑开发。用户可以用顺序、分支、并行等方式来编排分布式任务,服务会按照设定好的顺序可靠地协调任务执行,跟踪每个任务的状态转换,并在必要时执行用户定义的重试逻辑,以确保工作流顺利完成。
函数计算 FC 是事件驱动的全托管计算服务,无需采购服务器和运维,只需上传代码就能实现高可用、高并发、弹性伸缩的后端服务。
本文介绍如何使用 Serverless 工作流来高并发大批量的解冻 oss 归档存储文件,使用工作流的优势:
- 高并发
- 错误自动重试,高可靠性
- 每个步骤都有输入输出记录以及实时执行状态,高可观测性
应用中心一键部署
- 前往 Serverless 工作流应用中心 创建并部署
OSS Restore
应用。
-
部署完成后执行流程
{stackName}-mainRestoreFlow-{suffix}
,输入:{ "endpoint": "", "bucketName": "", "prefix": "", "marker": "", "maxKeys": 100, "pollInterval": 10, "workers": 10, "groupSize": 1 }
执行参数说明:
- endpoint
OSS endpoint - bucketName:
OSS bucket 名称 - prefix:
OSS bucket 文件过滤前缀 - maxKeys:
OSS ListObjects 返回的最大文件数量 (这里不要超过 foreach 的并发限制,默认是 100) - pollInterval:
轮询 OSS 文件 restore 状态的时间间隔(秒) - groupSize:
一个任务步骤(对应一个函数)批量处理的文件数 - workers:
多个文件在一个函数中处理时,设定的处理线程池大小 - marker:
OSS ListObjects 的开始 marker
详细信息可参考 ListObjects API
- endpoint
流程执行完毕后,会自动解冻所有符合过滤条件的归档存储文件。
原理
解冻文件主流程
主流程 mainRestoreFlow
主要做以下事情:
- 任务步骤
listArchiveFiles
调用 FC 函数listArchiveFiles
从marker
开始执行 ListObjects 列举指定maxKeys
数量,前缀为prefix
的 OSS 文件。 - 调用子流程
restoreFlow
对获取的文件列表进行解冻,并返回下一次列举的起点maker
。 - 选择步骤
checkEnd
检测是否已完成列举 bucket 中所有的文件,若没有,跳转到listArchiveFiles
中从下一个marker
开始继续执行,否则结束。
主流程定义,请参考 流程定义语言:
version: v1
type: flow
steps:
- type: pass
name: init
outputMappings:
- source: $input.endpoint
target: endpoint
- source: $input.bucketName
target: bucketName
- source: $input.prefix
target: prefix
- source: $input.maxKeys
target: maxKeys
- source: $input.pollInterval
target: pollInterval
- source: $input.marker
target: marker
- source: $input.workers
target: workers
- source: $input.groupSize
target: groupSize
# List archive files from marker
- type: task
name: listArchiveFiles
resourceArn: acs:fc:::services/<serviceName>/functions/listArchiveFiles
outputMappings:
- source: $local.bucketName
target: bucketName
- source: $local.filesGroup
target: filesGroup
- source: $local.marker
target: marker
- source: $local.end
target: end
- source: $local.empty
target: empty
- source: $local.archiveFilesCount
target: archiveFilesCount
# Check whether file restore success, if not, retry check
- type: choice
name: checkEmpty
choices:
# If list archive files not empty
- condition: $.empty == "false"
steps:
# Invoke subflow restore to restore listed files
- type: task
name: invokeRestoreFlow
resourceArn: acs:fnf:::flow/<restoreFlow>
pattern: sync
serviceParams:
Input: $
default:
goto: checkEnd
# Check list files ended
- type: choice
name: checkEnd
choices:
- condition: $.end == "true"
goto: success
default:
goto: listArchiveFiles
# success
- type: pass
name: success
解冻文件子流程
子流程 restoreFlow
主要做以下事情:
- 并行循环步骤 foreach,并行的对主流程中传入的文件列表生成多个解冻任务。
- 解冻任务
restoreTask
调用 FC 函数restore
对文件列表进行解冻。 -
所有解冻任务提交完成后,循环执行:
- 等待步骤
Wait
等待一段时间 - 任务步骤
GetJobStatus
获取所有文件的解冻状态 - 选择步骤
CheckJobComplete
判断是否全部解冻完成,若完成执行结束,否则跳转到 wait 步骤继续循环检测。
- 等待步骤
子流程定义如下:
version: v1
type: flow
steps:
- type: foreach
name: retoreForeach
iterationMapping:
collection: $.filesGroup
item: files
steps:
# Invoke restore function
- type: task
name: restoreTask
resourceArn: !Ref OSSRestoreService/restoreFunction
retry:
- errors:
- FC.ResourceThrottled
- FC.ResourceExhausted
- FC.InternalServerError
- FC.Unknown
- FnF.TaskTimeout
intervalSeconds: 1
maxAttempts: 10
multiplier: 1.5
maxIntervalSeconds: 10
# Wait interval for poll files restore status
- type: wait
name: Wait
duration: $.pollInterval
# Get file restore status
- type: task
name: GetJobStatus
resourceArn: !Ref OSSRestoreService/restoreStatusFunction
# Check whether file restore success, if not, retry check
- type: choice
name: CheckJobComplete
inputMappings:
- target: status
source: $local.status
choices:
- condition: $.status == "success"
goto: JobSucceeded
- condition: $.status == "running"
goto: Wait
- type: succeed
name: JobSucceeded
outputMappings:
- target: filesGroup
source: $input.filesGroup
- target: marker
source: $input.marker
以上应用代码可参考 oss-restore
总结
使用 Serverless 工作流能极大减少重复的流程控制开发,让错误重试更加容易,以及提供实时的执行进度查询。这里提供的解冻 oss 应用,完全可以照搬到其它的需求上,以实现高并发高可靠性的应用。
欢迎加钉钉群 23116481 交流: