Maven 依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.3</version>
</dependency>
相关类
- LogAnalyserStorm.java
- FakeCallLogReaderSpout.java
- CallLogCreatorBolt.java
- CallLogCounterBolt.java
LogAnalyserStorm
package com.storm.demo;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
 * Entry point that wires the call-log topology together and runs it on an
 * in-process {@link LocalCluster} for a fixed amount of time.
 *
 * <p>Topology layout: the reader spout feeds the creator bolt (shuffle
 * grouping), which feeds the counter bolt (fields grouping on {@code "call"}).
 */
public class LogAnalyserStorm {
    private static final String TOPOLOGY_NAME = "LogAnalyserStorm";
    private static final String READER_SPOUT_ID = "call-log-reader-spout";
    private static final String CREATOR_BOLT_ID = "call-log-creator-bolt";
    private static final String COUNTER_BOLT_ID = "call-log-counter-bolt";

    /** How long the local topology is left running before shutdown, in milliseconds. */
    private static final long RUN_DURATION_MS = 10000L;

    /**
     * Builds the topology, submits it to a local cluster, waits, then shuts the cluster down.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if topology submission fails or the wait is interrupted
     */
    public static void main(String... args) throws Exception {
        Config config = new Config();
        config.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(READER_SPOUT_ID, new FakeCallLogReaderSpout());
        builder
            .setBolt(CREATOR_BOLT_ID, new CallLogCreatorBolt())
            .shuffleGrouping(READER_SPOUT_ID);
        builder
            .setBolt(COUNTER_BOLT_ID, new CallLogCounterBolt())
            .fieldsGrouping(CREATOR_BOLT_ID, new Fields("call"));

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            Thread.sleep(RUN_DURATION_MS);
        } finally {
            // Always stop the in-process cluster, even if submission or the sleep fails.
            cluster.shutdown();
        }
    }
}
FakeCallLogReaderSpout
package com.storm.demo;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
 * Spout that fabricates call-log records instead of reading them from a real source.
 *
 * <p>Each emitted tuple has the fields {@code ("from", "to", "duration")}: two
 * distinct phone numbers picked at random from a fixed pool and a random
 * duration in the range {@code [0, 60)}. At most {@value #MAX_TUPLES} tuples are
 * emitted over the lifetime of the spout instance, {@value #BATCH_SIZE} per
 * {@link #nextTuple()} call. Tuples are emitted without a message id.
 */
public class FakeCallLogReaderSpout implements IRichSpout {
    /** Pool of phone numbers from which caller and callee are drawn. */
    private static final String[] MOBILE_NUMBERS = {
        "13805194441",
        "13805194442",
        "13805194443",
        "13805194444"
    };

    /** Total number of tuples this spout instance will ever emit. */
    private static final int MAX_TUPLES = 1000;

    /** Maximum number of tuples emitted by a single {@link #nextTuple()} call. */
    private static final int BATCH_SIZE = 100;

    private SpoutOutputCollector collector;
    private TopologyContext context;
    private Random randomGenerator = new Random();

    /** Number of tuples emitted so far. */
    private int idx = 0;

    /**
     * Stores the execution environment handed to the spout on initialization.
     *
     * @param map the Storm configuration for this spout (unused)
     * @param topologyContext context describing this spout's place in the topology
     * @param spoutOutputCollector collector used to emit tuples to downstream bolts
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.collector = spoutOutputCollector;
    }

    /** Called when the spout is about to shut down; nothing to release here. */
    @Override
    public void close() {
    }

    /** No-op. */
    @Override
    public void activate() {
    }

    /** No-op. */
    @Override
    public void deactivate() {
    }

    /**
     * Emits up to {@value #BATCH_SIZE} generated call records through the collector,
     * stopping for good once {@value #MAX_TUPLES} tuples have been emitted in total.
     */
    @Override
    public void nextTuple() {
        int emittedThisCall = 0;
        while (emittedThisCall < BATCH_SIZE && idx < MAX_TUPLES) {
            String fromMobileNumber = randomMobileNumber();
            String toMobileNumber = randomMobileNumber();
            // Compare by value, not by reference: re-draw until caller and callee differ.
            while (fromMobileNumber.equals(toMobileNumber)) {
                toMobileNumber = randomMobileNumber();
            }
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
            emittedThisCall++;
            idx++;
        }
    }

    /** Picks one phone number uniformly at random from the pool. */
    private String randomMobileNumber() {
        return MOBILE_NUMBERS[randomGenerator.nextInt(MOBILE_NUMBERS.length)];
    }

    /**
     * Acknowledgement callback for a processed tuple; intentionally a no-op.
     *
     * @param o the message id of the acknowledged tuple
     */
    @Override
    public void ack(Object o) {
    }

    /**
     * Failure callback for a tuple; intentionally a no-op, so nothing is replayed.
     *
     * @param o the message id of the failed tuple
     */
    @Override
    public void fail(Object o) {
    }

    /**
     * Declares the output schema of this spout: {@code ("from", "to", "duration")}.
     *
     * @param outputFieldsDeclarer used to declare output stream ids and fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("from", "to", "duration"));
    }

    /**
     * Returns component-specific configuration.
     *
     * @return {@code null}, meaning no component-specific overrides
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
CallLogCreatorBolt
package com.storm.demo;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
 * Bolt that turns a raw {@code (from, to, duration)} record into a
 * {@code (call, duration)} record, where {@code call} is the string
 * {@code "<from> - <to>"}.
 */
public class CallLogCreatorBolt implements IRichBolt {
    private OutputCollector collector;

    /**
     * Stores the collector used to emit and acknowledge tuples.
     *
     * @param map the Storm configuration for this bolt (unused)
     * @param topologyContext context describing this bolt's place in the topology (unused)
     * @param outputCollector collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * Reads caller, callee and duration from the input tuple by position and emits
     * the combined call key together with the duration.
     *
     * @param tuple input tuple with fields at positions 0 (from), 1 (to), 2 (duration)
     */
    @Override
    public void execute(Tuple tuple) {
        String from = tuple.getString(0);
        String to = tuple.getString(1);
        Integer duration = tuple.getInteger(2);
        // Anchor the output to the input and ack it: an IRichBolt must ack every
        // tuple it receives itself, otherwise tracked tuples would time out.
        collector.emit(tuple, new Values(from + " - " + to, duration));
        collector.ack(tuple);
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }

    /**
     * Declares the output schema of this bolt: {@code ("call", "duration")}.
     *
     * @param outputFieldsDeclarer used to declare output stream ids and fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call", "duration"));
    }

    /**
     * Returns component-specific configuration.
     *
     * @return {@code null}, meaning no component-specific overrides
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
CallLogCounterBolt
package com.storm.demo;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
 * Terminal bolt that counts how many times each {@code call} key has been seen
 * and prints the totals when the bolt is cleaned up.
 */
public class CallLogCounterBolt implements IRichBolt {
    /** Number of occurrences seen so far, keyed by the call string. */
    Map<String, Integer> counterMap;
    private OutputCollector collector;

    /**
     * Initializes an empty counter map and stores the collector.
     *
     * @param map the Storm configuration for this bolt (unused)
     * @param topologyContext context describing this bolt's place in the topology (unused)
     * @param outputCollector collector used to ack tuples
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counterMap = new HashMap<>();
        this.collector = outputCollector;
    }

    /**
     * Increments the counter for the tuple's call key (field at position 0) and acks the tuple.
     *
     * @param tuple input tuple whose first field is the call key
     */
    @Override
    public void execute(Tuple tuple) {
        String call = tuple.getString(0);
        // Insert 1 for a new key, otherwise add 1 to the existing count.
        counterMap.merge(call, 1, Integer::sum);
        collector.ack(tuple);
    }

    /** Prints every {@code "<call> : <count>"} pair to standard output. */
    @Override
    public void cleanup() {
        counterMap.forEach((call, count) -> System.out.println(call + " : " + count));
    }

    /**
     * Declares a single output field {@code "call"}. Note that {@link #execute(Tuple)}
     * does not emit any tuples; the declaration is kept unchanged.
     *
     * @param outputFieldsDeclarer used to declare output stream ids and fields
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call"));
    }

    /**
     * Returns component-specific configuration.
     *
     * @return {@code null}, meaning no component-specific overrides
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}