作者:玄弟 七锋
PolarDB-X 面向 HTAP 的混合执行器 一文详细说明了PolarDB-X执行器设计的初衷,其初衷一直是致力于为PolarDB-X注入并行计算的能力,兼顾TP和AP场景,逐渐为其打造一款具备TB级数据处理能力的数据库。为了做到这一点,借鉴了各种数据库和大数据库产品,包括分析型数据库,实时数仓等,吸取了各方面的优势来打造出一个全新的并行执行引擎。这里将对整个分布式并行执行框架做详细的介绍,希望阅读之后对我们的执行器有一个全面的认识。
▶ 整体设计
PolarDB-X 是一个 Share Nothing 的数据库,采样了计算和存储分离的架构。其中数据以分片的形式存储于每个DN节点,计算节点叫CN。在计算过程中,DN和CN间、DN和DN、CN和CN间是通过千兆、万兆网络通信。每个CN节点上一般都会有调度组件、资源管理组件、RPC组件等。一个复杂的查询,会被调度到多个CN节点上执行,考虑到数据一般都会根据均匀策略分到各个DN上,所以每个CN节点同时会访问多个DN。
当用户提交一条复杂的SQL语句,往往需要访问多个DN节点,这个时候就会启动并行调度策略,整个执行步骤可以简单理解:
- 用户所连接的这个 CN 将承担查询协调者(Query Coordinator)的角色;
- Query先发送给Query Coordinator,会首先经过优化器生成最新的Plan,然后会拆分到多个子计划(Fragment), 每个Fragment可能会包含多个执行算子。如果有的Framgnt负责扫描DN的话,它里头必定包含Scan算子,从DN拉取数据;Fragment也可以包含Agg或者Join等其他算子;
- Query Coordinator里头的调度器(Task Scheduler)会按照定义的调度逻辑将各个Framgnts封装成Task,调度到合适的CN上执行,这里头可能会涉及到一些资源上的计算;
- 各个CN收到Task后,会申请执行资源,构造执行的上下文,开始启动Task,并定期会向Query Coordinator汇报状态;
- 各个Task间会通过数据传输通道(DTL)交换数据,当所有的Task都执行完毕后,会将数据结果返回给Query Coordinator,由它负责将结果返回给用户;
- 成功返回给用户后,Query Coordinator和被调度到的各个CN节点的Task会做清理动作,释放计算资源。
整个流程大致就这样,有细心的同学会发现我们的框架在DN上有一层叫Split概念。我们的数据是按分片存储在各个DN上的,Split指的是数据分片partition的地址。对于包含扫描算子Scan的 Task,会计算出需要访问哪些 partition,这些 partition 分布在哪些 DN 上,然后封装成splits按比例划分给这些扫描Task。但是实际运行过程中每个扫描Task并不是预分配好splits的,而是预分配部分splits给扫描Task,看哪一个Task扫描的更快就会从Query Coordinator继续获取余下splits,这样可以尽可能避免由于各个扫描Task资源不均衡导致的消费长尾现象。但是如果一个表只被分成了2个分片,是不是意味着扫描任务至多只能是2,这可能起不到明显的并行加速效果。所以我们也支持在分片上继续按照分段做拆分,那么这个时候的Split除了会记录分片的地址,也会记录在分片上分段的位移。按照分段做拆分后,即便数据的分片数量有限,执行过程我们依然可以启动更多的扫描Task,并行去加速扫描。
▶ 执行计划
执行引擎执行的是由优化器生成的分布式执行计划。执行计划由算子组成。因为PolarDB-X的数据按照分片存储到各个的DN节点上去,执行计划执行也会尽可能的满足数据分布的locality,能下推的计划会被放到DN上执行,不能下推的计划会会被切分成一个个子计划(Fragment),会被放到各个CN节点上执行。所以这里我们需要关心如何将一个从优化器出来的计划,拆分成分布式计划,放到各个CN上执行?
为了更好地理解这个过程,我们这里以一条简单SQL: select * from (select useid, count(*) as b from user_data group by userid) as T where T.b > 10
为例,经过优化器生成这样的相对最优计划:
针对并行执行计划,为了更高效地执行尽量减少数据传输,可以把执行计划按照计算过程是否需要数据重分布(ReDistribution)分为不同片段(fragment)分布到相应节点执行,并且把一些操作下推来减少扫描输出的数据,上面的计划可能就变成这样的执行计划,由多个子片段构成。
不同片段之间通过 NetWork Write/Read 算子进行数据交换。更复杂的比如多表关联(join)查询,会有更多的片段和更复杂的数据交换模式。每个片段的并发度可以不同, 并发度是基于代价推导出来的。多机调度的基本单位是Stage,Stage记录了上下游片段的位置信息,以便上下游之间建立网络通道(DTL)。每个片段调度到计算CN节点后,会被封装成逻辑执行Task,比如fragment-1并发度是2的话,那么会将Task-1.0和Task-1.1 两个Task分别调度到两个CN节点。
Task仍然是CN节点计算的逻辑单元,PolarDB-X执行器不仅仅可以支持单机并行能力(Parallel Query),也可以做多机并行(MPP)。所以在CN节点还引入了二层调度逻辑。当然二层调度的好处不仅仅于此,后面我们还会提到。这里会继续在Task内部根据算子间数据交换的特性,继续做切分,切分成不同Pipeline。
不同的Pipeline并发度也可以不同,每个Pipeline会根据处理的数据规模大小会计算出不同的并发度,生成具体的执行单元Driver,Driver间会根据二层调度确定上下游的本地通道(Local Channel)。
至此你应该可以了解从执行逻辑计划转化为分布式物理执行的整个过程。引入了一些新的名称,这里统一做下梳理:
-
Fragment:指的是逻辑执行计划按照计算过程中数据是否需要重分布,切割成的子计划。
-
Stage:是由Fragment封装而成的调度逻辑单位,Stage除了封装Fragment外,还会记录上下游Stage间的调度位置信息。
-
Task:Stage并不会在CN上直接运行,他们是通过并发度分解成一系列可调度到CN上的Task, Task依然是逻辑执行单元。
-
Pipeline:对CN上的Task根据二层并发度做进一步切分,切分成不同的Pipeline。
-
Driver:一个Pipeline包含多个Driver,Driver是具体的执行单元,是一系列可运行算子的集合。
一般来说针对一个复杂查询,一个query包含多个Fragment,每个Fragment和Stage一一对应,每个Stage包含多个Tasks,每个Task会切分成不同的Pipeline,一个Pipeline包含了多个Driver。只有理解上面说的Fragment/Stage/Task/Pipeline/Driver这些概念,你才能更清楚了解我们接下来的设计。
▶ 调度策略
并行计算在运行之初,需要解决任务调度问题。调度的直白理解,就是将切分好的Task调度到各个CN节点去执行,充分利用各个CN的计算资源。这里头大家很容易有这些疑问:
1. 执行过程中各个CN节点的计算资源是不均衡了,那么在多机调度中是如何将各个Task打散到不同CN节点去执行? 2. 和各个DN交互的Task是如何并行的拉数据的?比如某个逻辑表分成了16个物理表,分布在4个DN节点上,有4个Driver去并行拉数据,每个Driver并不是均匀拉取4个物理表,而是根据自身的消费能力来确定拉取的物理表数量;多个Driver下发扫描任务会不会同时恰好落地一个DN节点上,导致某个DN成为瓶颈? 3. 我们完全可以在一个CN节点,同时调度多个Task执行,已经可以做到单机并行,为什么还要二层调度?
一层调度(多节点间)
为了解决(1) 和 (2) 的问题,我们在CN节点内部引入了调度模块(Task Scheduler),主要负责Task在不同CN节点上的调度,这一层调度我们这里称之为一层调度,在这层调度中,同属于一个Stage的多个Task一定会被调度到不同CN节点上去,确保一个CN节点只能有相同tage的一个Task。调度过程中通过心跳不断维护Task状态机,也维护着集群各个CN节点Load信息,整个调度是基于CN Load做调度的。多机调度流程如下所示:
Resource Manager(RM)是CN节点上个一个资源管理模块,RM会借助Task心跳机制实时维护集群各个CN节点的负载,Task Scheduler组件会基于负载选择合适的CN节点下发执行任务,比如CN-1 负载相对集群其他CN节点来说高很多,那么当前查询的Task会分发给其他CN节点,避免调度到CN-1节点去。执行器在执行Task任务时,Task并不是创建好的时候就确定了其消费DN splits的映射关系。各个Task按批次动态拉取splits进行消费, 直白理解就是谁的消费能力越强谁就有可能消费更多的splits。同样为了解决同一个时刻多个任务同时消费同一个DN上的splits问题,我们在调度之初会将splits根据地址信息按照Zig-zag方式,把各个DN上的splits打散到整个splits queue上去,消费的时候可以尽可能分摊各个DN压力,这样计算过程中也会充分利用各个DN的资源。
有了一层调度后,我们也可以将同属于一个Stage的多个Task调度到同一个CN,这样其实也可以做到单机并行。如果这样设计的话,我们容易忽略两个问题:
- 一层调度的逻辑比较复杂,需要多次交互,一个CN内部需要同时维护各个Task的状态,代价会比较大,这在TP场景是无法容忍的;
- 一层调度中,并发度越高,生成Task就越多,这些Task间需要建立更多的网络传输通道。
二层调度(节点内部)
为了解决上述一层调度的不足,为此我们在参考Hyper的论文[1],引入了二层调度,既在CN节点内部单独做单机并行调度,简单来说我们会在Task内部借助CN的本地调度组件(Local Scheduler),对Task做进一步的并行调度,让Task在CN上执行,也可以做到并行运行。下图中,Stage-1和Stage-2是上下游关系,各自并发度都是9,调度到3个CN节点执行。如果只有一层并发度的话,每个CN节点还会调度运行3个Task,那么上下游之间总共会建立81个Channel,CN节点内部Task是相互独立的,这样缺点还是很明显:
- 多个Channel,放大了网络开销,同一份buffer会被发送多次,发送和接收对CPU和Memory都有代价;
- 数据发送的对象是Task,数据本身有倾斜,会导致同节点内Task之间的负载不均衡(hash skew),存在长尾问题。
而一层调度和二层调度相结合的话,Stage-1和Stage-2的一层并发度是3,这样每个CN节点只会有1个Task,Task内部并发度3。由于shuffle的对象是Task,所以Stage-1和Stage-2间只会建立9个Channel,大大减少了网络开销,同时Task内部的3个Driver内数据是共享的,Task内部的所有的Driver可以共同消费接受到的数据,并行执行,避免长尾问题。针对于HashJoin,假设Ta为大表,Tb为小表,这两个表做HashJoin,可以让Ta和Tb同时shuffle到同一个节点做计算;也可以让小表Tb广播到Ta所在节点做计算,前者的网络代价是Ta+Tb,而后者的代价是N*Tb(N代表广播的份数)。所以如果只有一层调度的话,N可能比较大,执行过程中我们可能会选择两端做shuffle的执行计划;而一层和二层相结合的调度策略,可以让执行过程中选择BroadcastHashJoin,这样可以避免大表做shuffle,提高执行效率。
此外在二层调度策略中,task内部的多线程很容易做到数据共享,有利于更高效的算法。如下图,同样是HashJoin过程中,build端的Task内部多个线程(driver)协同计算:build端收到shuffle的数据后,多线程协同建立一个共享的hash表。这样一个task只有一个build table,probe端收到shuffle数据后,也不用做ReDistribution了,直接读取接受到数据,进行并行的probe。
▶ 并行执行
聊完调度,接下来应该是关心任务是如何在CN上运行,运行过程中遇到异常我们系统是如何处理的呢?
线程模型
说到执行,有经验的同学可能会发现我们的调度并没有解决调度死锁问题,比如对于下面这样一个执行计划,两表Join。一般会遇到两种问题:
1. 如果先调度f3和f2的话,这个时候假设集群没有调度资源,则f1不能迟迟调度起来。而HashJoin的逻辑就是需要先构建buildTable,这里f1刚好是build table部分。最终会导致执行死锁:f1在等待f3和f2的计算资源释放,而f2和f3又在等待f1构建完buildTable;
2. 如果f1先调度起来了,假设这个时候f2和f3没有调度资源,这个时候f1从DN拉出来的数据,其实是无法发送给f3的,因为f3还没有被调度起来。
解决问题1,业界有很多方式,比较常见是在调度之初构建调度依赖关系(Scheduler Depedency):f1->f3-f2。而解决问题2,往往是将f1把DN拉出来的数据先放到内存中,实在放不下就落盘处理。可见处理上述两个问题,执行框架不仅仅需要在多机调度上做复杂的调度依赖关系,同时还需要考虑对落盘的支持。而其实我们在调度的时候,并没有去考虑调度依赖这个事情,我们是一次性把f1/f2/f3全部调度起来了,这个是为何呢?这就要说下我们执行中的逻辑线程模型概念。在大多数计算引擎中,一个查询首先会通过资源调度节点,在各个CN上申请执行线程和内存,申请成功后,这些执行资源会被调度组件占用,用来分配当前查询的Task,不可以再被其他查询所利用,这种是真实的执行资源,和调度资源相互绑定,当CN上可利用的执行资源不够的时候,才会出现调度死锁问题。而在PolarDB-X中,我们并没有在调度的时候申请真实的线程资源,调度只需要考虑各个CN的负载,不需要考虑各个CN到底还剩多少可利用的真实资源。我们的线程模型也并没有和调度资源绑死,每个Driver其实不独占一个真实的线程,换句话说,真实的线程也并没有和调度资源一一对应。虽然说Driver是执行的基本单元,但是在调度上来看,它又是逻辑的线程模型而已。那是不是意味着只要有调度任务,都可以被成功调度到CN上去,答案是肯定的。一次性调度所有的执行单元到CN上去执行,对内存和CPU也是一种开销。比如f2被执行起来后,但是f1并没有执行完毕,那么f2也会不断执行,其数据其实也会被缓存起来,但是也不能无限缓存数据呀?为了解决这个问题,接下来就需要借助我们的时间片执行了。
时间片执行
我们在每个CN节点内部会有一组执行线程池来运行这些Driver,每个Driver会排队进入线程池参与计算,如果Driver被阻塞就会退出到Blocking队列中,等待被唤醒。比如f2 driver 启动后,从DN拉了数据放到有限空间buffer里头去,这个时候假设f1 driver都没有结束,那么f2 driver 对应的buffer就会满,满了后就会阻塞住,一旦阻塞我们的执行框架就会让f2 driver从执行器退出来,加入到Blocking队列中,简单的说就是将计算资源腾让出来,等待被唤醒。直到f1 driver都执行完毕后, f2 driver会被唤醒,执行框架就会将他移动到Pending队列中,等待被调度到执行线程池中继续运行。这里头还是会浪费点内存,但相对于CPU资源来说,内存资源还是比较充裕的。
时间片执行的核心就是需要判断Driver何时会被Block的,总结起来被阻塞的原因一般分为三种情况:
- 根据算子依赖模型来确定,比如图中f1 driver未执行完毕,那么f2 driver其实也会被阻塞(这个是一个可配置的选项);
- 计算资源不足(主要指内存),对应的driver会被挂起,等待资源释放;
- 等待DN响应,物理SQL下发给DN后,Driver会被挂起,等待物理SQL执行完毕。
除此之外我们在借鉴Linux 时间片调度机制,会在软件层面上统计Driver的运行时长,超过阈值(500ms),也会被强制退出执行线程,加入到Pending队列,等待下一轮的执行调度。这种软件层面上的时间片调度模型,可以解决复杂查询长时间占用计算资源问题。其实实现起来也挺简单的,就是每计算完一个批数据后,我们会对driver的运行时长进行统计,超过阈值,就退出线程池。下面贴出了Driver处理逻辑的部分伪代码,Driver在执行采用的是经典的Producer-Consumer模型,每消费一个Chunk我们就会累计时间,当超过既定阈值,就会退出来。
任务状态机
高并发系统,频繁地等待或者任务切换是常见的系统瓶颈。异步处理是一种已经被证明行之有效地避免这些瓶颈,把高并发系统性能推到极致的方法。所以PolarDB-X执行器的整个后端,统一使用全异步的执行框架;同时MPP执行过程涉及到多机的协调,所以这就要求我们在系统内部维护这些异步状态。异步状态的维护特别重要,比如某个查询下的Task执行失败,需要立即通知到整个集群中该查询正在运行的Task任务,以便立即中止,以防出现Suspend Task,造成资源不释放问题。
所以在执行器内部,我们从三个维度(Task Stage Query)去维护状态, 这三种State是相互依赖耦合的,比如Query 被Cancel,会立即通知其所有的Stage,各个Stage监听到状态变化,会及时通知给其所有的Task,只有等待Task都被Cancel后,Stage 最后的状态才变更为Cancel,最终Query的状态才被标记为Cancel。在这个过程中我们会引入对状态机异步监听机制,一旦状态发送变更就会异步回调相关处理逻辑。通过维护这些状态,我们也可以及时通过查询或者监控诊断到任务是否异常,异常发生在哪个环节,也便于我们后期排查问题。
▶ 资源隔离
如果并发请求过多的时候,资源紧张会让请求线程参与排队。但是正在运行的线程,需要耗费比较多的计算资源(CPU和Memory)的时候,会严重影响到其他正常正在运行的Driver。这对我们这种面向HTAP场景的执行器是决定不被允许的。所以在资源隔离这一块,我们会针对不同WorkLoad做计算资源隔离,但这种隔离是抢占式的。
CPU
在CPU层面上我们是基于CGroup做资源隔离的,根据WorkLoad不同我们把CPU资源分为AP Group和TP Group两组,其中对TP Group的CPU资源不限制;而对AP Group是基于CGroup做硬隔离,其CPU使用水位的最小阈值(cpu.min.cfs_quota)和最大阈值(cpu.max.cfs_quota)来做限制。执行线程分为三组: TP Core Pool 、AP Core Pool、SlowQuery AP Core Pool,其中后两者会被划分到AP Croup一组,做严格的CPU限制。Driver会根据WorkLoad划分到不同的Pool执行。看似很美的实现,这里头依然存在两个问题:
1. 基于COST识别的WorkLoad不准怎么办?
2. AP查询比较耗资源,在同一个Group下的多个慢查询相互影响怎么办?
出现问题(1)主要的场景是我们把AP类型的查询识别成了TP,结果会导致AP影响到TP,这是不可以接受的。所以我们在执行过程中会监视TP Driver的执行时长,超过一定阈值后仍没有结束的查询,会主动退出时间片,然后将其它调度到AP Core Pool执行。而为了解决问题(2),我们会将AP Core Pool中长时间运行都未结束的Driver,进一步做优雅降级,调度到SlowQuery AP Core Pool执行。其中SlowQuery AP Core Pool会设置执行权重,尽可能降低其执行Driver的频率。
MEMORY
在内存层面上,会将CN节点堆内内存区域大致可以分为四大块:
- TP Memory:用于存放TP计算过程中的临时数据
- AP Memory:用于存放AP计算过程中的临时数据
- Other:存放数据结构、临时对象和元数据等
- System Reserverd:系统保留内存
TP和AP Memory分别会有最大阈值和最小阈值限制,两者内存使用过程中可以相互抢占,但是基本原则是:TP Memory可以抢占AP Memory,直到查询结束才释放;而AP Memory可以抢占内存TP,但是一旦TP需要内存的时候,AP Memory需要立即释放内存,释放方式可以是自杀或者落盘。
▶ 数据传输层(DTL)
并行计算是充分利用各个CN资源参与计算,那么DN与DN之间必然会存在数据交互。各个DN上的上下游的Task数据需要传输,比如上游的Task数量N,下游的Task数量是M,那么他们之间的数据传输通道需要用到M*N个通道(Channel),同样的我们将这些通道(Channel)的概念抽象成数据传输层。这个传输层的设计往往也会面临两个问题:
1. 通道分为发送端和接受端,当发送端源源不断发送数据,而接受端无法处理的话就会造成内存雪崩;
2. 数据在传输过程中丢失。
在业界实现数据传输主要有两种传输方式:Push和Pull。Push就是发送端往接受端推送数据,这里头为了避免接收端处理不过来,需要引入流控逻辑,一般的做法都是在接收端预留了槽位,当槽位被数据占满时会通知发送端暂停发送数据,当有接收端数据被消费空闲槽位出现时通知发送端继续发送,这里头会涉及到发送端和接收端的多次交互,流控机制相对比较复杂。Pull就是发送端将数据先发送到buffer里头去,接收端按需从发送端的的buffer拉数据,而当发送端发送的数据到buffer,接收端假设长时间不来拉数据,最后发送端buffer满了,也会触发上游反压,为了避免频繁反压,往往发送端的buffer不应该设置太小。综合起来我们选择了pull方式来做。采样pull方式也会遇到两个问题:
1. 每个receiver一般会和上游多个sender建立连接,那么每次都是通过广播的方式从上游所有的sender拉数据吗?
2. 一次从sender端到底请求多少的数据呢,即averageBytesPerRequest?
我们先回答问题(2),我们这里会记录上一次请求的数据量lastAverageBytesPerRequest、当前建连通道个数n以及上一次总共返回的数据量responseBytes,来计算出当前averageBytesPerRequest,具体的公式下面也给出了。至于问题(1),有了当前的averageBytesPerRequest后,结合目前receiver上buffer剩余空间,可以估算出这一次需要向上游几个sender发送请求。
在异步通信过程中为了保证传输可靠性,我们采用了类似tcp ack的方式,当receiver端带着token去上游拉数据的时候,则表示当前token之前的数据均已经被receiver端消费完毕,sender可以释放这些数据,然后将下一批数据以及nextToken返回给receiver端。
▶ 效果展示
前后说了很多干货,下面咱们来点简单实际的东西。这里以TPCH Q13为例来演示下执行器在不同场景下的加速效果,为了方便截图在Q13后面都加了limit。该测试环环境下,CN和DN规格都是2*16C64G。
单机单线程下运行,耗时3min31s
使用Parallel Query加速,既单机多线程执行,耗时23.3s
使用MPP加速,既同时利用两个CN节点的资源计算,耗时11.4s
▶ 总结
不管是简单查询,还是 Parallel Query和MPP场景下的复杂查询,共用的都是一套执行框架。不同场景下对执行器的要求,更多的是并发度设置和调度策略的差异。相对于业界其他产品来说,PolarDB-X执行器主要特点:
- 在资源模式上使用的是轻量化的资源管理,不像大数据计算引擎,需要额外引入的资源管理的节点,做严格的资源预分配,主要考虑到我们的场景是针对于小集群的在线计算;
- 在调度模型上执行器支持DAG调度,相对于MPP调度可以做到更加灵活的并发控制模型,各个Stage间、Pipeline间的并发可以不一样;
- 区别与其他产品,AP加速引用的是外挂并行计算引擎,PolarDB-X并行执行器是内置的,不同查询间共用一套执行模型,确保TP和AP享有一致的SQL兼容性。
PolarDB-X并行计算在线上已经平稳运行了近两年,这两年来我们不仅仅在执行框架上做了很多稳定性工作,在算子层的优化我们也沉淀了不少的技术。但这些还不够,目前比较热的是自适应执行,结合Pipeline模式的自适应执行挑战比较大,我们近期也在研究,欢迎感兴趣的朋友来拍拍砖,一起进步!
▶ Reference
[1] V. Leis, et al., Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age, in SIGMOD, 2014.
[2] Presto: SQL on Everything.
[3] A Deep Dive into Query Execution Engine of Spark SQL.
[4] Impala: A Modern, Open-Source SQL Engine for Hadoop
[5] FusionInsight LibrA: Huawei's Enterprise Cloud Data Analytics Platform. Proc. VLDB Endow. 11(12): 1822-1834 (2018)
【相关阅读】
HTAP 数据库“必修课”:PolarDB-X Online Schema Change
每次都需要解释大量指令?使用 PolarDB-X 向量化引擎
如宝马3系和5系:PolarDB-X 与 DRDS 并驾齐驱