STORM事务框架模型 - TransactionTopology

1 TransactionTopology例子

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 2);
builder.setBolt("partial-count", new BatchCount(), 3)
        .shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
        .globalGrouping("partial-count");

这是统计单词频率的一个例子,来自STORM官网,可以看到和普通Topology的架构是一样的,将包含业务逻辑的Spout和Bolt通过TransactionalTopologyBuilder组装为一个事务Topology,为各个节点设置并行度。

http://storm.apache.org/releases/current/Transactional-topologies.html

2 STORM事务框架模型 - TransactionTopology

http://storm.apache.org/releases/current/Transactional-topologies.html

Storm的计算模型是由Spout和Bolt组成的一个Topology,以Tuple为处理最小单元,每个Tuple是对消息的封装抽象。为了实现事务性,Storm将Spout和Bolt进行封装,对外提供了一组新的事务API。另外,为了提高处理效率,Storm将多个Tuple合并为一个处理批次(Batch),每个Batch就是一个独立的事务,这样Storm事务的最小处理单元是一个Batch,为每个Batch唯一分配一个transaction id。Storm接收到的Tuple流是有顺序的,因此即使按批次处理,Storm仍会保证各个Batch按先后顺序提交,但是Storm允许同时并行处理不同Batch。

从设计上将一个Batch分为两个阶段,

  • Processing Phase,处理阶段。这个阶段不同的batch可以并行执行。
  • Commit Phase,提交阶段。这个阶段不同batch按transaction id顺序提交,因此不同事务batch会竞争一个全局事务锁,Storm是通过zookeeper实现。

STORM事务框架模型 - TransactionTopology
也就是说,这个分布式事务框架对哪些进行了抽象,在设计上为了兼顾通用性和性能,它做了哪些取舍呢?

对原始的Topology进行了封装,使用装饰模式增加了事务的逻辑。
图中蓝色线条为事务控制消息流向及对应流的名字,设置batch流、coord流及commit流;绿色虚线为数据消息流向,使用default 流。事务Topology中,系统只会跟踪控制消息,不会跟踪数据消息。

通过Storm系统的Ack框架,在收到事务Commit消息的Ack后,即认为成功处理了该事务。

1.那么有哪些类型的事务控制消息呢?
协调消息、事务提交消息。

2.协调消息结构是什么样的?
协调消息模式为<tx-id, tuple-cnt>,通过emitDirect方式告诉下游Task节点应该收到属于tx-id这个事务的多少条消息。

TrackingInfo

每个Bolt关注点是:

  1. 上游哪些Task给我发了消息,发了多少个;
  2. 我要给下游哪些Task发送消息,发送的消息都处理成功了吗

这个事务框架增加了一个事务控制层来达成这些目标。

2.1 TransactionSpoutCoordinator

这是一个spout,它输出数据类型?

  1. 输出到TRANSACTION_BATCH_STREAM_ID,Field为[TransactionAttempt, coordinatorMeta, previousTransactionId]
    coordinatorMeta类由实现ITransactionSpout接口的用户决定,是存到zk中元数据的类型。比如PartitionedTransactionalSpoutExecutor的元数据类型是Integer,表示分区数。

previousTransactionId:上一个已提交事务的ID,就是一个BigInteger。TransactionSpoutExecutor这个Bolt会在execute方法中保存当前在处理的所有事务,有了这个事务ID,它就能将之前已提交事务都从内存中清理。

  1. 输出到流TRANSACTION_COMMIT_STREAM_ID,Field为[TransactionAttempt]

它的nextTuple做了哪些事?

该类会保存所有在处理的事务到名为_activeTx的TreeMap中,保存当前正在处理的事务到_currTransaction(类型为TransactionStatus)。

如果当前事务的状态是PROCESSED,则将其状态改为COMMITING,然后向下游的TransactionSpoutExecutor通过TRANSACTION_COMMIT_STREAM_ID这条流发送一条消息 [TransactionAttempt],告知它这个事务已经处理完成。TransactionSpoutExecutor收到后则调用commit方法提交该事务,并调用ack方法通知TransactionSpoutCoordinator消息已收到。

ack & fail方法做了啥

收到TransactionalSpoutBatchExecutor对它发出消息的ack后,执行ack方法。如果currTransaction事务当前是PROCESSING状态,则改为PROCESSED;如果是COMMITING,则表明事务处理完成,将_currTransaction事务从_activeTx中清除,并将zk中事务号小于等于currTransaction的元数据都清除。

要是fail方法被调用,则调用sync方法重传消息。

https://blog.csdn.net/guicaizhou/article/details/79277013

2.2 TransactionalSpoutBatchExecutor

这是一个Bolt,output是什么

 public void declareOutputFields(OutputFieldsDeclarer declarer) {
    _spout.declareOutputFields(declarer);
  }

它的输出由用户定义,与实现ITransactionSpout接口的_spout相同。

它的input是什么

TransactionSpoutCoordinator的输出就是它的输入。

它的execute方法在做什么

获取到TransactionSpoutCoordinator发送的消息,有两种情况。

  1. 消息来自流TRANSACTION_COMMIT_STREAM_ID,则判断收到的消息是否属于它记录的当前事务,如果是就提交这个事务(调用commit)并将这个事务从它的_activeTransactions中清除。
  2. 消息来自流TRANSACTION_BATCH_STREAM_ID,则发送下一批次的消息,并将对应的事务id及TransactionAttempt存入它的_activeTransactions中,再调用ack方法告诉上游的spout/coordinator它收到了消息。然后把该事务之前的所有事务从activeTransactions中清除。

2.3 CoordinatedBolt

这种Bolt会代理用户定义Topology中的所有Spout和Bolt组件及TransactionalSpoutBatchExecutor组件。它通过TrackingInfo这个结构来统计:

 public static class TrackingInfo {
   //上游发送给当前Bolt消息的节点数
   int reportCount = 0; 
   //通过协调消息流告知Bolt应该收到的tuple数
   int expectedTupleCount = 0; 
   int receivedTuples = 0; //Bolt实际收到的tuple数
   boolean failed = false; //该事务尝试是否已经失败
    //保存本Bolt发送给下游task的tuple数
   Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
    //实现了ICommitter接口Bolt,收到协调Spout消息后置为true
   boolean receivedId = false;
   boolean finished = false; //表示该事务处理是否结束
   //保存从协调spout(提交消息)和协调Bolt(协调消息)收到的消息
   List<Tuple> ackTuples = new ArrayList<>(); 
}

事务Topology中,只通过ACK框架跟踪控制消息,不跟踪普通数据消息。

这是一个Bolt,它的output是什么

public void declareOutputFields(OutputFieldsDeclarer declarer) {
   _delegate.declareOutputFields(declarer);
   declarer.declareStream(Constants.COORDINATED_STREAM_ID, 
               true, new Fields("id", "count"));
}

有两种输出,一种是用户自定义的;第二种消息是输出消息到流COORDINATED_STREAM_ID,Field为[消息id,发送的tuple数]

这两种消息都只会在CoordinatedBolt间流动,并且第二种消息是发送给指定task的direct消息。

它的input是什么

它的execute方法的input可以来自三种情况:
1、来自TransactionSpoutCoordinator的commit消息,tuple类型为TupleType.ID。只有CoordinatedBolt代理的Bolt实现了ICommiter接口时才会处理这种消息,并且是通过TRANSACTION_COMMIT_STREAM_ID流发来。
2、来自CoordinatedBolt间发送的消息,tuple类型为TupleType.COORD,通过COORDINATED_STREAM_ID流发来。
3、来自default流及TRANSACTION_BATCH_STREAM_ID流的消息,tuple类型为TupleType.REGULAR。default流消息肯定是从用户Topology的Spout或者Bolt发来。另外一种流的消息是来自TransactionSpoutCoordinator,这只有代理了TransactionalSpoutBatchExecutor的CoordinatedBolt才会收到。

execute方法在做什么

对于普通业务数据消息,直接调用被代理Bolt的execute方法。
对于控制消息,会通过TrackingInfo来统计收到的tuple数、发送的tuple数等数据来保证消息的一致性。

2.4 说明

这个类在当前最新的Storm2.x里面已经去掉,用Trident替代了它的能力,并增加了一些特性,目的是方便用户使用。但我仍然去研究它的机制是因为Trident在事务这块的思想及主要代码还是复用了TransactionTopology,因此是一个循序渐进的目的。

上一篇:测试开发进阶:一文教你从0到1搞懂大数据测试!


下一篇:常见的中间件