storm源码分析研究(十一)

2021SC@SDUSC

Trident的Bolt节点分析

2021SC@SDUSC

SubTopologyBolt类型为Trident中运行的基本单位,但它并不是真正的Bolt节点,Trident会利用TridentBoltExecutor对SubTopologyBolt进行接口适配。

TridentBoltExecutor继承自IRichBolt接口,是Trident中真正运行的Bolt节点。它提供了类似于协调Bolt ( CoordinatedBolt )节点的功能,通过发送协调消息来对各个节点进行同步。

SubTopologyBolt主要用于对TridentProcessor的执行进行抽象。本篇文章将讨论SubTopologyBolt和TridentBoltExecutor的实现。

SubtopologyBolt.java

类的定义为:

public class SubtopologyBolt implements ITridentBatchBolt {
    private static final long serialVersionUID = 1475508603138688412L;
    @SuppressWarnings("rawtypes")
    final DirectedGraph<Node, IndexedEdge> graph;
    final Set<Node> nodes;
    final Map<String, InitialReceiver> roots = new HashMap<>();
    final Map<Node, Factory> outputFactories = new HashMap<>();
    final Map<String, List<TridentProcessor>> myTopologicallyOrdered = new HashMap<>();
    final Map<Node, String> batchGroups;


graph:整个Topology所对应的有向图。
nodes:该Bolt中所包含的处理节点。_nodesS_graph节点的子集。
roots:每种类型的输人流都会对应一个lnitalReceiver对象,用于表示如何处理该流的消息。
outputFactories:每个处理节点都会对应一个输出的工厂。 myTopologicallyOrdered:它的键为节点组序号,值为节点组所对应的TridentProcessor。
batchGroups:该变量保存了反向的索引,用来表示每个节点属于哪一个节点组。
BatchGroup对应于graph中的一个最大连通子图。

主要方法:

    public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector batchCollector) {
        int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
        for (Node n : nodes) {
            if (n.stateInfo != null) {
                State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
                context.setTaskData(n.stateInfo.id, s);
            }
        }
        DirectedSubgraph<Node, ?> subgraph = new DirectedSubgraph<>(graph, nodes, null);
        TopologicalOrderIterator<Node, ?> it = new TopologicalOrderIterator<>(subgraph);
        int stateIndex = 0;
        while (it.hasNext()) {
            Node n = it.next();
            if (n instanceof ProcessorNode) {
                ProcessorNode pn = (ProcessorNode) n;
                String batchGroup = batchGroups.get(n);
                if (!myTopologicallyOrdered.containsKey(batchGroup)) {
                    myTopologicallyOrdered.put(batchGroup, new ArrayList<>());
                }
                myTopologicallyOrdered.get(batchGroup).add(pn.processor);
                List<String> parentStreams = new ArrayList<>();
                List<Factory> parentFactories = new ArrayList<>();
                for (Node p : TridentUtils.getParents(graph, n)) {
                    parentStreams.add(p.streamId);
                    if (nodes.contains(p)) {
                        parentFactories.add(outputFactories.get(p));
                    } else {
                        if (!roots.containsKey(p.streamId)) {
                            roots.put(p.streamId, new InitialReceiver(p.streamId, getSourceOutputFields(context, p.streamId)));
                        }
                        roots.get(p.streamId).addReceiver(pn.processor);
                        parentFactories.add(roots.get(p.streamId).getOutputFactory());
                    }
                }
                List<TupleReceiver> targets = new ArrayList<>();
                boolean outgoingNode = false;
                for (Node cn : TridentUtils.getChildren(graph, n)) {
                    if (nodes.contains(cn)) {
                        targets.add(((ProcessorNode) cn).processor);
                    } else {
                        outgoingNode = true;
                    }
                }
                if (outgoingNode) {
                    targets.add(new BridgeReceiver(batchCollector));
                }

                TridentContext triContext = new TridentContext(
                    pn.selfOutFields,
                    parentFactories,
                    parentStreams,
                    targets,
                    pn.streamId,
                    stateIndex,
                    batchCollector
                );
                pn.processor.prepare(conf, context, triContext);
                outputFactories.put(n, pn.processor.getOutputFactory());
            }
            stateIndex++;
        }
    }

for (Node n : nodes):
在此循环中判断节点的statelnfo是否为空,并对存储的State对象进行初始化。初始化过后的State对象被存储于TopologyContext的taskData中,并以statelnfo.id为键。statelnfo.id是一个以串 “ state” 为前缀的在Topology中唯一的字符串。
subgraph:
根据SubTopologyBolt中包含的节点获得一个子图。
it:
对子图进行拓扑排序,TopologicalOrderlterator类型的it变量用来按照拓扑排序的顺序遍历子图。
if (n instanceof ProcessorNode):
SubTopologyBolt只对处理节点进行操作。处理节点中包含了一个TridentProcessor。Spout节点和分区节点则不在SubTopologyBolt的处理范围之内。
pn.processor.prepare(conf, context, triContext:
调用该TridentProcessor的prepare方法,它将新产生的TridentContext对象作为构造函数的一个参数传入。
outputFactories.put(n, pn.processor.getOutputFactory()):
将该TridentProcessor所对应的输出添加到SubTopologyBolt的输出中。于是该输出便可被其他的SubTopologyBolt作为输入了。
statelndex变量:
用于唯一地标识SubTopologyBolt中的每一个节点。

public void execute(BatchInfo batchInfo, Tuple tuple) {
        String sourceStream = tuple.getSourceStreamId();
        InitialReceiver ir = roots.get(sourceStream);
        if (ir == null) {
            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
        }
        ir.receive((ProcessorContext) batchInfo.state, tuple);
    }

首先,根据输人消息的流号,在roots中找到对应的InitialReceiver对象,并调用其receive方法。
所有等待该流消息的TridentProcessor的execute方法均会被调用。
在某个TridentProcessor的execute方法中,下游TridentProcessor的execute方法也会被依次调用到,于是构成了一个调用链,直到SubTopologyBolt完成对该消息的处理后结束。

public void finishBatch(BatchInfo batchInfo) {
        for (TridentProcessor p : myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        ProcessorContext ret = new ProcessorContext(batchId, new Object[nodes.size()]);
        for (TridentProcessor p : myTopologicallyOrdered.get(batchGroup)) {
            p.startBatch(ret);
        }
        return ret;
    }
public void declareOutputFields(OutputFieldsDeclarer declarer) {
        for (Node n : nodes) {
            declarer.declareStream(n.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), n.allOutputFields));
        }
    }

在initBatchState方法中,会对ProcessorContext的数据进行初始化,然后返回ProcessorContext对象。Trident中的聚集器均要基于ProcessorContext中state所存储的数据来实现。
declareOutputFields方法会对SubTopologyBolt中每一个节点的输出进行声明,它将$batchid作为第1列。可以看出SubTopologyBolt虽然作为一个整体而存在,可其中每一个节点的输出均可能成为最终的输出。

上一篇:Storm入门


下一篇:storm源码分析研究(九)