Serverless 工作流 + 函数计算批量处理海量 OSS 文件最佳实践

背景介绍

OSS 简单的接口和卓越的可扩展性让不同场景的应用程序每天可以轻松存储几个到几十亿个对象文件。简单的 key/value 数据访问结构极大地简化了数据的上传和读取。然而,除了上传和读取,很快就围绕 OSS 产生了一系列新的应用场景,举几个例子:

  • 海量 OSS 文件复制 (bucket 内或跨 bucket),改变存储类型(标准 -> 归档)节省成本
  • 大并发 OSS 文件解冻(restore)将备份的归档文件恢复后供应用使用
  • 事件触发超大文件解压,GB 级别压缩包,包含十万级别以上的文件需要在上传后被自动解压到一个新的 OSS 路径下

上面 3 类场景有一些有共性的挑战:

  1. 总处理时间长: 处理亿级别的 OSS 文件数即使是高并发访问 OSS, 总耗时也是天级别甚至更长
  2. 大量远程调用可能产生的异常处理:由于 OSS API 基本都是操作单个文件,处理几百万到几千万个文件就意味着等数量级的远程调用。在分布式系统中,这些远程调用失败都需要处理。
  3. 状态持久化:需要有类似 checkpoint 的机制减少在部分失败的情况下全部重新处理,避免浪费时间 (如批量处理可以跳过已经处理过的前 1000 万个 key)。

在这篇文章中我们将就上面提到的 3 个场景介绍一个基于 Serverless 工作流和函数计算(FC)无服务器最佳实践解决方案。

海量 OSS 文件复制 + 归档

OSS 文件备份听起来是一个简单的 list-and-copy 的主程序就可以搞定的问题,现实中有非常多的问题需要考虑:例如主程序运行中如果机器宕机或者进程异常退出后,如何自动恢复(自己实现高可用)?恢复后如何快速已经被处理过的文件 (自己写数据库维护状态)?如何协调主备进程 (自己实现服务发现)?如何缩短复制时间? (自己实现并行调用和管理) 基础设施的维护成本和经济成本和可靠性如何取舍?在几亿个 OSS 对象面前,一个简单的单线程 list-and-copy 的主程序已经无法可靠地满足这类需求。

假设用户某 bucket 下有几亿 OSS 文件需要被复制到同一 region 不同 bucket 下,且需要将标准存储类型转换成归档存储。 在这个 oss-batch-copy 示例 中,我们提供了一个工作流应用模板将用户提供的索引文件中的所有文件依次调用函数计算 OSS CopyObject 操作实现备份。索引文件中包含需要被处理的 OSS object meta,示例如下:

Serverless 工作流 + 函数计算批量处理海量 OSS 文件最佳实践

几亿个 OSS 文件的索引也会在百 GB 级别,因此需要利用 range 读分页读取索引文件,每次处理一部分 OSS 文件并且需要一个类似 while hasMore {} 的控制逻辑保证整个索引文件从头到尾被处理。使用 Serverless 工作流的实现逻辑如下:

  1. copy_files 任务步骤:从输入的索引文件位置 (offset) 读取一段输入提供的长度 (size)从中提取需要被处理的文件并调用 FC 函数调用 OSS CopyObject
  2. has_more_files 选择步骤:成功处理完一批文件后,通过条件比对判断当前索引文件是否已经被全部处理,是则进入成功步骤,否则将下一页(offset, size)传入 copy_files 循环执行。
  3. start_sub_flow_execution 任务步骤:由于单个工作流执行有历史事件(history events)数限制,在该选择步骤也会根据当前工作流的事件 ID 判断,如果当前事件数已经超过一个阈值,则触发一个新的相同的流程执行,该流程会在子流程结束后继续进行。子流程也可能触发它的子流程,这样层层递归保证了无论多少 OSS 文件,整个流程都可以处理完成。

Serverless 工作流 + 函数计算批量处理海量 OSS 文件最佳实践

使用工作流实现批量处理提供了如下保证:

  1. 单次处理的时间几乎可以任意长度,任意多的文件数:工作流支持最长 1 年的执行
  2. 免运维,无需自行实现高可用:工作流和函数计算都是高可用的 Serverless 云服务
  3. 无需自己实现 checkpoint, 状态维护等逻辑:如果因为任何原因流程失败,可以重新从最近成功的一个 offset 开始执行。这过程中不需要使用任何的数据库或者队列。
  4. 失败重试配置:通过指数退避的配置可以处理大多数的瞬时远程调用错误。

高并发批量解冻 OSS 文件

文章基于 Serverless 工作流高并发批量解冻 OSS 文件介绍了一种高效可靠解冻大量 OSS 归档文件的解决方案。该场景和复制文件有类似的挑战,但也有其特殊性:

  1. 和 CopyObject 不同,Restore 操作是异步的,即触发后需要轮询该对象状态才能能确保解冻完成
  2. 单个对象解冻时间在分钟级,可能随着对象大小变化。这要求整个流程有更高的并发保证解冻在规定的时间内完成。

和 oss-batch-copy 类似的逻辑,该示例通过 ListObjects 分批 restore OSS,其中每一批解冻都是一个子流程。在每个流程中使用 foreach 并行循环步骤 高并发解冻 OSS 对象(最高 100 并发)。由于 Restore 接口是异步操作,因此在每次 Restore 过后需要轮询该 object 的状态直到解冻完成。解冻和轮询在同一个并发分支中完成。

Serverless 工作流 + 函数计算批量处理海量 OSS 文件最佳实践

使用工作流+函数计算批量解冻的特点:

  1. 天然支持高并发解冻,缩短整体耗时
  2. 有状态可靠的轮询确保流程结束时所有对象都解冻完成

事件驱动解压超大 OSS 文件

OSS 的一大作用是做文件的共享存储,如一方上传处理好的内容供下游应用使用。由于多个文件上传需要调用多次 PutObject 接口出错概率较大也不容易实现,很多上游服务使用压缩包用一个接口调用就完成上传。这样虽然简化了上传方,然而下游的使用方希望看到的是维持原本结构的文件以便消费。这里的需求就是响应 OSS 文件上传事件,自动将一个压缩包解压存放到另一个 OSS 路径。今天控制台上已经有一个通过事件触发函数计算执行解压的功能,然而目前单纯基于函数计算的方案有一些问题:

  1. 单个函数 10 分钟的执行时间限制:对于 GB 级别的压缩包,或者压缩包内有海量小文件的场景很容易执行超时导致解压失败
  2. 容错性低:OSS 异步调用函数计算,函数内访问 OSS 存在瞬时失败的可能,FC 异步调用在函数调用失败时会最多重试 3 次,超过次数后会丢弃消息导致解压失败。
  3. 灵活性不够:多个用户提出需要在解压后向消息服务发通知,发短信,以及删除原压缩包等自定义需求,基于单函数不易实现。

为了解决长时间执行和自定义重试的问题,我们在这个示例应用中引入 Serverless 工作流编排函数计算。OSS 事件触发函数计算后启动工作流执行。工作流会通过 ZIP 包的元数据流式读取、解压、上传到 OSS 目标路径。每个函数的执行时间超过一定阈值后即返回当前的 marker,工作流会判断当前 marker 是否表示所有文件处理完成,如果是则结束流程执行,否则继续从当前 marker 继续流式解压,直到结束。

Serverless 工作流 + 函数计算批量处理海量 OSS 文件最佳实践

工作流的加入突破了函数调用10分钟的限制,并且自带的状态管理和自定义重试,即使是 GB 级别大小,10万级别文件数的压缩包也可以可靠地解压。工作流最长支持一年的执行,几乎任意大小的 ZIP 包都同样可以流式解压成功。

Serverless 工作流 + 函数计算批量处理海量 OSS 文件最佳实践

工作流为解压流程提供了灵活的定制化能力,下图是某用户的在解压结束后通知其 MNS 队列,通知结束后进入到下一步骤删除原压缩包。

Serverless 工作流 + 函数计算批量处理海量 OSS 文件最佳实践

Takeaways

可以看到 OSS 的大规模普及也带来了一系列问题,然而解决问题的方式又繁琐,无趣,易出错。本文我们就批量文件备份,大并发解冻和事件驱动解压超大 ZIP 文件 3 个常见场景介绍了基于 Serverless 工作流和函数计算的简单、轻量,Serverless 解决方案高效可靠地解决以下问题:

  1. 支持长时间运行的流程,最长执行一年不间断
  2. 状态维护,不受系统 failover 影响
  3. 提高瞬时错误忍度度
  4. 高度灵活自定义

海量 OSS 文件批量处理的场景远不止文中提到的 3 个,我们期待更多的场景和需求的讨论也同样欢迎对 Serverless 生态,工作流,函数计算有兴趣的同学加入内部钉钉群。

Serverless 工作流 + 函数计算批量处理海量 OSS 文件最佳实践

上一篇:大数据解读《旅行青蛙》崛起之谜


下一篇:函数计算支持容器镜像-加速应用 Serverless 进程