本文作者:CHEN, Yingda 阿里云智能 高级技术专家
0. 前言
作为阿里巴巴核心大数据底座,伏羲调度和分布式执行系统,支撑着阿里集团内部以及阿里云上大数据平台绝大部分的大数据计算需求,在其上运行的MaxCompute(ODPS) 以及PAI等多种计算引擎,每天为用户进行海量的数据运算。 在"阿里体量"的大数据生态中,伏羲系统管理着弹内外多个物理集群,超十万台物理机, 以及数百万的CPU/GPU cores。每天运行在伏羲分布式平台上的作业数已经超过千万, 是业界少有的,单天处理EB级别数据分布式平台。其中单个作业规模已经高达数十万计算节点,管理着数百亿的边连接。在过去的十年中,阿里集团以及阿里云上这样的作业数目和规模,锤炼了伏羲分布式平台;与此同时,今天平台上作业的日益多样化,以及向前再发展的需求,对于伏羲系统架构的进一步演化,也都带来了巨大挑战与机遇。本文主要介绍一下在过去的两年多时间中,阿里巴巴伏羲团队对于整个核心调度与分布式执行系统的升级换代,code name DAG 2.0。
1. 背景
1.1 伏羲 DAG/AM 组件
从较高的层面来看整个分布式系统的体系架构,物理集群之上运行的分布式系统,大概可以分成资源管理,作业分布式调度执行,与多个计算节点的运行这三个层次,如同下图所示。通常所说的DAG组件,指的是每个分布式作业的中心管理点,也就是application master (AM)。 AM之所以经常被称为DAG (Directional Acyclic Graph, 有向无环图) 组件,是因为AM最重要的责任,就是负责协调分布式作业的执行。而现代的分布式系统中的作业执行流程,通常可以通过DAG上面的调度以及数据流来描述[1]。相对于传统的Map-Reduce[2]执行模式, DAG的模型能对分布式作业做更精准的描述,也是当今各种主流大数据系统(Hadoop 2.0+, SPARK, FLINK, TENSORFLOW等)的设计架构基础,区别只在于DAG的语义是透露给终端用户,还是计算引擎开发者。
与此同时,从整个分布式系统stack来看, AM肩负着除了运行DAG以外更多的责任。作为作业的中心管控节点,向下其负责与Resource Manager之间的交互,为分布式作业申请计算资源;向上其负责与计算引擎进行交互,并将收集的信息反馈到DAG的执行过程中。作为唯一有能力对每一个分布式作业的执行大局有最精准的了解的组件,在全局上对DAG的运行做准确的管控和调整,也是AM的重要职责。从上图描述的分布式系统stack图中,我们也可以很直观的看出,AM是系统中唯一需要和几乎所有分布式组件交互的组件,在作业的运行中起了重要的承上启下的作用。这一组件之前在伏羲系统中被称为JobMaster(JM), 在本文中我们统一用DAG或者AM来指代。
1.2 逻辑图与物理图
分布式作业的DAG,有两种层面上的表述:逻辑图与物理图。简单地来说(over-simplified),终端用户平时理解的DAG拓扑,大多数情况下描述的是逻辑图范畴:比如大家平时看到的logview图,虽然里面包含了一些物理信息(每个逻辑节点的并发度),但整体上可以认为描述的就是作业执行流程的逻辑图。
准确一点说:
- 逻辑图描述了用户想要实现的数据处理流程,从数据库/SQL的角度(其他类型引擎也都有类似之处,比如TENSORFLOW) 来看,可以大体认为DAG的逻辑图,是对优化器执行计划的一个延续。
- 物理图更多描述了执行计划映射到物理分布式集群的具体描述,体现的是执行计划被物化到分布式系统上,具备的一些特性:比如并发度,数据传输方式等等。
而每个逻辑图的"物理化",可以有很多等效方式。选择合适的方式来将逻辑图变成物理化执行,并进行灵活的调整,是DAG组件的重要职责之一。从上图的逻辑图到物理图的映射可以看到,一个图的物理化过程,实际上就是在回答一系列图节点以及各个连接边物理特性的问题,一旦这些问题得到确认,就能得到在分布式系统上实际执行物理图。
1.3. 为什么需要DAG 2.0架构升级?
作为从阿里云飞天系统创建伊始就开始研发的伏羲分布式作业执行框架,DAG 1.0在在过去十年中支撑了阿里集团的大数据业务,在系统规模以及可靠性等方面都走在了业界领先。另外一方面,作为一个开发了十年的系统,虽然在这个期间不断的演进,DAG 1.0在基本架构上秉承了比较明显的Map-Reduce执行框架的一些特点,逻辑图和物理图之间没有清晰的分层,这导致在这个基本架构上要继续向前走,支持更多DAG执行过程中的动态性,以及同时支持多种计算模式等方面,都比较困难。事实上今天在MaxCompute SQL线上,离线作业模式以及准实时作业模式(smode)两种执行模式,使用了两套完全分开的分布式执行框架,这也导致对于优化性能和优化系统资源使用之间的取舍,很多情况下只能走两个极端,而无法比较好的tradeoff。
除此之外,随着MaxCompute以及PAI引擎的更新换代以及新功能演进,上层的分布式计算自身能力在不断的增强。对于AM组件在作业管理,DAG执行等方面的动态性,灵活性等方面的需求也日益强烈。在这样的一个大的背景下,为了支撑计算平台下个10年的发展,伏羲团队启动了DAG 2.0的项目,将从代码和功能方面,完整替代1.0的JobMaster组件,实现完全的升级换代。在更好的支撑上层计算需求的同时,也同时对接伏羲团队在shuffle服务(shuffle service)上的升级,以及fuxi master(Resource Manager) 的功能升级。与此同时,站在提供企业化服务的角度来看,一个好的分布式执行框架,除了支持阿里内部极致的大规模大吞吐作业之外,我们需要支持计算平台的向外走,支持云上各种规模和计算模式的需求。除了继续锤炼超大规模的系统扩展能力意外,我们需要降低大数据系统使用的门槛,通过系统本身的智能动态化能力,来提供自适应(各种数据规模以及处理模式)的大数据企业界服务,是DAG 2.0在设计架构中考虑的另一重要维度。
2. DAG 2.0架构以及整体设计
DAG 2.0项目,在调研了业界各个分布式系统(包括SPARK/FLINK/Dryad/Tez/Tensorlow)DAG组件之后,参考了Dryad/Tez的框架。新一代的架构上,通过逻辑图和物理图的清晰分层,可扩展的状态机管理,插件式的系统管理,以及基于事件驱动的调度策略等基座设计,实现了对计算平台上多种计算模式的统一管理,并更好的提供了作业执行过程中在不同层面上的动态调整能力。
2.1 作业执行的动态性
传统的分布式作业执行流程,作业的执行计划是在提交之前确定的。以SQL执行为例,一个SQL语句,在经过编译器和优化器后产生执行图,并被转换成分布式系统(伏羲)的执行计划。
这个作业流程在大数据系统中是比较标准的操作。然而在具体实现中,如果在DAG的执行缺乏自适应动态调整能力的话,整个执行计划都需要事先确定,会使得作业的运行没有太多动态调整的空间。放在DAG的逻辑图与物理图的背景中来说,这要求框架在运行作业前,必须事先了解作业逻辑和处理数据各种特性,并能够准确回答作业运行过程,各个节点和连接边的物理特性问题,来实现逻辑图往物理图的转换。
然而在现实情况中,许多物理特性相关的问题,在作业运行前是无法被感知的。以数据特性为例,一个分布式作业在运行前,能够获得的只有原始输入的一些特性(数据量等), 对于一个较深的DAG执行而言,这也就意味着只有根节点的物理计划(并发度选择等) 是相对合理的,而下游的节点和边的物理特性只能通过一些特定的规则来猜测。虽然在输入数据有丰富的statistics的前提下,优化器有可能可以将这些statistics,与执行plan中的各个operator特性结合起来,进行一些适度的演算:从而推断在整个执行流程中,每一步产生的中间数据可能符合什么样的特性。但这种推断在实现上,尤其在面对阿里大体量的实际生产环境中,面临着巨大的挑战,例如:
- 实际输入数据的statistics的缺失:即便是SQL作业处理的结构化数据,也无法保证其源表数据特性拥有很好的统计。事实上今天因为数据落盘方式多样化,以及精细化统计方式的缺失,大部分的源表数据都是没有完整的statistics的。此外对于集群内部和外部需要处理的非结构化数据,数据的特性的统计更加困难。
- 分布式作业中存在的大量用户逻辑黑盒:作为一个通用的大数据处理系统,不可避免的需要支持用户逻辑在系统中的运行。比如SQL中常用的UDF/UDTF/UDJ/Extractor/Outputer等等,这些使用Java/Python实现的用户逻辑,计算引擎和分布式系统并无法理解,在整个作业流程中是类似黑盒的存在。以MaxCompute为例,线上有超过20%的 SQL作业,尤其是重点基线作业,都包含用户代码。这些大量用户代码的存在,也造成了优化器在很多情况下无法对中间产出数据的特性进行预判。
-
优化器预判错误代价昂贵:在优化器选择执行计划时,会有一些优化方法,在数据符合一定特殊特性的时候,被合理选中能带来性能优化。但是一旦选择的前提假设错误(比如数据特性不符合预期),会适得其反,甚至带来严重的性能回退或作业失败。在这种前提下,依据静态的信息实现进行过多的预测经常得不到理想的结果。
这种种原因造成的作业运行过程中的非确定性,要求一个好的分布式作业执行系统,需要能够根据中间运行结果的特点,来进行执行过程中的动态调整。因为只有在中间数据已经在执行过程中产生后,其数据特性才能被最准确的获得,动态性的缺失,可能带来一系列的线上问题,比如:
- 物理资源的浪费:比如计算节点事先选择的资源类型的不合理,或者大量的计算被消耗用于处理后继会被丢弃的无效数据。
- 作业的严重长尾:比如中间数据分布倾斜或不合理编排,导致一个stage上计算节点需要处理的数据量极端化。
-
作业的不稳定:比如由于优化器静态计划的错判,导致不合理的执行计划无法完成
而DAG/AM作为分布式作业唯一的中心节点和调度管控节点,是唯一有能力收集并聚合相关数据信息,并基于这些数据特性来做作业执行的动态调整,的分布式组件。这包括简单的物理执行图调整(比如动态的并发度调整),也包括复杂一点的调整比如对shuffle方式和数据编排方式重组。除此以外,数据的不同特点也会带来逻辑执行图调整的需求:对于逻辑图的动态调整,在分布式作业处理中是一个全新的方向,也是我们在DAG 2.0里面探索的新式解决方案。
点,边,图的清晰物理逻辑分层,和基于事件的数据收集和调度管理,以及插件式的功能实现,方便了DAG 2.0在运行期间的数据收集,以及使用这些数据来系统性地回答,逻辑图向物理图转化过程中需要确定的问题。从而在必要的时候实现物理图和逻辑图的双重动态性,对执行计划进行合理的调整。在下文中提到几个落地场景中,我们会进一步举例说明基于2.0的这种强动态性能力,实现更加自适应,更加高效的分布式作业的执行。
2.2 统一的AM/DAG执行框架
DAG 2.0抽象分层的点,边,图架构上,也使其能通过对点和边上不同物理特性的描述,对接不同的计算模式。业界各种分布式数据处理引擎,包括SPARK, FLINK, HIVE, SCOPE, TENSORFLOW等等,其分布式执行框架的本源都可以归结于Dryad[1]提出的DAG模型。我们认为对于图的抽象分层描述,将允许在同一个DAG系统中,对于离线/实时/流/渐进计算等多种模型都可以有一个好的描述。在DAG 2.0初步落地的过程中,首要目标是在同一套代码和架构系统上,统一当前伏羲平台上运行的几种计算模式,包括MaxCompute的离线作业,准实时作业,以及PAI平台上的Tensorflow作业和其他的非SQL类作业。对更多新颖计算模式的探索,也会有计划的分步骤进行。
2.2.1 统一的离线作业与准实时作业执行框架
首先我们来看平台上作业数占到绝大多数的SQL线离线作业(batch job)与准实时作业(smode)。前面提到过,由于种种历史原因,之前MaxCompompute SQL线的这两种模式的资源管理和作业执行,是搭建在两套完全分开的代码实现上的。这除了导致两套代码和功能无法复用以外,两种计算模式的非黑即白,使得彼此在资源利用率和执行性能之间无法tradeoff。而在2.0的DAG模型上,我们实现了这两种计算模式比较自然的融合和统一,如下图所示,
在通过对逻辑节点和逻辑边上映射不同的物理特性,离线作业和准实时作业都能得到准确的描述:
- 离线作业:每个节点按需去申请资源,一个逻辑节点代表一个调度单位;节点间连接边上传输的数据,通过落盘的方式来保证可靠性;
- 准实时作业:整个作业的所有节点都统一在一个调度单位内进行gang scheduling;节点间连接边上通过网络/内存直连传输数据,并利用数据pipeline来追求最优的性能。
今天在线上,离线模式因为其on-demand的资源申请以及中间数据落盘等特点,作业在资源利用率,规模性和稳定性方面都有明显的优势。而准实时模式则通过常驻的计算资源池以及gang scheduling这种greedy资源申请,降低了作业运行过程中的overhead,并使得数据的pipelined传输处理成为可能,达到加速作业运行的效果,但其资源使用的特点,也使其无法在广泛范围内来支持大规模作业。DAG 2.0的升级,不仅在同一套架构上统一了这两种计算模式,更重要的是这种统一的描述方式,使得探索离线作业高资源利用率,以及准实时作业的高性能之间的tradeoff成为可能:当调度单位可以*调整,就可以实现一种全新的混合的计算模式,我们称之为Bubble执行模式。
这种混合Bubble模式,使得DAG的用户,也就是上层计算引擎的开发者(比如MaxCompute的优化器),能够结合执行计划的特点,以及引擎终端用户对资源使用和性能的敏感度,来灵活选择在执行计划中切出Bubble子图。在Bubble内部充分利用网络直连和计算节点预热等方式提升性能,没有切入Bubble的节点则依然通过传统离线作业模式运行。回过头来看,现有的离线作业模式和准实时作业模式,分别可以被描述成Bubble执行模式的两个极端特例,而在统一的新模型之上,计算引擎和执行框架可以在两个极端之间,根据具体需要,选择不同的平衡点,典型的几个应用场景包括:
- Greedy Bubble:在可用的资源(集群规模,quota等)受限,一个大规模作业无法实现gang scheduling时,如果用户对资源利用率不敏感,唯一的目标是尽快跑完一个大规模作业。这种情况下,可以实现基于可用计算节点数目,实施greedy的bubble切割的策略, 尽量切出大的bubble。
- Efficient Bubble:在作业的运行过程中,节点间的运算可能存在天然的barrier (比如sort运算, 建hash表等等)。如果把两个通过barrier边连接的节点切到一个bubble中,虽然作业e2e性能上还是会有调度overhead降低等带来的提升,但是因为数据无法完全pipeline起来,资源的利用率达不到最高。 那么在对资源的利用率较为敏感时,可以避免bubble内部出现barrier边。这同样是计算引擎可以根据执行计划做出决定的。
这里只列举了两个简单的策略,其中还有更多可以细化以及针对性优化的地方。在不同的场景上,通过DAG层面提供的这种灵活按照bubble执行计算的能力,允许上层计算可以在不同场景上挑选合适的策略,更好的支持各种不同计算的需求。
2.2.2 支持新型计算模式的描述
1.0的执行框架的底层设计受Map-Reduce模式的影响较深,节点之间的边连接,同时混合了调度顺序,运行顺序,以及数据流动的多种语义。通过一条边连接的两个节点,下游节点必须在上游节点运行结束,退出,并产生数据后才能被调度。这种描述对于新型的一些计算模式并不适用。比如对于Parameter Server计算模式,Parameter Server(PS)与Worker在运行过程中有如下特点:
- PS作为parameter的serving entity, 可以独立运行
- Worker作为parameter的consumer和updater, 需要PS在运行后才能有效的运行,并且在运行过程中需要和PS持续的进行数据交互
这种运行模式下,PS和worker之间天然存在着调度上的前后依赖关系。但是因为PS与worker必须同时运行,不存在PS先退出worker才调度的逻辑。所以在1.0框架上, PS 与 worker只能作为两个孤立无联系的stage来分开调度和运行。此外所有PS与worker之间,也只能完全通过计算节点间直连通讯,以及在外部entity (比如zookeeper或nuwa)协助来进行沟通与协调。这导致AM/DAG作为中心管理节点作用的缺失,作业的管理基本被下放计算引擎上,由计算节点之间自行试图协调来完成。这种无中心化的管理,对稍微复杂的情况下(failover等)无法很好的处理。
在DAG 2.0的框架上,为了更准确的描述节点之间的调度和运行关系,引入并且实现了concurrent edge的概念:通过concurrent edge连接的上下游节点,在调度上存在先后,但是可以同时运行。而调度的时机也可以灵活配置:可以上下游同步调度,也可以在上游运行到一定程度后,通过事件来触发下游的调度。在这种灵活的描述能力上,PS作业可以通过如下这种DAG来描述,这不仅使得作业节点间的关系描述更加准确,而且使得AM能够理解作业的拓扑,进行更加有效的作业管理,包括在不同计算节点发生failover时不同的处理策略等。
此外,DAG 2.0新的描述模型,也允许PAI平台上的Tensorflow/PS作业实现更多的动态优化,并进行新的创新性工作。在上图的dynamic PS DAG中,就引进了一个额外的control 节点,这一节点可以在作业运行过程中(包括PS workload运行之前和之后),对作业的资源申请,并发度等进行动态的调整,确保作业的优化执行。
事实上concurrent edge这个概念,描述的是上下游节点运行/调度时机的物理特性,也是我们在清晰的逻辑物理分层的架构上实现的一个重要扩展。不仅对于PS作业模式,在之前描述过的对于通过bubble来统一离线与准实时作业计算模式,这个概念也有重要的作用。
3. DAG 2.0与上层计算引擎的集成
DAG 2.0作为计算平台的分布式运行基座,它的升级换代,为上层的各种计算引擎提供了更多灵活高效的执行能力,而这些能力的落地,需要通过与具体计算场景的紧密结合来实现。接下来通过2.0与上层各个计算引擎(包括MaxCompute以及PAI平台等)的一些对接场景,具体举例说明2.0新的调度执行框架,如何赋能平台上层的计算与应用。
3.1 运行过程中的DAG动态调整
作为计算平台上的作业大户,MaxCompute平台上多种多样的计算场景,尤其是离线作业中的各种复杂逻辑,为动态图能力的落地提供了丰富多样的场景,这里从动态物理图和逻辑图几个方面讨论几个例子。
3.1.1 动态并发度调整
基于作业运行期间中间数据大小进行动态并发度调整,是DAG动态调整中最基本的能力。以传统MR作业为例,对于一个静态MR作业而言,能根据读取数据量来比较准确判断Mapper的并发,但是对于Reducer的并发只能简单推测,比如下图中对于处理1TB的MR作业而言,提交作业时,只能根据Mapper 1000并发,来猜测给出500的Reducer并发度,而如果数据在Mapper经过大量过滤导致最终之产出10MB中间数据时,500并发度Redcuer显然是非常浪费的,动态的DAG必须能够根据实际的Mapper产出来进行Reducer并发调整(500->1)。
而实际实现中,最简单的动态调整,会直接按照并发度调整比例来聚合上游输出的partition数据,如下图这个并发度从10调整到5的例子所示,在调整的过程中,可能产生不必要的数据倾斜。
DAG 2.0基于中间数据的动态并发调整实现,充分考虑了数据partition可能存在倾斜的情况,对动态调整的策略进行了优化,使得动态调整的策略后数据的分布更加均匀,可以有效避免由于动态调整可能引入的数据倾斜。
这种最常见下游并发调整方式是DAG 2.0动态物理图能力的一个直观展示。在2.0中项目中,结合计算引擎的数据处理的特点,还探索了基于源数据的动态并发调整。例如对于最常见的两个原表数据的join (M1 join M2 at J), 如果用节点大小来表示其处理数据的的多少,那对于下图这么一个例子,M1处理的是中等的一个数据表(假设M1需要并发度为10),M2处理的是较大的数据表(并发度为1000),naïve的执行方式会将按照 10 + 1000的并发度调度,同时因为M2输出需要全量shuffle到J, J需要的并发度也会较大 (~1000).
而实际上,对于这种计算pattern而言,M2需要读取(并进行处理)的,应该只有能和M1的输出join得上的数据,也就是说在考虑了整体执行cost后,在这种M1期望的输出数据要比M2小的多的情况下,可以先行调度M1完成计算,将M1输出数据的statistics在AM/DAG端进行聚合,然后只挑选出M2的有效数据进行处理。这里"M2的有效数据"的选择本质上是一个predicate push down的过程,可以由计算引擎的优化器和运行时联合进行判断。也就是说,这种情况下M2的并发度调整,是和上层计算紧密结合的。
一个最直观的例子是,如果M2是一个1000个分区的分区表,并且分区的key和join的key相同,那么可以只读取M2能和M1输出join上的有效数据的partition进行读取处理。假如M1的输出只包含了M2原表数据的3个partition keys, 那么在M2就只需要调度3个计算节点来处理这3个分区的数据。也就是说M2的并发度从默认的1000,可以降低到3,这在保证同样的逻辑计算等效性与正确性的前提下,能大大降低计算资源的消耗,并数倍加速作业的运行。这里的优化来自与几个方面:
- M2的并发度(1000->3)以及处理的数据量大大降低
- M2需要shuffle到J的数据量以及shuffle需要的计算量大大降低
- J 需要处理的数据量以及其并发度能大大降低
从上图这个例子中我们也可以看到,为了保证M1->M2的调度顺序上,DAG中在M1和M2间引入了一条依赖边,而这条边上是没有数据流动的,是一条只表示执行先后的依赖边。这与传统MR/DAG执行框架里,边的连接与数据流动紧绑定的假设也有不同,是在DAG 2.0中对于边概念的一个拓展之一。
DAG执行引擎作为底层分布式调度执行框架,其直接的对接"用户" 是上层计算引擎的开发团队,其升级对于终端用户除了性能上的提升,直接的体感可能会少一点。这里我们举一个终端用户体感较强的具体例子,来展示DAG更加动态的执行能力,能够给终端用户带来的直接好处。就是在DAG动态能力的基础上,实现的LIMIT的优化。
对于SQL用户来说,对数据进行一些基本的at hoc操作,了解数据表的特性,一个非常常见的操作是LIMIT,比如:
SELECT * FROM tpch_lineitem WHERE l_orderkey > 0 LIMIT 5;
在分布式执行框架上,这个操作对应的执行计划,是通过将源表做切分后,然后调度起所需数目的mapper去读取全部数据,再将mapper的输出汇总到reducer后去做最后的LIMIT截断操作。假设源表(这里的tpch_lineitem)是一个很大的表,需要1000个mapper才能读取,那么在整个分布式执行过程中,涉及的调度代价就是要调度1000 mapper + 1 reducer。这个过程中会有一些上层计算引擎可以优化的地方,比如每个mapper可以最多输出LIMIT需要的record数目(这里的LIMIT 5)提前退出,而不必处理完所有分配给它的数据分片等等。但是在一个静态的执行框架上,为了获取这样简单的信息,整体1001个计算节点的调度无法避免。这给这种ad hoc query执行,带来了巨大的overhead, 在集群资源紧张的时候尤其明显。
DAG 2.0上, 针对这种LIMIT的场景,依托新执行框架的动态能力,实现了一些优化,这主要包括几方面:
- 上游Exponential start: 对于这种大概率下上游mapper计算节点不需要全部运行的情况,DAG框架将对mapper进行指数型的分批调度,也就是调度按照1, 10 ... FULL的分批执行
- 下游的Early scheduling: 上游产生的record数目作为执行过程中的统计数据上报给AM, AM在判断上游已经产生足够的record条数后,则提前调度下游reducer来消费上游的数据。
- 上游的Early termination: 下游reducer在判断最终输出的LIMIT条数已经满足条件后,直接退出。这时候AM可以触发上游mapper整个逻辑节点的提前退出(在这种情况下,大部分mapper可能都还没有调度起来),整个作业也能提前完成。
这种计算引擎和DAG在执行过程中的灵活动态交互,能够带来大量的资源节省,以及加速作业的执行。在线下测试和实际上线效果上,基本上绝大多数作业在mapper执行完1个计算节点后就能提前退出,而无需全量调起(1000 vs 1)。
下图是在线下测试中,当mapper并发为4000时,上述query优化前后的区别:
可以看到,执行时间优化后增速了5X+, 计算资源的消耗更是减小了数百倍。
这个线下测试结果作为比较典型的例子,稍微有些理想化。为了评估真实的效果,在DAG 2.0上线后,选取了LIMIT优化生效的线上作业,统计了一星期结果如下:这个优化平均为每个作业节省了(254.5 cores x min CPU + 207.3 GB x min) 的计算资源,同时每个作业上,平均能节省4349个(无效)计算节点的调度。
LIMIT执行上的改进,作为一个针对特殊场景上实现的优化,涉及了整个DAG执行不同策略的调整,这种细化的改进能力,能更直观的体现DAG 2.0架构升级诸多好处:灵活的架构使得DAG的执行中拥有了更多的动态调整能力,也能和计算引擎在一起进行更多有针对性的优化。
不同情况下的动态并发度调整,以及具体调度执行策略的动态调整,只是图的物理特性动态调整的几个例子。事实上对于物理特性运行时的调整,在2.0的框架之上有各种各样的应用,比如通过动态数据编排/shuffle来解决各种运行期间的skew问题等,这里不再做进一步的展开。接下来我们再来看看DAG 2.0上对于逻辑图的动态调整做的一些探索。
3.1.2 动态逻辑图的调整
分布式SQL中,map join是一个比较常见的优化,其实现原理是在join的两个表中,如果有一个超小的表(可以 fit 到单个计算节点的内存中),那对于这个超小表可以不做shuffle,而是直接将其全量数据broadcast到每个处理大表的分布式计算节点上。通过在内存中直接建立hash表,完成join操作。map join优化能大量减少 (大表) shuffle和排序,非常明显的提升作业运行性能。但是其局限性也同样显著:如果"超小表"实际不小,无法fit进单机内存,那么在试图建立内存中的hash表时就会因为OOM而导致整个分布式作业的失败,而需要重跑。所以虽然map join在正确使用时,可以带来较大的性能提升,但实际上优化器在产生map join的plan时需要偏保守,很多情况下需要用户显式的提供map join hint来产生这种优化。此外不管是用户还是优化器的选择,对于非源表的输入都无法做很好的判断,因为中间数据的大小往往需要在作业运行过程中才能准确得知。
而map join与默认join方式(sorted merge join)对应的其实是两种不同优化器执行计划,在DAG层面,其对应的是两种不同的逻辑图。要支持这种运行过程中根据中间数据特性的动态优化,就需要DAG框架具备动态逻辑图的执行能力,这也是在DAG 2.0上开发的conditional join功能。如同下图展示,在对于join使用的算法无法被事先确定的时候,允许优化器提供一个conditional DAG,这样的DAG同时包括使用两种不同join的方式对应的不同执行计划支路。在实际执行时,AM根据上游产出数据量,动态选择一条支路执行(plan A or plan B)。这样子的动态逻辑图执行流程,能够保证每次作业运行时都能根据实际作业数据特性,选择最优的执行计划。
conditional join是动态逻辑图的第一个落地场景,在线上选择一批适用性作业,动态的conditional join相比静态的执行计划,整体获得了将近3X的性能提升。
3.2 混合Bubble模式
Bubble模式是我们在DAG 2.0架构上探索的一种全新的作业运行方式,通过对于bubble大小以及位置的调整,可以获取性能和资源利用率的不同tradeoff点。这里通过一些更加直观的例子,来帮助大家理解Bubble执行在分布式作业中的实际应用。
在上图的TPCH Q21上。比如在Q21上,我们看到了通过将作业被切分为三个"bubble",数据能够有效的在节点之间pipeline起来,并且通过热点节点实现调度的加速。最终消耗的资源数(cpu * time) 是准实时作业的35%, 而性能则与一体化调度的准实时作业非常相近(96%), 比离线作业性能提升70%左右。
在标准TPCH 1TB全量测试中,混合bubble模式体现出了相比离线和准实时的一体化模式(gang scheduling)更好的资源/性能 tradeoff。选用Greedy Bubble(size = 500)的策略,bubble相比离线作业性能提升了2X(资源消耗仅增加17%,具体数值略)。同时与一体化调度的准实时作业比较,bubble执行在只消耗了40%不到的资源(cpu * time) 的前提下,其性能达到了准实时作业的85%(具体数值略)。可以看到,这种新型的bubble执行模式,允许我们在实际应用中获取很好的性能与资源的平衡,达到系统资源有效的利用。Bubble执行模式目前正在阿里集团内部全量上线中,我们在实际线上的作业也看到了与TPCH测试非常相似的效果。
如同之前所述,混合bubble模式支持了不同切分策略,这里提供的只是一种切分策略上的效果。在与上层计算引擎(e.g., MaxCompute 优化器)紧密结合时,这种DAG分布式调度bubble执行的能力,能够允许我们根据可用资源和作业计算特点,来寻找性能与资源利用率的最佳平衡点。
4. 资源的动态配置和动态管理
传统分布式作业对于每个计算节点需要的资源类型(CPU/GPU/Memory)和大小都是预先确定下来的。然而在分布式作业,在作业运行之前,对计算节点资源类型和大小的合理选择,是比较困难的。即便对于计算引擎的开发者,也需要通过一些比较复杂的规则,才能预估出大概合理的配置。而对于需要将这些配置透明给终端用户的计算模式,终端用户要做出选择就更加困难。
在这里以PAI的Tensorflow(TF)作业为例,描述DAG 2.0的资源动态配置能力,怎样帮助平台的TF作业选择合理的GPU类型资源以及提高GPU资源的利用率。相比CPU而言,GPU作为一种较新的计算资源,硬件的更新换代较快,同时普通终端用户对于其计算特点也相对不了解。因此终端用户在指定GPU资源类型时,经常存在着不合理的情况。与此同时,GPU在线上又是相对稀缺资源。今天在线上,GPU申请量经常超过集群GPU总数,导致用户需要花很长时间排队等待资源。而另外一方面,集群中GPU的实际利用率却偏低,平均只有20%左右。这种申请和实际使用之间存在的Gap,往往是由于用户作业配置中,事先指定的GPU资源配置不合理造成。
在DAG2.0的框架上,PAI TF GPU作业(见session 2.2.2 的dynamic PS DAG)引入了一个额外的"计算控制节点",可以通过运行PAI平台的资源预测算法,来判断当前作业实际需要的GPU资源类型,并在必要的时候,通过向AM发送动态事件,来请求修改下游worker实际申请的GPU类型。这其中资源预测算法,可以是根据算法的类型,数据的特点,以及历史作业信息来做HBO (history based optimization),也可以通过dry-run的方法来进行试运行,以此确定合理的资源类型。
具体实现上,这个场景中control stage与Worker之间通过concurrent edge连接,这条边上的调度触发条件是在control stage已经做出资源选择决定之后,通过其发出的事件来触发。这样的作业运行期间的动态资源配置,在线上功能测试中,带来了40%以上的集群GPU利用率提升。
作为物理特性一个重要的维度,对计算节点的资源特性在运行时的动态调整能力,在PAI以及MaxCompute上都能找到广泛的应用。以MaxCompute SQL为例,对于下游节点的CPU/Memory的大小,可以根据上游数据的特点进行有效的预判;同时对于系统中发生的OOM,可以尝试自动调高OOM后重试的计算节点的内存申请,避免作业的失败,等等,。这些都是在DAG 2.0上新的架构上实现的一些新功能,在这里不做具体的展开。
5. 工程化与上线
作为分布式系统的底座,DAG本身的动态能力以及灵活度,在与上层计算引擎结合时,能够支持上层计算实现更加高效准确的执行计划,在特定场景上实现数倍的性能提升以及对资源利用率的提高。在上文中,也集中介绍了整个DAG 2.0项目工作中,开发实现的一些新功能与新的计算模式。除了对接计算引擎来实现更高效的执行计划,调度本身的敏捷性,是AM/DAG执行性能的基本素质。 DAG2.0的调度决策均基于事件驱动框架以及灵活的状态机设计来实现,在这里也交出DAG 2.0在基本工程素养和性能方面的成绩单:
这里选用了最简单的Map-Reduce(MR)作业为例,对于这种作业,调度执行上并无太多可以取巧的地方,考验的是调度系统本身的敏捷度和整个处理流程中的全面去阻塞能力。这个例子也凸显了DAG 2.0的调度性能优势,尤其作业规模越大,优势越发明显。此外,对于更接近线上的work-load的场景,在TPCDS标准benchmark中,当执行计划和运行逻辑完全相同时,2.0(未打开动态执行等功能)的高性能调度也给作业带来了显著的提升。
最后,对于一个从头到尾完整替代原有系统的新一代的全新框架,怎样无缝对接线上场景,实现大规模的上线,是一个同样重要(甚至更重要)的话题,也是对一个实际生产系统进行升级,与小范围的新系统POC之间最大的区别。今天的伏羲调度系统,每天支撑着阿里集团内外大数据计算平台千万的分布式作业。DAG/AM这一核心分布式调度执行组件的更新换代,要完整替换线上已经支撑了大数据业务10年的分布式生产系统,而不造成现有场景的失败,这需要的不仅仅是架构和设计上的先进性。如何在"飞行中换引擎", 保质保量的实现系统升级,其挑战完全不亚于新的系统架构本身的设计。要实现这样的升级,拥有一个稳固的工程基座,以及测试/发布框架,都是不可或缺的。没有这样子的底座,上层的动态功能与新计算模式,都无从谈起。
目前DAG 2.0 目前已全面覆盖了阿里集团MaxCompute所有线上的SQL离线作业和所有准实时作业,以及PAI 平台的所有Tensorflow作业(CPU和GPU)+ PyTorch作业。每天支撑数千万分布式作业的运行,并经受了19年双11/双12的考验。在面对两次大促创历史记录的数据洪峰(相比18年增长50%+)压力下,保障了集团重点基线在大促当天准时产出。与此同时,更多种类型的作业(例如跨集群复制作业等等)正在迁移到DAG 2.0的新架构,并且依托新的架构升级计算作业本身的能力。DAG 2.0的框架基座的上线,为各条计算线上依托其实现新功能打下了坚实基础。
6. 展望
伏羲DAG 2.0核心架构的升级,旨在夯实阿里计算平台长期发展的基础,并支持上层计算引擎与分布式调度方面结合,实现各种创新和创建新计算生态。架构的升级本身是向前迈出的重要一步,但也只是第一步。要支撑企业级的,各种规模,各种模式的全频谱计算平台,需要将新架构的能力和上层计算引擎,以及伏羲系统其他组件进行深度整合。依托阿里的应用场景,DAG 2.0除了在作业规模等方面继续在业界保持领先之外,架构和功能上也有许多创新, 比如前面我们已经介绍过的:
- 在业界首次在分布式执行框架上,实现了执行过程中逻辑图和物理图的双重动态可调;
- 通过Bubble机制实现了混合的计算模式,探索资源利用率和作业性能间的最佳平衡。
除此之外,2.0更加清晰的系统封层架构带来的一个重要改变就是能有利于新功能更快速开发,提速平台和引擎向前创新。由于篇幅有限,本文只能由点击面对介绍了一部分新功能与新计算模式,还有许许多多已经实现,或正在开发中的功能,在业界都是全新的探索,暂时不做进一步展开,比如
- 准实时作业体系架构的整体升级: 资源管理与多作业管理的解耦,支持准实时作业场景上的动态图功能
- 常驻的单container多slot执行的cache-aware查询加速服务(MaxCompute 短查询)
- 基于状态机的作业节点管理以及失败下的智能重跑机制
- 动态可定义的shuffle方式:通过recursive shuffle等方式动态解决线上大规模作业中的in-cast问题
- 基于adaptive的中间数据动态切分与聚合,解决实际分布式作业中各种数据倾斜问题
- 支持PAI TF GPU作业的多执行计划选项
- 通过DAG执行过程中与优化器的交互,实现渐进式的交互式动态优化
- 支持Imperative语言特性,通过DAG的动态自增长等能力,对接IF/ELSE/LOOP等语义
核心调度底座能力的提升,能够为上层的各种分布式计算引擎提供真正企业级的服务能力,提供必须的弹药。而这些计算调度能力提升带来的红利,最终会通过MaxCompute和PAI等引擎,透传到终端的阿里云计算服务的各个企业。在过去的十年,阿里业务由内向外的驱动,锻造了业界规模最大的云上分布式平台。而通过更好服务集团内部以及云上的企业用户,我们希望能够平台的企业级服务能力,可以完成由内向外,到由外至内的整个正向循环过程,推动计算系统螺旋式上升的不断创新,并通过性能/规模,以及智能化自适应能力两个维度方面的推进,降低分布式计算服务的使用门槛,真正实现大数据的普惠。
MaxCompute产品官网 https://www.aliyun.com/product/odps
更多阿里巴巴大数据计算技术交流,欢迎扫码加入“MaxCompute开发者社区”钉钉群。