说说阿里增量计算框架Galaxy

增量计算模型 (一)

背景

Galaxy是阿里数据平台事业部,实时计算组自研的增量计算框架。今年双十一,阿里直播大屏就是Galaxy支持和保障的重要业务之一,相信大家可能看过双十一之后网上一些介绍性的文章了,比如 阿里研发实时计算平台 每秒运算量将超千万 ,不过这篇文章面向非技术人员,最后的比喻也是有点醉。还这篇比较新的  阿里巴巴实时数据公共层助力双11媒体直播 。

本文我会介绍一些我认为可以公开出来说的galaxy技术上的特点,让技术人员对该计算框架有个更准确的认识。

计算模型

首先明确根本的一点, Galaxy是增量计算模型 ,不是"简单"的流计算,这点在业界是没有的。 增量计算是有状态的计算 。批量计算里,每一次的输出结果只与本次全量扫进来的数据有关,而且计算是幂等的。增量计算,每批计算结果,是由本批数据和历史批次结果计算出来的,即newValue = function(currentBatchValue, oldValue),然后本批计算出来的newValue会作为oldValue参与下一批数据的计算中。这个公式看起来与迭代计算相似,实质上,迭代计算是增量计算的一种。

下面说明流计算与Galaxy的关系。

Galaxy与Storm

网上对Galaxy有过了解的技术朋友,包括阿里内部的同事,可能会认为Galaxy只是Storm上的一层封装。在这里,我严肃地告诉大家,只要你动脑子想一想, 绝对不是的 ,不但不是封装,而且与Storm有本质区别。

Storm是流式计算,数据流进,经历拓扑计算,数据流出,与增量模型没有半毛钱关系。如果真要类比storm和galaxy,大家可以想一想Trident。trident是storm上的封装,暴露接口让用户可以操纵state,如此,批与批之间的计算结果的确可以通过state持久化起来了,并且可以参与下一批的计算,这看上去与galaxy做的相似。但还是有一个本质区别,trident的state不是它能掌控的,说白了,你state是额外的辅助存储,不是与我这个引擎挂钩的,而 Galaxy中的state是与整个计算框架关联起来的 ,这点太重要了,而且实现起来是很复杂的。

再说为什么现在的galaxy版本需要依赖storm。galaxy目前旧的版本还是跑在storm上的,借助storm的拓扑拉起,worker调度和消息传递。galaxy只是需要一个能够拉起worker,传递消息或者作RPC的引擎而已,storm是当时一个的临时选择。目前,storm引擎已经不再适合galaxy,无论是其拓扑构建和拉起的耗时,还是拓扑的不可变性,或是消息格式及序列化方面的性能等等种种问题,都不再适合galaxy,而仅仅适合于流计算。所以,将来galaxy的引擎,不会是这样。

Galaxy的未来

简单说几点Galaxy的想象空间。

现今Spark可以做的场景,Galaxy都是可以做的,而且Galaxy在性能上 至少是准实时 的。Spark的RDD代表了使用上的易用性和计算上的reuse数据。Galaxy同样有增量语义,表达能力同样强大的算子层和增量计算模型天然引入的数据reuse。

Galaxy的计算模型在业界是没有的,其计算场景覆盖了流式计算,迭代计算,还可以轻松愉悦地做BSP模型。

增量计算模型(二)

背景

在前一篇文章中,介绍到了Galaxy的增量计算性质,其state是框架内部管理的,以及与Storm的简单对比。这篇文章将讲述更多Galaxy增量模型的事情,并介绍这套增量模型之上实现的Galaxy SQL和Galaxy Operator,同时会从增量角度对比Spark Streaming。

Galaxy MRM增量与Spark Streaming

MRM模型全称为MapReduceMerge,比MapReduce做了一个Merge操作。merge阶段可与state交互,读写某个key的oldValue,并且这个merge接口还具备rollback语义。在流计算场景下,数据按时间或条数切成不同的批,批内可以做普遍意义下的MapReduce操作,批之间需要merge阶段做跨批聚合的计算。大家可以对比Spark Streaming的UpdateStateByKey操作,在一个DStream内,各个时间段内的RDD(即各批)可以通过这个接口更新一次任务内的state。而galaxy的merge本质上是一次add的过程,对应的rollback是一次delete的过程,从数据库的语义看,两个过程合起来相当于是update操作,而这俩过程都是根据一个primary key来做的,所以这件事情与spark streaming的updateStateByKey做的事情是一样的,但是细看的话,两者还是存在很大的差异。

galaxy的state暴露给计算task是线程级别独享的,spark streaming的state是任务内全局共享的。线程级别独享的优点,就在于同一批数据,按key shuffle之后来到不同的merge计算节点,各自不会阻塞各自的计算过程,而spark streaming的updateStateByKey操作会阻塞其他rdd的计算,虽然spark streaming能做到DStream内各个RDD并发执行,但是只要有state操作,最终还是落到了时间序列上的阻塞。本时间点StateRDD的计算需要依赖前一时间点父StateRDD的计算结果,而批内各个key对state操作是互相阻塞和影响的,所以着眼在这层barrier上的话,galaxy的merge过程更加精细,add和delete过程是分开的,批内的key是落到不同线程上计算而state是线程内独享的。

Galaxy有三种Model,分别是MapOnlyModel,MapReduceModel,MapReduceMergeModel。即,你可以使用M Model和MR Model做普通的流计算或小批计算,当需要跨批操作的时候就使用MRM Model。Model之间是随意组合串联的,接口相比MapReduce其实是相当灵活甚至过于灵活的,灵活的弊端是计算模型上带来复杂性。

Galaxy SQL

Galaxy SQL是一种StreamSQL,而且是目前业界没有的。从语法上Galaxy SQL贴近HiveSQL,但又有些流计算语义上(无限数据流)不能支持的语法,比如limit, order by。

Intel那边搞了一个Spark Streaming + Spark SQL的结合,叫StreamSQL。利用Spark SQL里的SchemaRDD,为Spark Streaming流进来的RDD带上了Schema元信息。借助Spark Streaming支持的操作,这种StreamSQL可以做滑窗效果的sql计算。但是真正跨批的增量语义(不仅仅是固定的window跨批计算),是支持不了的。Galaxy SQL可以做真正的增量流式SQL。

举个最简单的例子,

insert into t2 
  select t1.a as k, count(t1.b) as cnt from t1 group by t1.a;

select count(cnt) from t2 group by t2.cnt;

第一句sql中,根据t1的a字段分组,求了个count值。第二句sql中,t2表分组的字段变为t1表里count出来的cnt值。大家可以想象,在流计算场景里,第一次a求count出来的值可能是100,下一个时间点,同一个a的key,count出来的值就是200了,这时候,100这个cnt已经丢到t2表里计算出结果了,现在100已经更新到200了,200这个新的值的计算是简单的,但问题是如何把t2里之前100的计算结果撤销呢?

可以仔细想想,StreamSQL是做不了这样的sql的,本质上是因为spark streaming不支持这样的操作。Galaxy计算框架的merge阶段可以做rollback操作,回滚之前"错误"的状态,使得Galaxy SQL可以做分布式流式SQL。

Galaxy Operator

Galaxy Operator是Galaxy MRM编程接口之上的一层DAG封装,兼具易用性和表达能力。

算子层最终将映射成多个Galaxy的MRM Model,使用户可以更加关注计算逻辑,屏蔽较复杂的MRM Model,特别是merge阶段。

算子层相当于是物理执行计划,本身可以做节点合并、谓词下推等优化的工作,即物理执行计划的优化。从本质上,我认为类似Hive、Spark Catalyst里对执行计划的优化工作,在算子层这个DAG里都是可以做的。通过算子这一层,理论上任何DSL都是可以映射之后在Galaxy计算框架上运行的。

算子层提供五类正交的基础算子:map, reduce,merge,shuffle,union。五类基础算子可以互相组合,衍生成更高级的算子。

需要注意的是,reduce类的算子 ,针对的是 本批 内数据的聚合。增量语义下的reduce与批量语义下MapReduce中的reduce并不一样,增量语义下的reduce针对的是本批,MapReduce中的reduce对应跨批的数据,更加类似增量语义下的merge。merge类的算子 ,针对的是 跨批 的聚合操作。merge()对应的是MRM模型里的Merge phase,可与OldValue交互,是增量场景中的特性操作。通常用于实现count、sum等UDAF操作,也可以实现top、distinct、类join的操作。

union类的算子 ,针对的是多流合并的场景。union()操作是将多条流合并成一条流输出,要求各流的columns对齐且一致。mix()操作也是多流合并成一条,但内部标明了数据来自左流还是右流,各流的column可以不一致,后续可以衔接集合性的批内或跨批操作。mix()是 专门为集合性 操作而设计的接口。

功能上,算子层可以类比Spark RDD。Spark RDD 核心价值 有二:其一,在api层面,规避MapReduce模型的抽象和不舒适的生接口,提供多种transformations和actions,方便开发者理解和使用,即 easy to use ;其二,在计算层面,通过持久化RDD做到了批量计算过程中对中间数据的复用,使Spark诞生之初以适合迭代型计算的内存计算框架闻名,即 reuse data 。反观Galaxy算子层,一方面,算子层与Spark RDD一样,在api设计上具备FlumeJava的设计理念,兼具易用性和表达能力;另一方面,Galaxy之增量计算模型是 "有状态的计算" ,天然做到了实时数据各批之间"状态"的reuse(在merge phase)。

后续

之后有时间,希望可以介绍下Galaxy的任务模型、对于state的管理和容错等方面的内容。


上一篇:Arbor蔡志刚:企业需要防DDoS攻击能力


下一篇:c#dns解析示例