Storm Source Code Analysis (13)

2021SC@SDUSC

TridentTopology

genBoltIds

Generates an id (name) for every bolt.

private static Map<Group, String> genBoltIds(Collection<Group> groups) {
    Map<Group, String> ret = new HashMap<>();
    int ctr = 0;
    for(Group g: groups) {
        if(!isSpoutGroup(g)) {
            List<String> name = new ArrayList<>();
            name.add("b");
            name.add("" + ctr);
            String groupName = getGroupName(g);
            if(groupName!=null && !groupName.isEmpty()) {
                name.add(getGroupName(g));                
            }
            ret.put(g, Utils.join(name, "-"));
            ctr++;
        }
    }
    return ret;
}

private static String getGroupName(Group g) {
    TreeMap<Integer, String> sortedNames = new TreeMap<>();
    for(Node n: g.nodes) {
        if(n.name!=null) {
            sortedNames.put(n.creationIndex, n.name);
        }
    }
    List<String> names = new ArrayList<>();
    String prevName = null;
    for(String n: sortedNames.values()) {
        if(prevName==null || !n.equals(prevName)) {
            prevName = n;
            names.add(n);
        }
    }
    return Utils.join(names, "-");
}

genBoltIds generates a unique id for each bolt. Every id starts with the letter "b", followed by a numeric counter, and then the group's name when the group has one, e.g. "b-0-<group name>", "b-1-<group name>", and so on. A group's name, in turn, is built from the names of the Nodes the group contains, joined with "-" in creation order, with consecutive duplicate names collapsed.
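The following minimal sketch (plain Java, not Storm code; the group name shown is hypothetical) reproduces the naming pattern of the loop above:

import java.util.ArrayList;
import java.util.List;

public class BoltIdSketch {
    // Mirrors genBoltIds: "b", a counter, then the group name if there is one.
    static String boltId(int ctr, String groupName) {
        List<String> parts = new ArrayList<>();
        parts.add("b");
        parts.add(String.valueOf(ctr));
        if (groupName != null && !groupName.isEmpty()) {
            parts.add(groupName);
        }
        return String.join("-", parts); // stands in for Storm's Utils.join
    }

    public static void main(String[] args) {
        System.out.println(boltId(0, "each-aggregate")); // b-0-each-aggregate
        System.out.println(boltId(1, ""));               // b-1
    }
}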

addSourcedNode()

protected Stream addSourcedNode(Stream source, Node newNode) {
    return addSourcedNode(Arrays.asList(source), newNode);
}
protected Stream addSourcedNode(List<Stream> sources, Node newNode) {
    registerSourcedNode(sources, newNode);
    return new Stream(this, newNode.name, newNode);
}

These methods create a new node and attach it to its parent stream(s); there may be more than one parent. The multi-source variant is only used when merge() calls multiReduce(), so the discussion here focuses on the single-source case.

protected void registerSourcedNode(List<Stream> sources, Node newNode) {
    registerNode(newNode);
    int streamIndex = 0;
    for(Stream s: sources) {
        _graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));
        streamIndex++;
    }        
}

Besides registering the new node via registerNode(newNode), this also adds an edge from each source stream's node to the new node, indexed by the position of the source.
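The streamIndex matters when a node has several parents (e.g. after merge()): it records the order of the sources. A minimal sketch of that bookkeeping, outside of Storm's graph classes and with hypothetical node names:

import java.util.ArrayList;
import java.util.List;

public class IndexedEdgeSketch {
    // Stand-in for Storm's IndexedEdge: source, target and the position of the source.
    static class Edge {
        final String source, target;
        final int index;
        Edge(String source, String target, int index) {
            this.source = source; this.target = target; this.index = index;
        }
        public String toString() { return source + "->" + target + "[" + index + "]"; }
    }

    public static void main(String[] args) {
        List<String> sources = List.of("stream-a", "stream-b"); // parents of the new node
        String newNode = "merged-node";
        List<Edge> edges = new ArrayList<>();
        int streamIndex = 0;
        for (String s : sources) {
            edges.add(new Edge(s, newNode, streamIndex++));
        }
        System.out.println(edges); // [stream-a->merged-node[0], stream-b->merged-node[1]]
    }
}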

protected void registerNode(Node n) {
    _graph.addVertex(n);
    if(n.stateInfo!=null) {
        String id = n.stateInfo.id;
        if(!_colocate.containsKey(id)) {
            _colocate.put(id, new ArrayList());
        }
        _colocate.get(id).add(n);
    }
}

Adds a node to the graph. If the node's stateInfo member is not null, the node is also put into the _colocate map under the corresponding state id. _colocate groups together all nodes that access the same state so that they can later be executed inside the same bolt.
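In other words, _colocate is just a multimap from state id to the nodes that touch that state. A minimal sketch of the same bookkeeping (plain Java, with hypothetical node and state names):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColocateSketch {
    static final Map<String, List<String>> colocate = new HashMap<>();

    // Mirrors registerNode: only stateful nodes are recorded, keyed by their state id.
    static void register(String nodeName, String stateId) {
        if (stateId != null) {
            colocate.computeIfAbsent(stateId, k -> new ArrayList<>()).add(nodeName);
        }
    }

    public static void main(String[] args) {
        register("persist-node-1", "wordCountState");
        register("query-node-1", "wordCountState");
        register("each-node-1", null);  // stateless, not colocated
        System.out.println(colocate);   // {wordCountState=[persist-node-1, query-node-1]}
    }
}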

addNode()

protected Stream addNode(Node n) {
    registerNode(n);
    return new Stream(this, n.name, n);
}

This method is straightforward. It is only called from newStream() and newDRPCStream(), i.e. when a new data source is introduced, whereas addSourcedNode() above is used to append the next processing node downstream of an existing one.
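To see where these two entry points are reached from the user-facing API, here is a hedged usage sketch. It assumes the org.apache.storm.* package layout and the FixedBatchSpout test spout (exact packages and helpers vary between Storm versions): newStream() goes through addNode() to register the source node, while each() creates a ProcessorNode and hooks it up through addSourcedNode().

import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class AddNodeVsAddSourcedNode {
    // A trivial function; Stream.each() wraps it in a ProcessorNode behind the scenes.
    public static class ToUpper extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            collector.emit(new Values(tuple.getString(0).toUpperCase()));
        }
    }

    public static void main(String[] args) {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("word"), 3,
                new Values("storm"), new Values("trident"));
        spout.setCycle(false);

        TridentTopology topology = new TridentTopology();
        Stream s = topology.newStream("spout1", spout);                  // -> addNode()
        s.each(new Fields("word"), new ToUpper(), new Fields("upper"));  // -> addSourcedNode()
        topology.build(); // grouping and genBoltIds() happen in here
    }
}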

TridentTopologyBuilder

setSpout()

    Map<GlobalStreamId, String> _batchIds = new HashMap();
    Map<String, TransactionalSpoutComponent> _spouts = new HashMap();

    public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) {
        Map<String, String> batchGroups = new HashMap();
        batchGroups.put(streamName, batchGroup);
        markBatchGroups(id, batchGroups);

        TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup);
        _spouts.put(id, c);
        return new SpoutDeclarerImpl(c);
    }

    private void markBatchGroups(String component, Map<String, String> batchGroups) {
        for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
            _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
        }
    }

setSpout() calls markBatchGroups to record the new component's stream in _batchIds, and also adds the component to _spouts.

setBolt()

    Map<GlobalStreamId, String> _batchIds = new HashMap();
    Map<String, Component> _bolts = new HashMap();

    // map from stream name to batch id
    public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
        markBatchGroups(id, batchGroups);
        Component c = new Component(bolt, parallelism, committerBatches);
        _bolts.put(id, c);
        return new BoltDeclarerImpl(c);
        
    }

    private void markBatchGroups(String component, Map<String, String> batchGroups) {
        for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
            _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
        }
    }

setBolt() likewise calls markBatchGroups to record the new component's streams in _batchIds, and adds the component to _bolts. For Trident, such a component wraps a series of ProcessorNodes (possibly with PartitionNodes as well).
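A minimal sketch of the bookkeeping that setSpout() and setBolt() share (plain Java, with a string pair standing in for GlobalStreamId; component, stream and batch-group names are hypothetical):

import java.util.HashMap;
import java.util.Map;

public class BatchGroupSketch {
    // Stand-in for _batchIds: (componentId, streamId) -> batch group.
    static final Map<String, String> batchIds = new HashMap<>();

    static void markBatchGroups(String component, Map<String, String> batchGroups) {
        for (Map.Entry<String, String> e : batchGroups.entrySet()) {
            batchIds.put(component + "/" + e.getKey(), e.getValue());
        }
    }

    public static void main(String[] args) {
        // setSpout(): one stream, one batch group.
        markBatchGroups("spout-0", Map.of("s1", "bg0"));
        // setBolt(): may subscribe to several streams of the same batch group.
        markBatchGroups("b-0", Map.of("s1", "bg0", "s2", "bg0"));
        System.out.println(batchIds); // {spout-0/s1=bg0, b-0/s1=bg0, b-0/s2=bg0}
    }
}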

buildTopology()

        Map<String, List<String>> batchesToCommitIds = new HashMap<>();
        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
        
        for(String id: _spouts.keySet()) {
            TransactionalSpoutComponent c = _spouts.get(id);
            if(c.spout instanceof IRichSpout) {
                
                //TODO: wrap this to set the stream name
                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
            } else {
                String batchGroup = c.batchGroupId;
                if(!batchesToCommitIds.containsKey(batchGroup)) {
                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
                }
                batchesToCommitIds.get(batchGroup).add(c.commitStateId);

                if(!batchesToSpouts.containsKey(batchGroup)) {
                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
                }
                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
                
                
                BoltDeclarer scd =
                      builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                
                for(Map<String, Object> m: c.componentConfs) {
                    scd.addConfigurations(m);
                }
                
                Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
                specs.put(c.batchGroupId, new CoordSpec());
                BoltDeclarer bd = builder.setBolt(id,
                        new TridentBoltExecutor(
                          new TridentSpoutExecutor(
                            c.commitStateId,
                            c.streamName,
                            ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                        c.parallelism);
                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                if(c.spout instanceof ICommitterTridentSpout) {
                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
                }
                for(Map<String, Object> m: c.componentConfs) {
                    bd.addConfigurations(m);
                }
            }
        }
        
        //......

        Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
        Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
        Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

        for(String batch: batchesToCommitIds.keySet()) {
            List<String> commitIds = batchesToCommitIds.get(batch);
            SpoutDeclarer masterCoord = builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));

            if(onHeap != null) {
                if(offHeap != null) {
                    masterCoord.setMemoryLoad(onHeap, offHeap);
                }
                else {
                    masterCoord.setMemoryLoad(onHeap);
                }
            }

            if(cpuLoad != null) {
                masterCoord.setCPULoad(cpuLoad);
            }
        }
                
        for(String id: _bolts.keySet()) {
            Component c = _bolts.get(id);
            
            Map<String, CoordSpec> specs = new HashMap<>();
            
            for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
                String batch = batchIdsForBolts.get(s);
                if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
                CoordSpec spec = specs.get(batch);
                CoordType ct;
                if(_batchPerTupleSpouts.containsKey(s.get_componentId())) {
                    ct = CoordType.single();
                } else {
                    ct = CoordType.all();
                }
                spec.coords.put(s.get_componentId(), ct);
            }
            
            for(String b: c.committerBatches) {
                specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
            
            BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
            for(Map<String, Object> conf: c.componentConfs) {
                d.addConfigurations(conf);
            }
            
            for(InputDeclaration inputDecl: c.declarations) {
               inputDecl.declare(d);
            }
            
            Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id);
            for(Map.Entry<String, Set<String>> entry: batchToComponents.entrySet()) {
                for(String comp: entry.getValue()) {
                    d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(entry.getKey()));
                }
            }
            
            for(String b: c.committerBatches) {
                d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
        }

        return builder.createTopology();
    }

For the bolts (SubtopologyBolts wrapping ProcessorNodes), a TridentBoltExecutor is registered here. It uses directGrouping to subscribe to TridentBoltExecutor.COORD_STREAM ($coord-) of every component it receives batches from, and allGrouping to subscribe to MasterBatchCoordinator.COMMIT_STREAM_ID ($commit) for its committer batch groups.
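The stream names mentioned above follow a simple convention; a minimal sketch using the prefixes described in this post (the batch-group name is hypothetical):

public class CoordStreamSketch {
    // "$coord-" + batch group: one coordination stream per batch group.
    static final String COORD_STREAM_PREFIX = "$coord-";
    // Fixed commit stream emitted by the MasterBatchCoordinator.
    static final String COMMIT_STREAM_ID = "$commit";

    static String coordStream(String batchGroup) {
        return COORD_STREAM_PREFIX + batchGroup;
    }

    public static void main(String[] args) {
        System.out.println(coordStream("bg0")); // $coord-bg0
        System.out.println(COMMIT_STREAM_ID);   // $commit
    }
}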
