2021SC@SDUSC
Trident的Bolt节点分析
2021SC@SDUSC
SubTopologyBolt类型为Trident中运行的基本单位,但它并不是真正的Bolt节点,Trident会利用TridentBoltExecutor对SubTopologyBolt进行接口适配。
TridentBoltExecutor继承自IRichBolt接口,是Trident中真正运行的Bolt节点。它提供了类似于协调Bolt ( CoordinatedBolt )节点的功能,通过发送协调消息来对各个节点进行同步。
SubTopologyBolt主要用于对TridentProcessor的执行进行抽象。本篇文章将讨论SubTopologyBolt和TridentBoltExecutor的实现。
SubtopologyBolt.java
类的定义为:
public class SubtopologyBolt implements ITridentBatchBolt {
private static final long serialVersionUID = 1475508603138688412L;
@SuppressWarnings("rawtypes")
final DirectedGraph<Node, IndexedEdge> graph;
final Set<Node> nodes;
final Map<String, InitialReceiver> roots = new HashMap<>();
final Map<Node, Factory> outputFactories = new HashMap<>();
final Map<String, List<TridentProcessor>> myTopologicallyOrdered = new HashMap<>();
final Map<Node, String> batchGroups;
graph:整个Topology所对应的有向图。
nodes:该Bolt中所包含的处理节点。_nodesS_graph节点的子集。
roots:每种类型的输人流都会对应一个lnitalReceiver对象,用于表示如何处理该流的消息。
outputFactories:每个处理节点都会对应一个输出的工厂。 myTopologicallyOrdered:它的键为节点组序号,值为节点组所对应的TridentProcessor。
batchGroups:该变量保存了反向的索引,用来表示每个节点属于哪一个节点组。
BatchGroup对应于graph中的一个最大连通子图。
主要方法:
public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector batchCollector) {
int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
for (Node n : nodes) {
if (n.stateInfo != null) {
State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
context.setTaskData(n.stateInfo.id, s);
}
}
DirectedSubgraph<Node, ?> subgraph = new DirectedSubgraph<>(graph, nodes, null);
TopologicalOrderIterator<Node, ?> it = new TopologicalOrderIterator<>(subgraph);
int stateIndex = 0;
while (it.hasNext()) {
Node n = it.next();
if (n instanceof ProcessorNode) {
ProcessorNode pn = (ProcessorNode) n;
String batchGroup = batchGroups.get(n);
if (!myTopologicallyOrdered.containsKey(batchGroup)) {
myTopologicallyOrdered.put(batchGroup, new ArrayList<>());
}
myTopologicallyOrdered.get(batchGroup).add(pn.processor);
List<String> parentStreams = new ArrayList<>();
List<Factory> parentFactories = new ArrayList<>();
for (Node p : TridentUtils.getParents(graph, n)) {
parentStreams.add(p.streamId);
if (nodes.contains(p)) {
parentFactories.add(outputFactories.get(p));
} else {
if (!roots.containsKey(p.streamId)) {
roots.put(p.streamId, new InitialReceiver(p.streamId, getSourceOutputFields(context, p.streamId)));
}
roots.get(p.streamId).addReceiver(pn.processor);
parentFactories.add(roots.get(p.streamId).getOutputFactory());
}
}
List<TupleReceiver> targets = new ArrayList<>();
boolean outgoingNode = false;
for (Node cn : TridentUtils.getChildren(graph, n)) {
if (nodes.contains(cn)) {
targets.add(((ProcessorNode) cn).processor);
} else {
outgoingNode = true;
}
}
if (outgoingNode) {
targets.add(new BridgeReceiver(batchCollector));
}
TridentContext triContext = new TridentContext(
pn.selfOutFields,
parentFactories,
parentStreams,
targets,
pn.streamId,
stateIndex,
batchCollector
);
pn.processor.prepare(conf, context, triContext);
outputFactories.put(n, pn.processor.getOutputFactory());
}
stateIndex++;
}
}
for (Node n : nodes):
在此循环中判断节点的statelnfo是否为空,并对存储的State对象进行初始化。初始化过后的State对象被存储于TopologyContext的taskData中,并以statelnfo.id为键。statelnfo.id是一个以串 “ state” 为前缀的在Topology中唯一的字符串。
subgraph:
根据SubTopologyBolt中包含的节点获得一个子图。
it:
对子图进行拓扑排序,TopologicalOrderlterator类型的it变量用来按照拓扑排序的顺序遍历子图。
if (n instanceof ProcessorNode):
SubTopologyBolt只对处理节点进行操作。处理节点中包含了一个TridentProcessor。Spout节点和分区节点则不在SubTopologyBolt的处理范围之内。
pn.processor.prepare(conf, context, triContext:
调用该TridentProcessor的prepare方法,它将新产生的TridentContext对象作为构造函数的一个参数传入。
outputFactories.put(n, pn.processor.getOutputFactory()):
将该TridentProcessor所对应的输出添加到SubTopologyBolt的输出中。于是该输出便可被其他的SubTopologyBolt作为输入了。
statelndex变量:
用于唯一地标识SubTopologyBolt中的每一个节点。
public void execute(BatchInfo batchInfo, Tuple tuple) {
String sourceStream = tuple.getSourceStreamId();
InitialReceiver ir = roots.get(sourceStream);
if (ir == null) {
throw new RuntimeException("Received unexpected tuple " + tuple.toString());
}
ir.receive((ProcessorContext) batchInfo.state, tuple);
}
首先,根据输人消息的流号,在roots中找到对应的InitialReceiver对象,并调用其receive方法。
所有等待该流消息的TridentProcessor的execute方法均会被调用。
在某个TridentProcessor的execute方法中,下游TridentProcessor的execute方法也会被依次调用到,于是构成了一个调用链,直到SubTopologyBolt完成对该消息的处理后结束。
public void finishBatch(BatchInfo batchInfo) {
for (TridentProcessor p : myTopologicallyOrdered.get(batchInfo.batchGroup)) {
p.finishBatch((ProcessorContext) batchInfo.state);
}
}
@Override
public Object initBatchState(String batchGroup, Object batchId) {
ProcessorContext ret = new ProcessorContext(batchId, new Object[nodes.size()]);
for (TridentProcessor p : myTopologicallyOrdered.get(batchGroup)) {
p.startBatch(ret);
}
return ret;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for (Node n : nodes) {
declarer.declareStream(n.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), n.allOutputFields));
}
}
在initBatchState方法中,会对ProcessorContext的数据进行初始化,然后返回ProcessorContext对象。Trident中的聚集器均要基于ProcessorContext中state所存储的数据来实现。
declareOutputFields方法会对SubTopologyBolt中每一个节点的输出进行声明,它将$batchid作为第1列。可以看出SubTopologyBolt虽然作为一个整体而存在,可其中每一个节点的输出均可能成为最终的输出。