Java操作storm入门

1.导入依赖

<!--https://mvnrepository.com/artifact/org.apache.storm/storm-core-->

<dependency>

        <groupId>org.apache.storm</groupId>

        <artifactId>storm-core</artifactId>

        <version>2.2.0</version>

</dependency>

2.创建 spout 继承  BaseRichSpout

 

public class WordSpout extends BaseRichSpout {

 

    //模拟数据来源

    String[] init_data = {"hello java", "hello python", "hello C++", "hello scala"};

 

    /**

     * 放射方法在里面,应该在nextTuple中调用,可以把他提出来在初始化中赋值

     */

    private SpoutOutputCollector collector;

 

    /**

     * 初始化方法,只执行一次

     * @param map

     * @param topologyContext

     * @param spoutOutputCollector

     */

    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {

        collector = spoutOutputCollector;

    }

 

    /**

     * 死循环,storm 内部一直在调用

     * 数据来源 kafka flume ...

     */

    public void nextTuple() {

        //拿数据

        String init_datum = init_data[new Random().nextInt(init_data.length)];

        //拆分

        String[] split = init_datum.split(" ");

        //循环发射到 bolt

        for (String str:split){

//            List list = Arrays.asList(str);

//            collector.emit(list);

            //第二种

            collector.emit(new Values(str));

        }

    }

 

    /**

     * 定义发射出去,tuple 的字段名

     * @param outputFieldsDeclarer

     */

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        outputFieldsDeclarer.declare(new Fields("word"));

    }

}

 

3.创建求和 bolt 继承 BaseRichBolt

 

public class WordBolt extends BaseRichBolt {

 

    /**

     * 临时解决方案 结果集

     */

    private Map<String, Long> resMap;

 

    private OutputCollector collector;

 

    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {

        resMap = new HashMap<String, Long>();

        collector = outputCollector;

    }

 

    public void execute(Tuple tuple) {

        //根据字段名拿到每一个 tuple

        String word = tuple.getStringByField("word");

        //给每一个单词次数累加

        Long time = resMap.get(word);

        if (time != null){

            resMap.put(word, time + 1L);

        }else {

            resMap.put(word, 1L);

        }

        //发射

        collector.emit(new Values(resMap));

    }

 

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        outputFieldsDeclarer.declare(new Fields("resMap"));

    }

}

 

4.创建输出 bolt

 

public class PrintBolt extends BaseRichBolt {

 

    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {

 

    }

 

    public void execute(Tuple tuple) {

        //取值

        Map<String, Long> resMap = (Map<String, Long>)tuple.getValueByField("resMap");

        //处理

        for (String key:resMap.keySet()){

            System.out.println(key + " ==> " + resMap.get(key));

        }

    }

 

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

 

    }

}

 

5.创建 topology

 

public class WordTopoigy {

 

    public static void main(String[] args) throws Exception {

        //调用主的 Api

        TopologyBuilder builder = new TopologyBuilder();

        //关联 spout bolt

        builder.setSpout("spout01", new WordSpout());

        builder.setBolt("count01", new WordBolt())

                //关联线(放射方向)

                .shuffleGrouping("spout01");

        builder.setBolt("print01", new PrintBolt()).shuffleGrouping("count01");

        //本地发布 开发时用

        LocalCluster cluster = new LocalCluster();

        LocalCluster.LocalTopology topology01 = cluster.submitTopology("topology01",

                new HashMap<String, Object>(), builder.createTopology());

    }

}

 

 

上一篇:Java如何成为以后的主流语


下一篇:python多线程比单线程效率低的原因及其解决办法