Storm的一个简单例子

maven依赖

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.3</version>
        </dependency>

相关类

  • LogAnalyserStorm.java
  • FakeCallLogReaderSpout.java
  • CallLogCreatorBolt.java
  • CallLogCounterBolt.java

LogAnalyserStorm

package com.storm.demo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Entry point that wires the call-log topology together and runs it on an
 * in-process {@link LocalCluster} for a fixed amount of time.
 *
 * <p>Topology layout: {@code call-log-reader-spout} -> (shuffle) ->
 * {@code call-log-creator-bolt} -> (fields grouping on {@code "call"}) ->
 * {@code call-log-counter-bolt}.
 */
public class LogAnalyserStorm {

    /**
     * Builds the topology, submits it to a local cluster, lets it run for ten
     * seconds and then shuts the cluster down.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the topology cannot be submitted or the wait is interrupted
     */
    public static void main(String... args) throws Exception{
        Config config = new Config();
        config.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

        builder
                .setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
                .shuffleGrouping("call-log-reader-spout");
        // Fields grouping on "call" sends every tuple with the same call key
        // to the same counter task.
        builder
                .setBolt("call-log-counter-bolt", new CallLogCounterBolt())
                .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
            Thread.sleep(10000);
        } finally {
            // Shut down even when submission fails or the sleep is interrupted,
            // otherwise the local cluster is left running.
            cluster.shutdown();
        }
    }
}

FakeCallLogReaderSpout

package com.storm.demo;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Spout that fabricates call-log records: each tuple carries a caller number,
 * a different callee number and a random duration.
 *
 * <p>At most {@link #MAX_CALLS} tuples are emitted over the lifetime of the
 * spout instance, in batches of at most {@link #BATCH_SIZE} per
 * {@link #nextTuple()} invocation.
 */
public class FakeCallLogReaderSpout implements IRichSpout {

    /** Pool of phone numbers that callers and callees are drawn from. */
    private static final String[] MOBILE_NUMBERS = {
            "13805194441",
            "13805194442",
            "13805194443",
            "13805194444"
    };

    /** Total number of tuples this spout emits before going quiet. */
    private static final int MAX_CALLS = 1000;

    /** Upper bound on tuples emitted by a single {@link #nextTuple()} call. */
    private static final int BATCH_SIZE = 100;

    /** Exclusive upper bound for the generated call duration. */
    private static final int MAX_DURATION = 60;

    private SpoutOutputCollector collector;

    private TopologyContext context;

    private Random randomGenerator = new Random();

    /** Number of tuples emitted so far. */
    private int emitted = 0;

    /**
     * Stores the collector and context handed over when the spout is initialised.
     *
     * @param map the Storm configuration for this spout
     * @param topologyContext information about this spout's place in the topology
     * @param spoutOutputCollector collector used to emit tuples to downstream bolts
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.collector = spoutOutputCollector;
    }

    /**
     * Called when the spout is about to be shut down; nothing to release here.
     */
    @Override
    public void close() {

    }

    /** No-op: this spout keeps no state that depends on activation. */
    @Override
    public void activate() {

    }

    /** No-op: this spout keeps no state that depends on deactivation. */
    @Override
    public void deactivate() {

    }

    /**
     * Emits the next batch of generated call records through the collector.
     *
     * <p>Each tuple is {@code (from, to, duration)}. The callee is always a
     * different number from the caller. Once {@link #MAX_CALLS} tuples have
     * been emitted this method does nothing.
     */
    @Override
    public void nextTuple() {
        for (int batch = 0; batch < BATCH_SIZE && emitted < MAX_CALLS; batch++, emitted++) {
            int fromIndex = randomGenerator.nextInt(MOBILE_NUMBERS.length);
            // Pick uniformly among the OTHER numbers by offsetting from the
            // caller's index; this never yields the caller itself and needs no
            // retry loop or string identity comparison.
            int offset = 1 + randomGenerator.nextInt(MOBILE_NUMBERS.length - 1);
            int toIndex = (fromIndex + offset) % MOBILE_NUMBERS.length;

            int duration = randomGenerator.nextInt(MAX_DURATION);
            this.collector.emit(new Values(MOBILE_NUMBERS[fromIndex], MOBILE_NUMBERS[toIndex], duration));
        }
    }

    /**
     * Acknowledgement callback for a processed tuple; intentionally a no-op
     * because tuples are emitted without a message id.
     *
     * @param o message id of the acknowledged tuple
     */
    @Override
    public void ack(Object o) {

    }

    /**
     * Failure callback for a tuple; intentionally a no-op, failed tuples are
     * not replayed.
     *
     * @param o message id of the failed tuple
     */
    @Override
    public void fail(Object o) {

    }

    /**
     * Declares the output schema of the emitted tuples.
     *
     * @param outputFieldsDeclarer declarer used to register the output fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("from", "to", "duration"));
    }

    /**
     * @return {@code null}, i.e. no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

CallLogCreatorBolt

package com.storm.demo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Bolt that turns a {@code (from, to, duration)} tuple into a
 * {@code (call, duration)} tuple, where {@code call} is the string
 * {@code "<from> - <to>"}.
 */
public class CallLogCreatorBolt implements IRichBolt {

    private OutputCollector collector;

    /**
     * Stores the collector used to emit and acknowledge tuples.
     *
     * @param map the Storm configuration for this bolt
     * @param topologyContext information about this bolt's place in the topology
     * @param outputCollector collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * Combines caller and callee into a single call key and forwards it with
     * the duration.
     *
     * @param tuple input tuple with caller at index 0, callee at index 1 and
     *              duration at index 2
     */
    @Override
    public void execute(Tuple tuple) {
        String from = tuple.getString(0);
        String to = tuple.getString(1);
        Integer duration = tuple.getInteger(2);
        // Anchor the outgoing tuple to its input and ack the input: an
        // IRichBolt has to do both itself, otherwise tracked tuples would
        // time out and be reported as failed.
        collector.emit(tuple, new Values(from + " - " + to, duration));
        collector.ack(tuple);
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {

    }

    /**
     * Declares the output schema {@code (call, duration)}.
     *
     * @param outputFieldsDeclarer declarer used to register the output fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call", "duration"));
    }

    /**
     * @return {@code null}, i.e. no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

CallLogCounterBolt

package com.storm.demo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * Bolt that counts how many times each call key has been seen and prints the
 * totals to standard output when it is cleaned up.
 */
public class CallLogCounterBolt implements IRichBolt {

    /** Occurrence count per call key; created in {@link #prepare}. */
    Map<String, Integer> counterMap;
    private OutputCollector collector;

    /**
     * Initialises the counter map and stores the collector.
     *
     * @param map the Storm configuration for this bolt
     * @param topologyContext information about this bolt's place in the topology
     * @param outputCollector collector used to ack tuples
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counterMap = new HashMap<>();
        this.collector = outputCollector;
    }

    /**
     * Increments the counter for the call key at index 0 and acks the tuple.
     * The duration field of the tuple is not used for counting.
     *
     * @param tuple input tuple whose first field is the call key
     */
    @Override
    public void execute(Tuple tuple) {
        String call = tuple.getString(0);

        // Inserts 1 for a new key, otherwise adds 1 to the existing count.
        counterMap.merge(call, 1, Integer::sum);

        collector.ack(tuple);
    }

    /**
     * Prints every call key with its count as {@code "<call> : <count>"}.
     */
    @Override
    public void cleanup() {
        counterMap.forEach((call, count) -> System.out.println(call + " : " + count));
    }

    /**
     * Declares a single output field {@code "call"}. Note that
     * {@link #execute} does not emit any tuple.
     *
     * @param outputFieldsDeclarer declarer used to register the output fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call"));
    }

    /**
     * @return {@code null}, i.e. no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
上一篇:python 并发编程 锁 / 信号量 / 事件 / 队列(进程间通信(IPC)) /生产者消费者模式


下一篇:Java如何成为以后的主流语言