Storm实现单词统计案例

  1. 需求

    实时统计发射到Storm框架中每个单词出现的次数

  2. 分析

    设计一个topology,来实现对文档里面的单词出现的频率进行统计,整个topology分为三个部分

    (1)WordCountSpout:数据源,每隔0.5秒向下游发送一条固定的英文句子

    package storm.wordcount;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * Spout that acts as the data source of the word-count topology.
     *
     * <p>Every call to {@link #nextTuple()} emits one fixed English sentence as a
     * single-field tuple and then pauses for {@link #EMIT_INTERVAL_MS} milliseconds.
     */
    public class WordCountSpout extends BaseRichSpout {

        /** Sentence emitted on every call to {@link #nextTuple()}. */
        private static final String SENTENCE = "shnad zhang1 zhsndga1 dasd a a b b c dd d dd";

        /** Pause after each emitted sentence, in milliseconds (0.5 s). */
        private static final long EMIT_INTERVAL_MS = 500L;

        /** Collector handed over in {@link #open}; used to emit tuples. */
        private SpoutOutputCollector collector;

        /**
         * Stores the collector so that {@link #nextTuple()} can emit through it.
         *
         * @param conf      configuration passed in by the framework (unused here)
         * @param context   topology context passed in by the framework (unused here)
         * @param collector collector used to emit tuples from this spout
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Emits the fixed sentence once, then sleeps for {@link #EMIT_INTERVAL_MS}.
         */
        @Override
        public void nextTuple() {
            // Send the data
            collector.emit(new Values(SENTENCE));

            // Throttle emission
            try {
                Thread.sleep(EMIT_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Thread.sleep clears the interrupt status when it throws; restore it so
                // the thread driving this spout can still notice the interruption.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Declares the single output field of this spout, named {@code "love"}.
         *
         * @param declarer declarer used to register the output fields
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("love"));
        }
    }
    

    (2)WordCountSplitBolt:负责将单行文本记录(句子),切分成单词

    package storm.wordcount;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * Bolt that splits each incoming sentence into words.
     *
     * <p>For every non-empty word in the sentence it emits a {@code (word, 1)} tuple
     * with the fields {@code "word"} and {@code "num"}. Emitted tuples are anchored to
     * the input tuple, and the input tuple is acked once it has been processed.
     */
    public class WordCountSplitBolt extends BaseRichBolt {

        /** Collector handed over in {@link #prepare}; used to emit and ack tuples. */
        private OutputCollector collector;

        /**
         * Stores the collector so that {@link #execute(Tuple)} can emit through it.
         *
         * @param stormConf configuration passed in by the framework (unused here)
         * @param context   topology context passed in by the framework (unused here)
         * @param collector collector used to emit and ack tuples
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Splits the sentence held in the first field of {@code input} and emits one
         * {@code (word, 1)} tuple per word.
         *
         * @param input tuple whose first field is the sentence to split
         */
        @Override
        public void execute(Tuple input) {
            // 1. Read the sentence
            String line = input.getString(0);

            if (line != null) {
                // 2. Split on runs of whitespace so repeated spaces do not yield empty words
                String[] words = line.trim().split("\\s+");

                // 3. Emit every word, anchored to the tuple it came from
                for (String word : words) {
                    // A blank sentence still yields one empty token; do not count it
                    if (!word.isEmpty()) {
                        collector.emit(input, new Values(word, 1));
                    }
                }
            }

            // BaseRichBolt does not ack automatically; acknowledge the processed tuple
            collector.ack(input);
        }

        /**
         * Declares the output fields {@code "word"} and {@code "num"}.
         *
         * @param declarer declarer used to register the output fields
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "num"));
        }
    }
    

    (3)WordCountBolt:负责对单词的频率进行累加

    package storm.wordcount;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Bolt that accumulates how often each word has been seen.
     *
     * <p>Each input tuple carries a word in its first field and a count in its second
     * field. The bolt adds the count to the running total for that word, prints the
     * new total to {@code System.err}, and acks the input tuple. It emits nothing.
     */
    public class WordCountBolt extends BaseRichBolt {

        /** Running totals: the word is the key, its accumulated count is the value. */
        private final Map<String, Integer> map = new HashMap<>();

        /** Collector handed over in {@link #prepare}; used to ack tuples. */
        private OutputCollector collector;

        /**
         * Stores the collector so that {@link #execute(Tuple)} can ack through it.
         *
         * @param stormConf configuration passed in by the framework (unused here)
         * @param context   topology context passed in by the framework (unused here)
         * @param collector collector used to ack tuples
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Adds the count carried by {@code input} to the total of its word and prints
         * the updated total.
         *
         * @param input tuple whose first field is the word and second field the count
         */
        @Override
        public void execute(Tuple input) {
            // 1. Read the incoming data
            String word = input.getString(0);
            Integer num = input.getInteger(1);

            // 2. Add to the previous total, or start a new total for an unseen word
            Integer previous = map.get(word);
            Integer total = (previous == null) ? num : Integer.valueOf(previous + num);
            map.put(word, total);

            // 3. Print to the console, prefixed with the id of the executing thread
            System.err.println(Thread.currentThread().getId() + " word : " + word + " num: " + total);

            // BaseRichBolt does not ack automatically; acknowledge the processed tuple
            collector.ack(input);
        }

        /**
         * Declares no output fields; this bolt does not emit tuples.
         *
         * @param declarer declarer used to register the output fields (unused here)
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }
    

    (4)WordCountMain驱动

    1. 创建拓扑对象 2. 设置spout和bolt 3. 配置worker开启个数 4. 提交

    package storm.wordcount;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    /**
     * Driver that wires the word-count topology together and submits it.
     *
     * <p>With at least one command-line argument the topology is submitted through
     * {@code StormSubmitter} under the name given in {@code args[0]}; without
     * arguments it is submitted to a {@code LocalCluster} under the name
     * {@code "wordtopology"}.
     */
    public class WordCountMain {

        /** Component id of the sentence spout. */
        private static final String SPOUT_ID = "WordCountSpout";

        /** Component id of the sentence-splitting bolt. */
        private static final String SPLIT_BOLT_ID = "WordCountSplitBolt";

        /** Component id of the counting bolt. */
        private static final String COUNT_BOLT_ID = "WordCountBolt";

        /** Topology name used when submitting to the local cluster. */
        private static final String LOCAL_TOPOLOGY_NAME = "wordtopology";

        /**
         * Builds the topology and submits it.
         *
         * @param args optional; when present, {@code args[0]} is the topology name used
         *             for submission through {@code StormSubmitter}
         */
        public static void main(String[] args) {
            // 1. Build the topology: spout -> split bolt -> count bolt
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(SPOUT_ID, new WordCountSpout(), 1);
            builder.setBolt(SPLIT_BOLT_ID, new WordCountSplitBolt(), 2)
                    .shuffleGrouping(SPOUT_ID);
            // Group by the "word" field so tuples with the same word reach the same task
            builder.setBolt(COUNT_BOLT_ID, new WordCountBolt(), 4)
                    .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

            // 2. Create the configuration
            Config conf = new Config();
            conf.setNumWorkers(2);

            // 3. Submit
            if (args.length > 0) {
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (Exception e) {
                    e.printStackTrace();
                    // Report the failed submission to the calling shell instead of exiting with 0
                    System.exit(1);
                }
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
            }
        }
    }
    
上一篇:架构解密从分布式到微服务:深入Kubernetes微服务平台


下一篇:Storm分组策略