该文章来自于阿里巴巴技术协会(ATA)精选文章。
摘要:*目前,各种分布式实时计算系统已经在各大互联网公司得到了广泛应用。但是,这些实时系统的计算过程多不进行持久化,如果出现消息丢失等异常情况,通常很难定位问题出现的位置和具体原因,更无法做到主动发现消息丢失。对于广告营销等对消息准确性要求较高的业务场景来说,这种消息丢失的代价通常很高,即便很低的消息丢失率也会造成大量的财物损失。为此,阿里妈妈开发了一套面向分布式实时计算框架storm的实时跟踪校验系统——棱镜系统,棱镜系统实时记录每条消息在storm上的处理路径,主动发现消息丢失情况并报警。本文详细介绍了几位作者在开发棱镜中遇到的困难和相应的解决方案,相信对其他分布式实时计算系统的跟踪校验系统也有一定的借鉴意义。*
项目wiki
1.介绍:
1.1.棱镜项目的背景
如今的互联网应用,日均数据处理量越来越大,对计算实时性的要求也越来越高。为此,各种分布式实时计算系统层出不穷,比较有名的有Yahoo S4,storm,puma等。其中storm由于具备易恢复、可扩展、高容错等特性,目前被广泛应用在阿里妈妈营销系统的各条业务线上。
不过,虽然storm的acker机制巧妙地保证了每条消息都会被完整处理,但这一机制也有其局限:1)必须正确的调用ack和fail接口,对于逻辑复杂的应用,误调用在所难免;2)和storm相连接的外部更新系统不在acker的监控范围内;3)如果消息处理出错,无法确切知道在storm的哪个bolt出错。
同时,由于Storm计算过程未进行持久化,无法查看历史消息的处理路径。这样即便我们发现了某条消息有处理不完整的情况出现,也很难复现。
在棱镜系统上线之前,阿里妈妈的开发人员通常不能及时发现线上问题,有时甚至要晚于客户发现。而为了定位这些问题,往往又需要分别查询storm及storm访问的多个外部分布式系统的日志。由于这些日志散落在集群的各台机器上,查找起来非常费时,且有些日志受限于单机的存储能力,保存的时效有限。这样带来的问题一是问题处理不及时,引发客户投诉,客户满意度下降,造成财物损失;二是定位问题费时费力,定位成功率也不高;三是有些问题客户自己可能也无法发现,或者发现后没有及时反馈,导致bug一直在线上运行。
1.2.营销实时系统对棱镜的要求
为了解决1.1节中提到的问题,我们开发了棱镜系统,它实时地监测着storm系统的运行情况,记录下每条消息的处理路径,方便开发人员复现问题。同时准实时地对每条消息的处理路径进行校验,在发现问题时主动向开发和运维人员报警,并提供了一键消息重发工具,在异常出现时做到及时恢复。
棱镜系统在设计之初,就考虑到了营销系统的特殊要求,提出了以下设计目标:
1. 精确性,作为一个跟踪校验系统,如果本身不够精确,那么必定引起使用者的不信赖,同时,营销系统与收入、扣款直接相关的特性,也要求棱镜必须做到足够精确。
2. 实时性,实时系统的监测系统自然也需要满足实时性,不然在用户投诉后,再收到异常报警也没有任何意义。
3. 对应用尽可能透明。阿里妈妈的营销系统有数十、上百个拓扑,如果升级十分困难的话,也必然会阻碍棱镜的推广。对于棱镜,非常幸运的一点是,阿里妈妈的多条业务线共用一套拓扑框架,不同的业务在这套框架下,通过不同的配置得到适用于自己的业务逻辑,故而棱镜的改动可以集中在这个拓扑框架中,业务不需要关心。
2.棱镜系统架构和跟踪、校验原理:
2.1.棱镜系统总体架构:
图2.1 棱镜系统的总体架构
图2.1展示了棱镜系统的总体架构。其中Spout/Bolt[A-C]是要监控的某个拓扑进行流式处理的多个分布式组件。Dispatcher和updated是storm访问的外部更新系统,也在棱镜的监控范围内。ODPS(Open Data Processing Service)是阿里巴巴研发的海量离线数据处理平台,提供了批量结构化数据的存储和计算服务,棱镜的跟踪记录、主动校验计算、历史数据查询等都在ODPS平台上进行。从数据流的角度看,在storm等集群上收集到ODPS原始trace数据首先通过准实时merge将同属于一条消息的所有trace聚集到一起,再通过消息校验发现异常消息、触发报警,详细的数据流处理将在2.3节中介绍。
2.2.trace数据的打点方式
Spout/Bolt在接收和发射(emit)消息之后各打一条trace日志,每条日志以key-value形式记录下SeqID(每条消息唯一的主键,在各个bolt间不变,用于对齐),execTime(在storm中打点时的时刻,精确到毫秒),appName(拓扑名),clusterName(集群名称),boltName(spout或者bolt的名称), 机器ID以及一些其他自定义的个性化、多维度的信息。
举例来讲,对于如图2.1中的拓扑结构所要处理的某条消息来说,它会在Spout/Bolt[A-C]的入口、出口各打一条trace日志,共八条,这八条日志组成该条消息的处理路径。
2.3.消息跟踪、主动检验流程
如2.2节所述,每一条消息在storm中进行处理时,都会产生多条散布在各台机器上的trace数据。 接下来需要对这些Trace数据进行收集、对齐和完整性校验。整个流程包含以下几个步骤:
1、 使用分布式系统日志收集工具TimeTunnel将storm各节点上的trace日志收集到ODPS上,表中每一行是一条原始的Trace日志。
2、 对原始Trace数据进行解析、分列,并存储于一张split表。这张表在查询定位问题时使用,由于进行了分列,可以按照各种条件组成sql语句进行查询。
3、 将Trace数据按照消息主键(SeqID)进行对齐,我们称这个过程为merge流程。经过对齐,同一条消息的所有trace数据就被合并到一起,形成了一条消息的处理路径(tracing path)。
4、 对每条消息的处理路径进行完整性校验(见2.4),如果发现有处理不完整的情况,则触发报警。
以上步骤全部在ODPS以准实时的方式(小batch处理)进行处理,batch时间间隔可以进行调整。
2.4.主动校验的原理
所谓主动校验,就是检查每条消息的处理路径是否完整,是否走完了storm中预期要经过的所有bolt。同时,校验时还必须考虑到以下几种特殊情况
1、 可能有重传。在消息处理出现问题时,storm会通过重发消息,防止瞬时故障造成的影响。
2、 可能有消息分裂。在storm中,一个bolt可以emit多个消息,我们称这种情况为消息分裂(1分N)。
3、 可能有主动丢弃,这种情况下消息处理流程提前中止为正常情况。
为了解决上述三个问题,在棱镜的主动检验中,采用了如下算法:
1、 对消息处理路径上的trace进行排序,找到最后一次重传的所有trace日志,在主动校验中仅考虑最后一次重传。
2、 对最后一次重传中每种spout/bolt入口、出口的trace日志进行计数,查看是否有消息分裂的情况。如果有,则流程后方bolt的trace数也必须与之吻合。
3、 对于提前中止的情况,再检查3.3节中将要介绍的ErrorCode,如果ErrorCode也表示异常,才认为该消息处理路径真的出现异常。
2.5.Debug tracing的实现
利用2.1节中所述的架构,棱镜还提供了debug tracing或者称为即时tracing的功能。有些时候,开发同学需要在线上查看某条消息处理时的debug日志(默认关闭)。这在以前只能通过重启拓扑,打开debug日志选项,再到storm各节点上grep才能查看。
在棱镜中,由于已经有了日志收集,日志分列的架构,通过添加一些截获debug日志输出的代码,就简便地实现了这个功能。现在,开发同学只要在Storm的输入消息体里加入debug=true的选项,棱镜就会把这条消息的所有debug日志都收集到split表中,供开发同学方便地查询。
3.棱镜开发遇到的困难和相应的解决方案
3.1. merge时间段区间划分问题及两阶段merge的引入
图3.1 merge时间段划分问题
现在考虑2.3节中的merge流程。在棱镜的处理中,为保持实时性,对这一聚合过程采取了分小batch处理的方式。例如,本次处理0分到5分的数据,下次处理5分到10分的数据。但是,这种时间区间划分方式会导致某些同属一条消息的处理路径被切分到不同batch。图3.1给出了这一问题的图形化描述,其中,每个彩色长方形代表一条消息的处理路径,长方形上沿对应最早的spout的execTime时间,下沿对应最后一条trace日志的execTime时间。黑线代表批处理时间区间的某个边缘(如下边缘)的可能划分位置,从图中可以看出,不管这个时间边缘划在哪里,都会导致某条消息处理路径被一分为二。
为解决这一问题,我们将merge流程划分为两个阶段,分别称为merge1和merge2。在merge1阶段,以15分钟为一个时间区间进行一次聚合,这样部分位于区间边缘的消息路径(tracing path)将不可避免地被切分到两个区间。所以在merge1做消息聚合时,将本次处理区间内同一path的每条trace的execTime统一设置为该path所有trace中最小的execTime,这样一来path在时间线上的表示也就由“方块”压缩成了“直线”,因此在merge2划分时间区间时就不会再遇到时间区间边缘的划分问题。之后在merge2阶段,以merge1时间区间的中点为边缘再进行一次跨merge1分区的聚合操作,将merge1时被划分到不同分区的同一消息路径重新聚集到一起。
3.2.对重传的处理和时间戳不准促使UUID的引入
在2.3节介绍的主动校验算法中,需要对消息路径上的trace进行排序,找出最后一次重传的所有trace。理论上,这只需要按每条trace的execTime大小排序就可以了。但是,作为一个分布式系统,storm各台机器间的时间戳会有最多5毫秒的误差,这会打乱排序的顺序。
或者,既然我们知道消息流过spout/bolt的顺序,那按这个顺序进行排序不是也可以吗?这在正常情况下的确可以。不过重传会导致这种排序算法失灵。
为此,我们为每一次重传引入一个唯一的UUID,它在spout中通过函数System.nanotime()获得,可以精确到纳秒。这样,在多次重传间就可以利用execTime+UUID得到每次重传发生的相对位置,在一次重传内,再利用Bolt间的相对位置确定排序顺序。
3.3.有合理丢弃情况存在,棱镜引入ErrorCode
对于有些消息来说,没有走完整套消息处理流程是正常情况。为了不产生误报,我们在棱镜中定义了一套返回值ErrorCode,用来表征处理流程中的消息丢弃是否符合预期。只要相关业务线插件按照这套ErrorCode上报自己的处理结果,棱镜就可以提供精准的主动校验。
4.使用棱镜的性能消耗
图4.1列出了两个拓扑使用棱镜前后的性能对比。可以看出,棱镜造成的时延增加在16%以内,QPS下降约在5%。
图4.1 棱镜的性能消耗
5.棱镜的应用场景分析
目前,棱镜项目已经在阿里妈妈的搜索营销、展示营销、定向广告等多个业务场景的多个storm拓扑上得到了广泛的应用,取得了丰硕的成果。但是,棱镜的应用场景绝不仅仅局限于storm,任何具有分布式、消息流式处理(实时或非实时的)架构的系统都可以通过接入类棱镜的trace收集(timetunnel) + ODPS + 监控数据流pipeline的外部监控系统获得历史消息查询和消息处理实时监测的好处,令这些系统由黑盒变白盒,为发现问题、定位问题提供方便。
6. 项目成果
棱镜上线以后,取得了激动人心的效果。消息丢失的问题得到了全面修复,日均丢消息数量从之前的280余条降为现在的接近0条,相当于每年减少数百万的财物损失。调查、定位问题的效率也大幅提升了20倍以上,例如淘宝直通车线上的某个问题,之前定位共花费了3人日,现在定位类似问题只需要简单执行一条命令就可以快速复现。由于有了主动监测报警,新出现的消息丢失可以得到快速发现和立即解决,相关投诉减少90%以上,提升了客户满意度。历史消息处理路径查询也变得简单方便,图6.1和6.2展示了棱镜提供的console和WebUI两种查询工具的打印效果。利用这一工具,业务同学可以简便、快速地查询最近20天内storm所有消息的处理路径。
图6.1 console工具的打印效果
图6.2 webUi工具的打印效果
同时,利用这些沉淀下的数据,我们每天进行一次日消息处理量的全量统计,用邮件、网页报表的形式将统计结果展示给各业务方,及时发现消息处理量的异常波动。
7. 总结
本文介绍了棱镜这一分布式实时系统的跟踪校验系统,以及我们在开发中遇到的困难和相应解决方案。目前,棱镜系统已经在阿里妈妈内部得到了广泛应用,使用方在不修改代码的情况下就可以享受到棱镜带来的历史消息快速查询的好处。
棱镜的上线,让Storm由黑盒变成了白盒,为阿里妈妈的开发人员及时发现和定位问题提供了极大的帮助。通过小batch的批处理方案,棱镜做到了精确的准实时跟踪校验。据我们所知,这也是目前业界首创的解决方案。
所以,我们今天将棱镜的架构和工程中的难点介绍出来,希望可以为其他分布式流处理系统的跟踪校验方案提供一些借鉴。