一条SQL在 MaxCompute 分布式系统中的旅程

摘要:2019杭州云栖大会大数据技术专场,由阿里云资深技术专家侯震宇、阿里云高级技术专家陈颖达以及阿里云资深技术专家戴谢宁共同以“SQL在 MaxCompute 分布式系统中的旅程 ”为题进行了演讲。本文首先介绍了 MaxCompute 计算平台及其特点、超大规模企业级SQL引擎和其功能。然后讲解了如何构建企业级分布式智能调度执行框架。最后介绍了新一代列式存储引擎AliOrc及优化方式。

视频直播回放 >>>
以下为精彩视频内容整理:


MaxCompute–面向企业的超大规模计算

  • 全托管,多租户,超大规模平台

    MaxCompute拥有庞大的用户群体,支持阿里集团的各个关键业务和复杂场景,支持多个互联网新兴企业核心业务,以及支持关系国计民生、国家安全的关键行业。同时拥有超大规模计算存储,包括单日千万以上计算任务、多EB级别存储量、10万台以上服务器以及全球超过十个数据中心。
  • 企业级高性能计算引擎

    TPC-BigBench是更面向于大数据运算的BigBench,覆盖一些复杂类型,包括机器学习场景,更贴近于大数据场景的业务。在2017年,阿里的TPC-BigBench首个通过100TB 验证的引擎。在2018年,TPC-BigBench 首个达到18000+引擎。在2019年,进一步提升到 25000+,正式公布到TPC官网。

MaxCompute不仅仅在阿里集团内部被广泛的使用,也支撑着许多著名的互联网方面的厂商,以及关系到国计民生、国家安全方面的应用。

一条SQL在 MaxCompute 分布式系统中的旅程

超大规模企业级SQL引擎–MaxCompute UniSQL

一条SQL在分布式系统中的旅程

一条SQL在 MaxCompute 分布式系统中的旅程

上图所示为运行SQL任务中的大概流程。首先使用一条SQL语句,通过Compile,可以生成一个逻辑的执行计划,这个逻辑的执行计划是计算机能够理解的,再经过Optimize过程,无论逻辑计划有多复杂,都要翻译成针对目前集群和运行时刻的Runtime最优的物理执行计划,对于每一个Optimize不一定与原始的SQL相关。然后经过计算调度框架,使得合理快速的安排执行任务。由调度框架做的安排应用到每台机器之后,每台机器都会有一个SQL的运行时(Runtime Engine),它是真正能够理解物理执行计划的,并且一步一步把数据从Storage中读出来,再经过Shuffle得到结果,最后返回到Storage中。可以看出,运行时本身的性能是非常关键的,仅仅一条SQL语句有可能消耗几百T的data,这时,Storage的性能也是至关重要的。

SQL的功能

  • Not Only SQL – 脚本模式

    一条SQL在 MaxCompute 分布式系统中的旅程

上图为SQL的一个脚本,上面是配置语句,下面是创建的表,每句都是SQL的语句,但是这些SQL语句都可以串在一个脚本里,当表述一个非常复杂的逻辑时,不需要把脚本写成嵌套的形式,这种方式更灵活,能够支撑更复杂的业务场景。阿里内部有非常复杂的业务场景,过去不支持这种方式时,用户是使用嵌套的方式,使脚本变得复杂和扭曲,并且有大量的重复,以致不能解决时,就会将其拆分,再通过外部调动的方式串连起来。因为人如果承受不了维护的代价,就要引用额外性能的开销,后面的语句就要引用前面的语句。不管脚本有多复杂,经过编译器之后,还是一个单一完整的执行计划,并不会带来额外的开销。优化器看到的上下文越多,优化的机会越多,形成单一完整的执行计划之后,就可以以最高效的方式执行整个业务模式。此外,DataWorks也是支持这种模式的。通过脚本模式可以效仿C++或者Java来写SQL。

  • Not Only SQL– 参数化视图

    一条SQL在 MaxCompute 分布式系统中的旅程

例如,在写C++或者Java时,经常在公共的逻辑中抽取出来一个函数,把公共的逻辑放到某一个模块里,这个过程视为代码的重用机制。但是,标准的SQL,尤其是大数据的SQL是缺乏这种机制的。对于阿里这么复杂的场景,这种需求是很迫切的。底层的数据集提供了各个部门都需要的基础的数据,不同的业务部门可能都要消耗这部分数据,消耗的方式是不同的。这时,也想像C++或者Java那样抽取一个函数,在MaxCompute中是可以实现的。在MaxCompute里,上图中的红框除了作为普通的view,也可以封裝一些SQL复杂的逻辑和对数据的读取,可以把表的变量传入,这时就可以实现像C++或者Java中函数的功能,可以把SQL里公共的业务逻辑封装在一起,同时结合上文的脚本模式,参数化视图就可以组织非常复杂的SQL的业务逻辑用来支撑非常复杂的业务场景。

  • Not Only SQL – IF/ELSE

    一条SQL在 MaxCompute 分布式系统中的旅程

一般的大数据不支持IF/ELSE,但对于IF/ELSE是有需求的。例如,每周做一次全量的计算,但每天只做增量的计算,如果没有IF/ELSE的支持,就需要把脚本拆成两个,通过调度的框架串连起来。但是,在MaxCompute中结合脚本模式,可以直接的写入IF语句或SELECT语句,如果返回的是异常的结果,直接可以放在一个表达式里,决定执行SQL的分支。所有的SQL的功能都是针对复杂的应用场景的需要。

  • Not Only SQL – UDT & TRANSFROM

    一条SQL在 MaxCompute 分布式系统中的旅程

普通的SQL都会有基本的数据类型,有时也有复杂类型,但都是属于给定范围的数据类型。当数据类型特别复杂时,在MaxCompute里可以直接使用。右侧框架是将Java和SQL无缝的融合在一起,无需UDF封装。左侧为SELECT TRANSFORM,是直接就可以在SQL里调用shell脚本,并且完全兼容Hive。

SQL的性能

  • SQL Engine for Huge Data - Adaptive Join

    一条SQL在 MaxCompute 分布式系统中的旅程

Adaptive Join包括Hash Join和Merge Join。Hash Join的性能是比较好的,但有时碰到不合适的场景时,特别是有非常多的Hash冲突时,性能就会变得很差。Merge Join的特点是能够提供一个性能的下限。可以通过动态的选择适合哪种场景,以便做智能的选取。

  • SQL Engine for Huge Data – Advanced Shuffle

    一条SQL在 MaxCompute 分布式系统中的旅程

Shuffle也有针对特定大规模系统的优化,包括提升Shuffle 70%的性能,提升大规模共享集群性能,提升稳定性,降低IO压力。具体包括以下优化方式:

1、Greysort模式(Mapper不排序,Reducer排序),增加与下游流水线机会;下游转化为HashJoin时消除排序
2、Encoding & Adaptive列式压缩,降低IO与Cache Miss
3、优化内存结构,降低Working Set Size并消除Pointer Chasing

企业级分布式智能调度执行框架

打造企业级分布式调度执行系统

一条SQL在 MaxCompute 分布式系统中的旅程

整个系统的发展有两个维度,一个维度是系统的规模,随着系统规模的不断成长,对于分布式调度执行系统要面对每天千万级需要解决的问题,在阿里这个大体量的数据下,单个分布式作业规模已经能达到数十万个计算节点,已经有上百亿连接和运行数万台的物理机。

另一个维度是系统的成熟度,一个系统成为企业级的分布式执行调度系统就需要达到成熟度,包括三个阶段,第一个阶段是可用性(正确性),一个作业在单机系统上执行的结果和分布式系统上执行的结果是不一样的,尤其是在系统的超大规模上,在面对系统各种各样的节点失败问题、网络层的失败问题和各种容灾问题时,怎样通过正确的方式能保证作业正确的产出是很重要的。第二阶段是够用,是指每一个计算的系统都要锻造自己的性能,能在各种各样的benchmark上标准结果,通过此方法来提升性能。第三个阶段是好用(智能化),是指在动态执行过程中拥有动态能力和自适应能力,可以根据作业的不同特点来调整作业执行的计划。

企业级分布式计算调度框架

企业级分布式计算调度框架分为三个阶段:

  • 动态的智能执行

    一条SQL在 MaxCompute 分布式系统中的旅程

上图所示为阿里的一个作业在离开优化器以后,在分布式系统里执行的过程。可以理解为从逻辑图到物理图映射的过程。

一条SQL在 MaxCompute 分布式系统中的旅程

上图所示为三个阶段的作业,第一个阶段是作业提交开始运行,第二个阶段是根据实际产出动态调整并发,第三个阶段是产生所需数据提前结束作业。

一条SQL在 MaxCompute 分布式系统中的旅程

上图所示为智能化DAG执行的动态逻辑图,包括Sorted Merge Join和Broadcast Join两种算法。其中Sorted Merge Join的特点包括经典分布式join算法,可支持大规模作业,可用范围广(slow but reliable),代价较昂贵 (full shuffle + sort),且shuffle可能带来数据倾斜。Broadcast Join的特点是只适用特定类型作业 (一路输入可载入单计算节点内存),非适用场景上可能导致OOM,作业失败。

一条SQL在 MaxCompute 分布式系统中的旅程

对动态的选择执行计划,在理想情况下都希望数据的分布是均匀的,并且可以理解数据的特性,所以优化器都可以做出“最佳”的计划,尤其是在做benchmark时,但是由于源数据统计不准确 、中间数据特性波动 ,所产生数据的特点是没有办法提前预估的,所以允许优化器来给一个非确定的执行计划(Conditional Join),这时,优化器会给出两个执行路径的计划,调度执行框架可以根据上游实际产生的数据量,动态的调整逻辑图的执行。

一条SQL在 MaxCompute 分布式系统中的旅程

上图所示为并发度的例子。简单的并发调整是根据上游总数据量直接取平均作为并发,仅支持向下调整,但问题是数据可能是倾斜的,这种方法已不再适用。下面给出两种新的调度方法:

1、依据分区数据统计调整:避免并发调整加重数据倾斜,可向上向下调整。
2、分区统计基础上,自动切分大分区:双重调整,消除分区内的数据倾斜,并支持数据处理归并,以保留分区特性。

  • 高效作业管理

    一条SQL在 MaxCompute 分布式系统中的旅程

对于阿里如此大规模的作业,调度的敏捷度是十分重要的,因为集群规模很大,一个作业怎样理解各个计算节点和物理机的状态,做智能的容错和预判性的容错是阿里所做的一项工作。随着作业规模越来越大,一个非常优秀的调度框架能带来的性能提升会越来越明显。

  • 多种计算模型融合

    一条SQL在 MaxCompute 分布式系统中的旅程

阿里整个计算平台作为飞天的底座,不仅仅运行SQL,也有可能运行其他。最经典的SQL是batch执行。离线和一体式的执行是资源利用率和性能优化的两个极端,作为一个用户,会同时关注执行性能和资源利用率,需要思考的问题是,怎样在两个点中达到平衡。因此,阿里也支持一种称为bubble的调度,所谓bubble调度是允许一个作业的子图同时调度,下游的子图分布调度,在不同的SQL上会有不同的效果。例如。在TPCH11的情况下,相对于离线(batch)会有66%的性能提升,相对于一体式(all-in-one)会节省3倍的资源,同时获取95%的性能。

新一代列式存储引擎AliOrc

在AliOrc的里程中,起点和终点都是在存储层,数据的读和写是AliOrc执行的开始和结束,存储引擎作为AliOrc的底座,承担着一个非常重要的作用。

基于Apache Orc的深度优化

一条SQL在 MaxCompute 分布式系统中的旅程

整个计算引擎是基于列结构的,技术的出发点是Apache Orc。在此基础上,阿里做了很多深度的优化,包括I/O维度、内存优化、索引和数据编码压缩。其中有一部分已经贡献到了社区。

新一代列式存储引擎

新一代列式存储引擎包括以下技术方面:

  • 并行化编码技术

    一条SQL在 MaxCompute 分布式系统中的旅程

对于有一系列的大数和小数,直接存放时会产生4个字节,而对于小数,前面会产生很多的零,这些零是没有意义的。并行化编码技术的主要思想就是将冗余的信息删掉,将真正有意义的batch留下,并且pack到一起。这种编码方式的好处是能实现并行化。此外,还进行了一些扩展,包括对有序数据的优化,以及对数据的编码优化。同时重新设计了编码存储格式,更利于内存对齐,以及列存储。

一条SQL在 MaxCompute 分布式系统中的旅程

从测试的结果来看,此编码技术比传统游程编码速度快4到6倍,压缩率提升大概10%左右,在反应到TPC Benchmark表扫描效率提升24%。之所以有如此快的结果,是因为使用AVX256一条指令可以处理8个64位数,或者16个32位数,同时充分利用函数模板展开,最大程度避免循环和分支预测失败。

  • 异步并行IO

    一条SQL在 MaxCompute 分布式系统中的旅程

阿里是属于列存储引擎的,是指在同一个列是放在一起的,好处是在读数据时选择几个列放到存储引擎中去读,就不需要读所有的列。假设在上图中的场景中,有三个列为A、B、C。最早的IO模型是串行的,存在许多等待时间。因此,阿里做了一个改进为Prefetch模型,IO是不需要一个一个发出去的,在一开始时可以将三个读取引擎一起发出去,但是需要一个一个的等待它们回来,虽然有了一些提升,但是还仍然存在IO等待的时间。目前为止,改进的模型为Prefetch+Async Paraller IO,是将IO全部并行化,将三个一起发出去之后,并不需要按照原来A、B、C的顺序等待,可以按照回来的顺序做解压和解码。这样做可以对IO等待的时间降到最小。

一条SQL在 MaxCompute 分布式系统中的旅程

如上图所示,异步并行IO与同步读取相比较,IO等待时间减少97%,端到端时间减少45%。

  • 延迟读取、延迟解码

    为了进一步的提高性能,减少数据读取量,从而减少数据解码、解压缩成为了关键。

一条SQL在 MaxCompute 分布式系统中的旅程

如上图所示为延迟读取的一个例子,通过只读取DEPT列,把ADDRESS以及SALARY列延迟到过滤之后读取,可以大幅减少了不必要的数据读取。

一条SQL在 MaxCompute 分布式系统中的旅程

对于字符串类型的列,有一种方法叫字典编码,是指将字符串里不一样的Key找出并且给予ID,这时,数据在存放时是不需要存放整个字符串的,只需要存放ID就可以。但是使用此方法是很耗时的。由此,做了以下改进:

一条SQL在 MaxCompute 分布式系统中的旅程

使用延迟解码,跳过解码步骤,直接在字典上匹配,再以ID到数据列搜索。好处是减少了字符串匹配次数以及减少了字典解码时间。

一条SQL在 MaxCompute 分布式系统中的旅程

如上图所示,对打开延迟读写和没有打开延迟读写做了比较,横坐标为filter过滤的数据,“1”表示没有过滤,纵轴是花费的时间,实现延迟读取之后,读取数据量随Selectivity的提升而减少,读取时间也相应大幅降低。


了解更多 MaxCompute 技术与产品详情,欢迎加入“MaxCompute开发者社区”,扫码或点击链接均可加入 https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
一条SQL在 MaxCompute 分布式系统中的旅程

上一篇:Python爬虫抓取图片,网址从文件中读取


下一篇:《Learning Scrapy》(中文版)第6章 Scrapinghub部署