异构集群,统一计算 在微博机器学习平台的应用

内容简要:

一、微博机器学习平台简介

二、异构集群,多计算引擎–Before

三、异构集群,统一计算–Now

四、解决方案

五、机器学习流程自动化

 

 

一、微博机器学习平台简介

微博机器学习平台为业务方在各种推荐场景下,提供端到端、一站式、自动化机器学习流程的解决方案,整体流程图如下:

异构集群,统一计算 在微博机器学习平台的应用

二、异构集群,多计算引擎–Before

异构集群,统一计算 在微博机器学习平台的应用

由上图可见,异构集群和多计算引擎的问题与痛点非常明显,接下来分别做出分析。

 

(一)异构集群

异构集群三大特性:入口分散、存储资源割裂、计算资源割裂

 

1.入口分散

当算法工程师想做一个数据处理或者机器学习时,他首先要知道要在哪个集群上,然后找到相应的Gateway,接着去申请登录权限,最后登录上去进行相应的操作。这样就会非常的分散,造成用户体验感混乱。

 

2.存储资源割裂

Hadoop集群、云上Hadoop集群、EMR还有MaxCompute的集群,都有自己的存储和云数据,这就造成了一个个的数据孤岛,数据无法共享。当一个集群生成的数据要在其它的集群上使用时,需要做一个数据搬运的工作,非常浪费时间,相应地对机器学习流程的自动化也会造成割裂。

 

3.计算资源割裂

各个集群的计算资源无法互相弹性使用,这样就会造成部分集群异常繁忙,部分集群较为空闲,使得计算资源无法得到最优化的利用。

 

(二)多计算引擎

多计算引擎存在三大痛点:学习成本高、开发效率低和维护难度大。

 

 

三、异构集群,统一计算–Now

针对以上的问题与痛点,我们进行了一系列建设和改造,完成了四个统一:

-   统一入口;

-   统一调度;

-   统一SQL计算引擎;

-   统一存储/元数据。

异构集群,统一计算 在微博机器学习平台的应用

如上图所示,当一个SQL任务经过调度中心,会按需灵活调度到各个异构集群上,各个异构集群都是统一的SQL计算引擎,这样就大幅降低了算法工程师的学习成本,提升开发效率,提升代码的共享。

统一的方法是让各个异构集群都支持SQL计算引擎,然后建设了一个统一的元数据中心,做湖/仓一体化,让各个异构集群之间使用同一份元数据,实现了数据共享。

 

 

四、解决方案

由于每个异构集群的特点不同,所以对每个集群也有不同的解决方案。因此针对各个的异构集权采取分而治之的方式,根据问题自身的特点提出了相应的解决方案,方案如下:

统一入口、统一调度;

统一SQL计算引擎、统一存储/元数据:

1)MaxCompute – 湖仓一体;

2)自建Hadoop – 统一元数据;

3)Kubernetes – 批流混跑;

4)EMR – SQL计算引擎优化。

 

(一)统一入口、统一调度

异构集群,统一计算 在微博机器学习平台的应用

如上图所示,统一入口主要是支持WebUI和CLI两种方式,调度中心主要有两个特点。

第一个特点就是每接入一个新的异构集群,支持Weibox即插即用,配置方式十分灵活。

第二点是调度中心支持按需做作业、排布的事情,然后根据异构集群之间计算引擎资源的使用情况来决策,将SQL计算作业提交到相应集群执行,相关的策略除了计算资源方面的还有集群之间的网络带宽。

如何实现异构集群接入实验室,精髓就在中间一排weibox。每个集群在接入调度系统的时候都有一套组件,称之为weibox。

每个weibox上都部署了一些异构集群,它会把前端用户发送过来的作业根据调度中心调到相应的集群上,不仅把作业提交上去,而且能将返回作业的状态信息以及所产生的日志数据让用户看到。

经过统一入口和统一调度,让各个异构集群实现了统一的管理和调度

 

(二)统一SQL计算引擎、统一存储/元数据

1. Maxcompute - 湖仓一体

异构集群,统一计算 在微博机器学习平台的应用湖/仓一体主要是让数据库和数据仓一体化、统一化。

平台有一个特点是所有的数据都是基于EMR存储展开的,有各种各样的结构化、非结构化的数据,做一些数据分析以及AI计算。EMR存储相当于一个数据湖,其他的各种服务和应用都围绕着数据湖去展开。

基于MaxCompute的集群上,有一些较高性能的深度学习的组件和机器学习的组件,比如TF、gnn等。

如果使用数据湖中丰富的数据资源?MaxCompute的集群和EMR集群是两个完全异构的集群,它们之间的存储资源与计算资源完全割裂,这种数据同步方式非常低效,需要人工的介入,使用起来较为复杂,使得算法工程师做数据同步时效率低下。

基于痛点,提出了一个湖/仓一体的概念,让它的数据湖和数据仓库统一化、一体化。经过一系列的措施,如通过语言数据透视的功能、开放数据库API的能力,在MaxCompute的集群上的各种深度学习的组件,开放数据湖API的能力,使得能直接访问和读写数据湖当中的数据,建设了一个统一的元数据中心,实现了数据共享。

 

数据共享带来了什么收益?

第一点是实现了SQL跨级群的联邦计算。在MaxCompute的集群上,有一个高性能的SQL计算引擎odpsSQL。由于现在湖/仓已经达到了一体化,因此SQL语句的表无论是落在数据湖还是MC数仓上,都是可以进行跨集群计算。但是在计算的过程还存在跨集群之间网络专线性的问题,所以我们会定期分析两个集群当中SQL作业的历史实行情况,然后分析出一些热表,将比较大的热表利用网络空闲时段同步到 MC数仓当中,让它们之间的网络传输最小化。

SQL跨集群联邦计算的能力也解决了之前一大痛点,使用odpsSQL以后可以高效分布式地同步数据。有时MaxCompute集群上的组件虽然具备了直接访问数据湖的能力,但有时一些高性能的API还是会直接读取MC数上的Table。所以如果还是想使用数仓里面的数据,可以使用odpsSQL直接高效同步数据。

第二点就是MaxCompute集群上的各种深度学习和机器学习的组建可以直接读写数据湖当中的数据,生成的模型文件还有各种Check Point也可以直接写入数据湖。后续模型上线的各种服务也是围绕数据湖展开,不再需要做数据中转,大幅提升深度学习的效率与业务效果。

 

2. Hadoop - 统一元数据

Hadoop集群的痛点和问题与湖/仓一体类似,它也存在内置存储、语言数据的问题,都是基于EMR的数据湖,导致Hadoop集群的存储形成了一个数据孤岛。

异构集群,统一计算 在微博机器学习平台的应用

如上图所示, 解决Hadoop集群问题的方法也是统一语言数据,将它的元数据和存储并入到之前已经开始建设统一的元数据系统当中,然后解决一些HDFS网络、权限、域名互通的问题。

经过统一的建设以后,在这两个集群之间实现了SQL跨集群的联邦计算。例如在SQL语句当中就一个联邦计算的简单示例,这条SQL无论放在A集群还是B集群上,都是可以进行计算。但是这种方法也存在集群之间的网络带宽问题,所以基于这个问题同样有冷热数据的分析,热数据定期同步的功能,也会分析出一些比较热的表,然后定期同步到对应的两个集群上,尽量实现本地化的计算,让计算更加的经济化。

由于网络带宽是这两个问题的最敏感点,因此会在此基础上增加一层关于网络专线最优使用的策略。这个策略本质上是通过作业以及云数据的分析,让数据跨专线的量最小化,使得作业都在本地执行。通过这样一个操作,加上Hadoop集群本身支持HiveSQL、SparkSQL,因此也实现了统一。

 

3. Kubernetes - 批流混跑

Kuberenetes集群主要是做一些实时计算和在线服务的作业。

异构集群,统一计算 在微博机器学习平台的应用

如上图所示,基于微博流量的特点,随着用户逐渐的减少,流量就会陷入一个低谷期。这时集群的CPU内存资源相对空闲。而反观做离线计算的Hadoop集群、YARN集群,当时间过了0点,开始计算昨天的各种定时作业、批处理计算数据,此时Hadoop集群异常繁忙。基于这个因素,想让两个集群之间的计算资源能够互相弹性使用,从而实现交通低谷,既缓解了Hadoop集群的计算压力,同时也能让Kuberenetes集群计算资源得到更加充分的使用,让计算资源得到一个最优化的使用。

异构集群,统一计算 在微博机器学习平台的应用

让SparkSQL和FlinkSQL调度到Kuberenetes上执行,选择SparkSQL和Flink SQL的原因是它们在支持SQL计算引擎的同时,开源对Kuberenetes的支持性也较高。

SparkSQL和FlinkSQL计算引擎有一个非常重要的问题,就是如何同时保证在线和实时作业以及批作业的稳定性?

首先,不能因为批作业调度而影响实时在线作业的稳定性。其次,如果保证实时和在线作业的高优先级,那么如何保证批作业的稳定性。

基于上述的问题,我们将实时作业和在线作业设置最高的优先级。接着在批作业上面,分了两层的优先级的设置。

针对SparkSQL作业,对它的Driver设置的是第二层优先级。相应的Flink SQL,针对它优先级设置它的Driver对应的是JobManager,Excutor对应的是Manager角色。 通过上述操作,将实时在线作业的优先级设置成了最高。

 

第二个解决方案是保证批作业的优先级。由于批作业的优先级不如实时作业高,调度中心是基于工作流重试的机制。如果出错的话,首先会在K8S集群上开启出错重试,如果出错的次数超过一个设定的值,会将它调度回YARN集训进行重试,最终完成正确执行。 通过上述二层机制,保证了批作业也同样的稳定。

第三个需特别强调,由于批作业调度过来之后是要做Shuffle,如果Shuffle数据量较大,会影响实时在线作业的稳定性。针对这个问题,我们独立部署了一套脱离于Kuberenetes集群的Shuffle Server,让Shuffle数据单独写到一套服务器上,上述操作同时解决了关于作业稳定性的问题。

以上策略不仅实现了计算资源的最优化使用,而且在这个集群上也稳定支持了SQL计算引擎,实现SQL计算引擎的统一。

 

4. EMR –SQL计算引擎优化

Hadoop集群统一的元数据中心是基于EMR的,它支持各种计算引擎,例如HiveSQL以及SparkSQL。由于需要它高效的性能,因此进行SQL计算引擎优化。

异构集群,统一计算 在微博机器学习平台的应用

异构集群,统一计算 在微博机器学习平台的应用

在算法工程做各种数据处理的时候最青睐Hive on MR,因为算法工程师比较专注于实现业务效果和业务逻辑,不关注计算引擎的底层原理优化。

在选型的原则上有如下三个原则:

第一是新的计算引擎一定要让用户做到无感知,用户的代码不需要做任何的改变,让用户感知不到发生了改变。

第二是迁移过程稳定且不需要太多的人工干预。刚使用Tez时并不清楚它的稳定性,所以在验证的过程中采取逐步放量灰度的方式。

第三个是执行,通过上面的图中可以看到Tez的执行时长,Tez的性能较MR有一个明显的提升。这个提升除了体现在时间有明显的缩短,同时在VcoreSecons、MemSecons计算资源的使用、Shuffle数据量上也有大幅减少,所以在EMR的云上Hadoop集群做了一个SQL计算引擎的优化,使其在统一计算平台的路上也优化了本身。

 

 

五、机器学习流程自动化

上图为图计算样例,首先进行的图训练生成一个Embedding,然后进行相似度计算做向量的召回,最终召回的向量会写入到物料库当中,供后续推荐场景召回使用。

通过对比之前与现在的流程操作,可以明显感受到在异构集群,统一计算平台的过程当中,给机器学习流程自动化做出了很大的正向推进作用。

上一篇:阿里云MNS Queue Rest API操作示例


下一篇:【转码系列之二】如何快速使用阿里云媒体转码服务和媒体库?