1 TransactionTopology例子
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 2);
builder.setBolt("partial-count", new BatchCount(), 3)
.shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
.globalGrouping("partial-count");
这是统计单词频率的一个例子,来自STORM官网,可以看到和普通Topology的架构是一样的,将包含业务逻辑的Spout和Bolt通过TransactionalTopologyBuilder组装为一个事务Topology,为各个节点设置并行度。
http://storm.apache.org/releases/current/Transactional-topologies.html
2 STORM事务框架模型 - TransactionTopology
http://storm.apache.org/releases/current/Transactional-topologies.html
Storm的计算模型是由Spout和Bolt组成的一个Topology,以Tuple为处理最小单元,每个Tuple是对消息的封装抽象。为了实现事务性,Storm将Spout和Bolt进行封装,对外提供了一组新的事务API。另外,为了提高处理效率,Storm将多个Tuple合并为一个处理批次(Batch),每个Batch就是一个独立的事务,这样Storm事务的最小处理单元是一个Batch,为每个Batch唯一分配一个transaction id。Storm接收到的Tuple流是有顺序的,因此即使按批次处理,Storm仍会保证各个Batch按先后顺序提交,但是Storm允许同时并行处理不同Batch。
从设计上将一个Batch分为两个阶段,
- Processing Phase,处理阶段。这个阶段不同的batch可以并行执行。
- Commit Phase,提交阶段。这个阶段不同batch按transaction id顺序提交,因此不同事务batch会竞争一个全局事务锁,Storm是通过zookeeper实现。
也就是说,这个分布式事务框架对哪些进行了抽象,在设计上为了兼顾通用性和性能,它做了哪些取舍呢?
对原始的Topology进行了封装,使用装饰模式增加了事务的逻辑。
图中蓝色线条为事务控制消息流向及对应流的名字,设置batch流、coord流及commit流;绿色虚线为数据消息流向,使用default 流。事务Topology中,系统只会跟踪控制消息,不会跟踪数据消息。
通过Storm系统的Ack框架,在收到事务Commit消息的Ack后,即认为成功处理了该事务。
1.那么有哪些类型的事务控制消息呢?
协调消息、事务提交消息。
2.协调消息结构是什么样的?
协调消息模式为<tx-id, tuple-cnt>
,通过emitDirect
方式告诉下游Task节点应该收到属于tx-id这个事务的多少条消息。
TrackingInfo
每个Bolt关注点是:
- 上游哪些Task给我发了消息,发了多少个;
- 我要给下游哪些Task发送消息,发送的消息都处理成功了吗
这个事务框架增加了一个事务控制层来达成这些目标。
2.1 TransactionSpoutCoordinator
这是一个spout,它输出数据类型?
- 输出到TRANSACTION_BATCH_STREAM_ID,Field为
[TransactionAttempt, coordinatorMeta, previousTransactionId]
coordinatorMeta类由实现ITransactionSpout
接口的用户决定,是存到zk中元数据的类型。比如PartitionedTransactionalSpoutExecutor
的元数据类型是Integer,表示分区数。
previousTransactionId
:上一个已提交事务的ID,就是一个BigInteger。TransactionSpoutExecutor这个Bolt会在execute方法中保存当前在处理的所有事务,有了这个事务ID,它就能将之前已提交事务都从内存中清理。
- 输出到流
TRANSACTION_COMMIT_STREAM_ID
,Field为[TransactionAttempt]
它的nextTuple做了哪些事?
该类会保存所有在处理的事务到名为_activeTx
的TreeMap中,保存当前正在处理的事务到_currTransaction
(类型为TransactionStatus)。
如果当前事务的状态是PROCESSED,则将其状态改为COMMITING,然后向下游的TransactionSpoutExecutor
通过TRANSACTION_COMMIT_STREAM_ID
这条流发送一条消息 [TransactionAttempt],告知它这个事务已经处理完成。TransactionSpoutExecutor
收到后则调用commit方法提交该事务,并调用ack方法通知TransactionSpoutCoordinator
消息已收到。
ack & fail方法做了啥
收到TransactionalSpoutBatchExecutor
对它发出消息的ack后,执行ack方法。如果currTransaction事务当前是PROCESSING状态,则改为PROCESSED;如果是COMMITING,则表明事务处理完成,将_currTransaction事务从_activeTx中清除,并将zk中事务号小于等于currTransaction的元数据都清除。
要是fail方法被调用,则调用sync方法重传消息。
https://blog.csdn.net/guicaizhou/article/details/79277013
2.2 TransactionalSpoutBatchExecutor
这是一个Bolt,output是什么
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_spout.declareOutputFields(declarer);
}
它的输出由用户定义,与实现ITransactionSpout接口的_spout相同。
它的input是什么
TransactionSpoutCoordinator
的输出就是它的输入。
它的execute方法在做什么
获取到TransactionSpoutCoordinator
发送的消息,有两种情况。
- 消息来自流
TRANSACTION_COMMIT_STREAM_ID
,则判断收到的消息是否属于它记录的当前事务,如果是就提交这个事务(调用commit)并将这个事务从它的_activeTransactions中清除。 - 消息来自流
TRANSACTION_BATCH_STREAM_ID
,则发送下一批次的消息,并将对应的事务id及TransactionAttempt
存入它的_activeTransactions
中,再调用ack方法告诉上游的spout/coordinator它收到了消息。然后把该事务之前的所有事务从activeTransactions中清除。
2.3 CoordinatedBolt
这种Bolt会代理用户定义Topology中的所有Spout和Bolt组件及TransactionalSpoutBatchExecutor组件。它通过TrackingInfo这个结构来统计:
public static class TrackingInfo {
//上游发送给当前Bolt消息的节点数
int reportCount = 0;
//通过协调消息流告知Bolt应该收到的tuple数
int expectedTupleCount = 0;
int receivedTuples = 0; //Bolt实际收到的tuple数
boolean failed = false; //该事务尝试是否已经失败
//保存本Bolt发送给下游task的tuple数
Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
//实现了ICommitter接口Bolt,收到协调Spout消息后置为true
boolean receivedId = false;
boolean finished = false; //表示该事务处理是否结束
//保存从协调spout(提交消息)和协调Bolt(协调消息)收到的消息
List<Tuple> ackTuples = new ArrayList<>();
}
事务Topology中,只通过ACK框架跟踪控制消息,不跟踪普通数据消息。
这是一个Bolt,它的output是什么
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_delegate.declareOutputFields(declarer);
declarer.declareStream(Constants.COORDINATED_STREAM_ID,
true, new Fields("id", "count"));
}
有两种输出,一种是用户自定义的;第二种消息是输出消息到流COORDINATED_STREAM_ID
,Field为[消息id,发送的tuple数]
。
这两种消息都只会在CoordinatedBolt
间流动,并且第二种消息是发送给指定task的direct
消息。
它的input是什么
它的execute方法的input可以来自三种情况:
1、来自TransactionSpoutCoordinator
的commit消息,tuple类型为TupleType.ID。只有CoordinatedBolt
代理的Bolt实现了ICommiter接口时才会处理这种消息,并且是通过TRANSACTION_COMMIT_STREAM_ID
流发来。
2、来自CoordinatedBolt间发送的消息,tuple类型为TupleType.COORD,通过COORDINATED_STREAM_ID
流发来。
3、来自default
流及TRANSACTION_BATCH_STREAM_ID
流的消息,tuple类型为TupleType.REGULAR。default流消息肯定是从用户Topology的Spout或者Bolt发来。另外一种流的消息是来自TransactionSpoutCoordinator
,这只有代理了TransactionalSpoutBatchExecutor
的CoordinatedBolt才会收到。
execute方法在做什么
对于普通业务数据消息,直接调用被代理Bolt的execute方法。
对于控制消息,会通过TrackingInfo
来统计收到的tuple数、发送的tuple数等数据来保证消息的一致性。
2.4 说明
这个类在当前最新的Storm2.x里面已经去掉,用Trident替代了它的能力,并增加了一些特性,目的是方便用户使用。但我仍然去研究它的机制是因为Trident在事务这块的思想及主要代码还是复用了TransactionTopology,因此是一个循序渐进的目的。