-
需求
实时统计发射到Storm框架中单词的总数
-
分析
设计一个topology,来实现对文档里面的单词出现的频率进行统计,整个topology分为三个部分
(1)WordCountSpout:数据源,在已知的英文句子中,随机发送一条句子出去
package storm.wordcount; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.Map; //发送一条语句 public class WordCountSpout extends BaseRichSpout { private SpoutOutputCollector collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { //发送数据 collector.emit(new Values("shnad zhang1 zhsndga1 dasd a a b b c dd d dd")); //延时0.5 s try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("love")); } }
(2)WordCountSplitBolt:负责将单行文本记录(句子),切分成单词
package storm.wordcount; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; public class WordCountSplitBolt extends BaseRichBolt{ private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override //接受数据 public void execute(Tuple input) { //1. 获取数据 String line = input.getString(0); //2 截取数据 String[] splits = line.split(" "); //3 发送出去 for (String word : splits) { collector.emit(new Values(word,1)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { //声明字段 declarer.declare(new Fields("word", "num")); } }
(3)WordCountBolt:负责对单词的频率进行累加
package storm.wordcount; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import java.util.HashMap; import java.util.Map; public class WordCountBolt extends BaseRichBolt { //单词为key,单词出现的次数为value private Map<String, Integer> map = new HashMap<>(); private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { //1 获取传递过来的数据 String word = input.getString(0); Integer num = input.getInteger(1); //2 业务逻辑 if (map.containsKey(word)) { //如果之前统计过有单词的个数,获取个数 Integer count = map.get(word); map.put(word, count + num); } else { map.put(word, num); } // 3 控制台打印 System.err.println(Thread.currentThread().getId() + " word : " + word + " num: " + map.get(word)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
(4)WordCountMain驱动
1.创建拓扑对象 2.设置spout和bolt 3.配置worker开启个数 4.提交(集群或本地模式)
package storm.wordcount; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; public class WordCountMain { public static void main (String[] args){ //1 创建拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("WordCountSpout",new WordCountSpout(),1); builder.setBolt("WordCountSplitBolt",new WordCountSplitBolt(),2).shuffleGrouping("WordCountSpout"); builder.setBolt("WordCountBolt",new WordCountBolt(),4).fieldsGrouping("WordCountSplitBolt",new Fields("word")); //2 创建配置信息 Config conf = new Config(); conf.setNumWorkers(2); //3 提交 if (args.length > 0){ try { StormSubmitter.submitTopology(args[0],conf,builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } }else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("wordtopology",conf,builder.createTopology()); } } }