《Storm企业级应用:实战、运维和调优》——2.4 创建Topology并向集群提交任务

本节书摘来自华章计算机《Storm企业级应用:实战、运维和调优》一书中的第2章,第2.4节,作者:马延辉 陈书美 雷葆华著, 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.4 创建Topology并向集群提交任务

Topology是Storm的核心概念之一,是将Spout与Bolt融合在一起的纽带,在Storm集群中运行,完成实时计算的任务。在Storm集群中,Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务,可以提交由任何语言创建的Topology。下面使用Java语言讲解Topology的使用。首先了解如何创建Topology。
2.4.1 创建Topology
在创建一个Topology之前,设计一个Topology来统计词频。在创建Topology之前,要准备Spout(数据源)和Bolt来组成Topology。这里简单介绍创建的Spout和Bolt,第3章会详细介绍这两个概念。
下面梳理Topology的大致结构。
1.?Spout
创建一个WordSpout数据源,负责发送语句。WordSpout的代码如下:

public class WordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private static f?inal String[] msgs = new String[] {
             "I have a dream",
             "my dream is to be a data analyst",
             "you kan do what you are dreaming",
             "don't give up your dreams",
             "it's just so so ",
             "We need change the traditional ideas and practice boldly",
             "Storm enterprise real time calculation of actual combat",
             "you kan be what you want be",
             };
    private static f?inal Random random = new Random();   
    public void open(Map conf, TopologyContext context,
             SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        String sentence= msgs[random.nextInt(8)];
        collector.emit(new Values(sentence));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
  }

2.?Bolt
两个Bolt,一个负责将语句切分,即SplitSentenceBolt,另一个是对切分的单词进行词频累加的Bolt,即WordCountBolt。下面是这两个Bolt的具体代码。

public class SplitSentenceBolt implements IBasicBolt{ 
      public void prepare(Map conf, TopologyContext context) { 
       } 
      public void execute(Tuple tuple, BasicOutputCollector collector) { 
            String sentence = tuple.getString(0); 
             for(String word: sentence.split(" ")) { 
                     collector.emit(new Values(word)); 
               } 
         } 
      public void cleanup() { 
      } 
      public void declareOutputFields(OutputFieldsDeclarer declarer) { 
            declarer.declare(new Fields("word")); 
          } 
 } 
public class WordCountBolt implements IBasicBolt { 
      private Map<String, Integer> _counts = new HashMap<String, Integer>(); 
      public void prepare(Map conf, TopologyContext context) { 
      } 
      public void execute(Tuple tuple, BasicOutputCollector collector) { 
            String word = tuple.getString(0); 
            int count; 
            if(_counts.containsKey(word)) { 
                count = _counts.get(word); 
            } else { 
                count = 0; 
} 
            count++; 
            _counts.put(word, count); 
            collector.emit(new Values(word, count)); 
      } 
      public void cleanup() { 
      } 
      public void declareOutputFields(OutputFieldsDeclarer declarer) { 
            declarer.declare(new Fields("word", "count")); 
      }
}

3.?Topology
要创建的Topology的Spout从句子队列中随机生成一个句子,Spout用setSpout方法插入一个独特的ID到Topology。必须给予Topology中的每个节点一个ID,ID是由其他Bolt用于订阅该节点的输出流,其中WordSpout在Topology中的ID为1。
setBolt用于在Topology中插入Bolt,在Topology中定义的第一个Bolt是切分句子的SplitSentenceBolt,该Bolt将句子流转成单词流,第二个Bolt统计单词。Topology的代码如下:

TopologyBuilder builder = new TopologyBuilder(); 
bulider.setSpout(1,new WordSpout(),2);
builder.setBolt(2, new SplitSentenceBolt(), 10).shuffleGrouping(1); 
builder.setBolt(3, new WordCountBolt(), 20).f?ieldsGrouping(2, new Fields("word"));

这样就创建了简单的Topology结构,下面介绍如何使用Topology。
2.4.2 向集群提交任务
向Storm集群提交Topology任务,类似提交MapReduce作业到Hadoop集群中,只需要运行JAR包中的Topology即可。而使用kill命令可以杀掉任务,类似杀掉MapReduce作业。下面详细介绍这两部分内容。
1.?启动Topology
在Storm的安装主目录下,执行下面的命令提交任务:

bin/storm jar testTopolgoy.jar org.me.MyTopology arg1 arg2 arg3

其中,jar命令专门负责提交任务,testTopolgoy.jar是包含Topology实现代码的JAR包,org.me.MyTopology的main方法是Topology的入口,arg1、arg2和arg3为org.me.MyTopology执行时需要传入的参数。
2.?停止Topology
在Storm主目录下,执行kill命令停止之前已经提交的Topology:

bin/Storm kill {toponame}

其中,{toponame}为Topology提交到Storm集群时指定的Topology任务名称,该名称可以在代码中指定,也可以作为参数传入Topology中。

上一篇:【Debian】Postfix+Dovecot+sasl 实现SMTPS+IMAPS


下一篇:带你读《区块链开发实战: 基于JavaScript的公链与DApp开发》之三:Asch——区块链应用开发平台