Trident Topology执行过程分析
TridentTopology是storm提供的高层使用接口,常见的一些SQL中的操作在tridenttopology提供的api中都有类似的影射。关于TridentTopology的使用及运行原理,当前进行详细分析的文章不多。
从TridentTopology到vanilla topology(普通的topology)由三个层次组成:
- 面向最终用户的概念stream, operation
- 利用planner将tridenttopology转换成vanilla topology
- 执行vanilla topology
本文尝试TridentTopology是如何先一步步转换成普通的storm Topology(即vanila topology), 转换后的topology的执行中有哪些区别?
概述
从TridentTopology到基本的Topology有三层,下图给出一个全局的视图。
创建TridentTopology
下面的代码摘自StormStarter中的TridentWordCount.java
TridentTopology topology = new TridentTopology(); topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")).parallelismHint(16); return topology.build();
上述代码的newStream一行,分两大部分,一是使用newStream来创建一个stream对象,然后针对该Stream进行各种操作,each/shuffle/persistentAggregate等就是各种operation.
用户在使用TridentTopology的时候,只需要熟悉Stream和TridentTopology中的API函数即可。
转换TridentTopology为Vanilla Topology
上一节创建了Stream,但是如何将其与原有的Spout及Bolt联系起来呢?问题的关键就在TridentTopology::build函数和TridentTopologyBuilder::buildTopology
TridentTopology::build
newStream及其后的函数调用创建了一个含有三大类节点的List,利用该List创建了一个有向非循环图(DAG)。这三类节点分别是operation, partition, spout,在build函数将节点分类分别加入到boltNodes或spoutNodes,注意此处的spout或bolt不能等同于普通的spout和bolt.
TridentTopologyBuilder::buildTopology
利用在build函数中创建的boltNodes,spoutNodes及生成的graph来创建vanilla topology所需要的bolt及spout.
在buildTopology中会看到类似的代码片段。
builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout)) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
for(String b: c.committerBatches) { specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID); } BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
最终生成的普通Topology,与普通Topology中的Spout相对应的是MasterBatchCoordinator,而在创建TridentTopology使用的spout则成了Bolt,使用于Stream上的各种Operation也存在于多个普通Bolt中。
TridentTopology的执行
TridentTopology被转换为普通的Topology(vanilla Topology)之后提交到nimbus,它的具体执行过程有什么不同呢?
主要有几点:
- MasterBatchCoordinator通过Batch_stream_id来发送通知给TridentSpoutExecutor
- TridentSpoutExecutor收到通知发送成批的tuple给下一跳的Bolt
- 下一跳的Bolt收到tuple之后,使用TridentBoltExecutor来进行处理
- TridentBoltExecutor调用SubtopologyBolt::execute
- InitialReceiver::execute被调用
- TridentProcessor::execute被调用
MasterBatchCoordinator收到ack之后,会发送success消息给Spout
MasterBatchCoordinator在commit的时候,会发送commit消息给Spout,让Spout将缓存的消息删除
trident topology可靠性分析
本文详细分析TridentTopology的可靠性实现, TridentTopology通过transactional spout与transactional state相结合,能够做到tuple“只被处理一次,不多也不少”。也就是做到事务性处理exactly-once,要么成功,要么失败。
而一般的storm topology是无法保证eactly-once的处理的,它们要么是at-least-once(至少被处理一次,有可能被处理多次);要么是at-most-once(最多被处理一次,这样就存在遗漏的可能).
TridentTopology在设计中借鉴和保留了目前已经过期的transactional topology的设计思想。
Storm Topology的ack机制
在进行TridentTopology的可靠性分析之前,我们先回顾一下在storm topology中的ack机制。ack bolt是在提交到storm cluster中,由系统自动产生的,一般来说一个topology只有一个ack bolt(当然可以通过配置参数指定多个)。
当bolt处理并下发完tuple给下一跳的bolt时,会发送一个ack给ack bolt。ack bolt通过简单的异或原理(即同一个数与自己异或结果为零)来判定从spout发出的某一个bolt是否已经被完全处理完毕。如果结果为真,ack bolt发送消息给spout,spout中的ack函数被调用并执行。如果超时,则发送fail消息给spout,spout中的fail函数被调用并执行,spout中的ack和fail的处理逻辑由用户自行填写。
如在github上的kerstel spout就能做到只有当某一个tuple被成功处理之后,它才会从缓存中移除,否则继续放入到处理队列再次进行处理。
TridentTopology的可靠性机制
在“走读之6”一文中分析了一个tridenttopology是如何转换成storm topology的,我想用上面这幅图再次阐述一下转变后的结果。
- 一个tridenttopoloy会至少引入一个MasterBatchCoordinator,这个MBC就类似于storm topology中的spout
- newStream时使用的入参spout会裂变成两个bolt,一是TridentSpoutCoordinator,另一个是TridentSpoutExecutor
- 针对stream的各种操作则被分散到各个Bolt中,它们的执行上下文是TridentBoltExecutor
可以看出使用TridentTopology Api进行操作时,所有的东西其实都运行在bolt context中,而真正的spout是在调用TridentTopologyBuilder.buildTopology()的时候被添加的。
- MasterBatchCoordinator使用batch_stream发送一个类似于seeder tuple的东西给tridentspoutcoordinator,tridentspoutcoordinator将该信号继续下发给TridentSpoutExecutor, TridentSpout是如何一步步被调用到的呢。
- TridentBoltExecutor::execute
- TridentSpoutExecutor::execute
- BatchSpoutExecutor::execute
- ITridentSpout::emitBatch
- BatchSpoutExecutor::execute
- TridentSpoutExecutor::execute
- TridentBoltExecutor::execute
emitBatch是产生真正需要被处理的tuple的,这些tuple会被各个Operation所在的bolt所接收。它们的调用顺序是
- TridentBoltExecutor::execute
- SubtopologyBolt::execute
- InitialReceiver::receive
- TridentProcessor::execute
- InitialReceiver::receive
- SubtopologyBolt::execute
处理结束的判断依据
在TridentSpout中是如何判断所有的tuple都已经被处理的呢。
- 在每跳中认为自己处理完毕的时候,它都会告诉下一跳,即下游,我给你发送了多少tuple,如果下游将上游发送过来的确认消息与自身确实已经处理的消息比对一致的话,则认为处理都完成,于是发送ack.
- 问题的关键变成每一个bolt是如何判断自己已经处理完毕的呢,请看步骤3
- 总有一个bolt是没有上游的,即TridentSpoutExecutor,它只会收到启动指令,但不接收真正的业务数据,于是它会告诉下一跳,我发了多少tuple给你。
STREAM
在MasterBatchCoordinator中定义了三种不同的stream,这三种stream分别是
- BATCH_STREAM
- COMMIT_STREAM
- SUCCESS_STREAM
这些stream分别在什么时候被使用呢,下图给出一个大概的时序
简要说明:
- masterbatchcoordinator通过batch_stream发送seeder tuple给tridentspoutcoordinator
- tridentspoutcoordinator给tridentspoutexecutor继续传递该指令
- TridentSpoutExecutor在收到启动指令后,调用ITridentSpout接口的实现类进行emitBatch
- TridentSpoutExecutor在发送完一批batch后,finishBatch被调用,通过emitDirect会给下一跳通过coord_stream发送trackedinfo,即我已经发送了多少消息给你
- TridentSpoutExecutor紧接着还会给ack bolt发送ack消息,ack bolt将其传达到MasterBatchCoordinator
- MasterBatchCoordinator在收到第一个ack后,将状态置为processed
- 当MasterBatchCoordinator再次收到ack后,会将状态转为committing,同时通过commit_stream发送tuple给TridentSpoutExecutor
- 收到commit_stream上传来的tuple后,TridentSpoutExecutor会调用ITridentSpout中的emmitter, emmitter::commit()被执行,TridentSpoutExecutor会再次ack收到tuple
- MasterBatchCoordinator在收到这个tuple之后,会认为针对某一个seeder tuple的处理已经完全实现,于是通过SUCCESS_STREM告知TridentSpoutCoordinator,所有的活都已经都完成了,收工。
- 收到Success_stream上传来的信号后,ITridentSpout中的内嵌子类Emmit和Coordinator中相应的success方法会被调用执行。
注意:
- 为了描述方便,将TridentTopology进行了简化,认为其在转换成真正的storm topology时,只有一个TridentProcessor所在的bolt。真实的情况可能比这复杂,但消息的传递路径还是差不多的。
- 注意在TridentTopology中ack会被多次反复调用,这不同于普通的storm topology
状态机
在MasterBatchCoordinator中,针对每一个seeder tuple,其状态机如下图所示。注意这些状态是会被保存到zookeeper server中的,使用的api定义在TransactionalState中。
总结
通过上面的分析可以看出,TridentTopology实现了一个比较好的框架,但真正要做到exactly-once的处理,还需要用户自己去实现ITridentSpout中的两个重要内嵌类,Emmitter和Coordinator。
具体如何实现该接口,可以查看storm-core/src/jvm/storm/trident/testing目录下的FixedBatchSpout.java和FeederCommitterBatchSpout.java