1. Add the Maven dependency
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.2.0</version>
</dependency>
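If the topology is later packaged and submitted to a real cluster, the Storm jars are already on the worker classpath, so storm-core is commonly marked with provided scope (not needed for the local-mode run at the end of this example):

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>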
2. Create a spout by extending BaseRichSpout
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;

public class WordSpout extends BaseRichSpout {

    // Simulated data source
    String[] init_data = {"hello java", "hello python", "hello C++", "hello scala"};

    /**
     * The collector whose emit method does the sending; it is used in nextTuple,
     * so the reference is pulled out into a field and assigned during initialization.
     */
    private SpoutOutputCollector collector;

    /**
     * Initialization method, executed only once.
     * @param map
     * @param topologyContext
     * @param spoutOutputCollector
     */
    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    /**
     * Called by Storm in an endless loop.
     * In a real topology the data would come from Kafka, Flume, etc.
     */
    public void nextTuple() {
        // Grab a random line of input
        String init_datum = init_data[new Random().nextInt(init_data.length)];
        // Split it into words
        String[] split = init_datum.split(" ");
        // Emit each word to the downstream bolt
        for (String str : split) {
            // First approach:
            // List list = Arrays.asList(str);
            // collector.emit(list);
            // Second approach:
            collector.emit(new Values(str));
        }
    }

    /**
     * Declare the field name of the tuples emitted by this spout.
     * @param outputFieldsDeclarer
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
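The Values passed to emit and the Fields declared in declareOutputFields must match positionally. As a hypothetical illustration (the "ts" field and the timestamp value are my own, not part of this example), a spout emitting two fields per tuple would look like:

// Hypothetical sketch: emit two fields per tuple in nextTuple
collector.emit(new Values(str, System.currentTimeMillis()));

// and declare both field names, in the same order, in declareOutputFields
outputFieldsDeclarer.declare(new Fields("word", "ts"));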
3. Create the counting bolt by extending BaseRichBolt
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class WordBolt extends BaseRichBolt {

    /**
     * Result map kept in memory (an interim solution).
     */
    private Map<String, Long> resMap;
    private OutputCollector collector;

    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        resMap = new HashMap<String, Long>();
        collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        // Get the word from the tuple by field name
        String word = tuple.getStringByField("word");
        // Accumulate the count for this word
        Long time = resMap.get(word);
        if (time != null) {
            resMap.put(word, time + 1L);
        } else {
            resMap.put(word, 1L);
        }
        // Emit the running result map
        collector.emit(new Values(resMap));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("resMap"));
    }
}
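Because this bolt emits its whole in-memory map on every tuple, one common refinement (a sketch only; it assumes the downstream bolt is changed to read the two fields) is to emit a single (word, count) pair per update:

// Sketch of an alternative emit in execute
collector.emit(new Values(word, resMap.get(word)));

// with the matching declaration in declareOutputFields
outputFieldsDeclarer.declare(new Fields("word", "count"));

If count01 were ever run with parallelism greater than 1, the topology would also need fieldsGrouping("spout01", new Fields("word")) instead of shuffleGrouping, so that the same word always reaches the same bolt task.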
4. Create the output bolt
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class PrintBolt extends BaseRichBolt {

    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    public void execute(Tuple tuple) {
        // Fetch the result map by field name
        Map<String, Long> resMap = (Map<String, Long>) tuple.getValueByField("resMap");
        // Print each word with its current count
        for (String key : resMap.keySet()) {
            System.out.println(key + " ==> " + resMap.get(key));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
5. Create the topology
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

import java.util.HashMap;

public class WordTopology {

    public static void main(String[] args) throws Exception {
        // The main API for building a topology
        TopologyBuilder builder = new TopologyBuilder();
        // Wire the spout and bolts together
        builder.setSpout("spout01", new WordSpout());
        builder.setBolt("count01", new WordBolt())
                // The connecting edge (direction the data is emitted)
                .shuffleGrouping("spout01");
        builder.setBolt("print01", new PrintBolt()).shuffleGrouping("count01");
        // Local deployment, used during development
        LocalCluster cluster = new LocalCluster();
        LocalCluster.LocalTopology topology01 = cluster.submitTopology("topology01",
                new HashMap<String, Object>(), builder.createTopology());
    }
}
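As written, main returns right after submitting and the local cluster keeps running in background threads. A common pattern during development (a sketch; the 10-second run time is an arbitrary choice) is to let the topology run briefly and then shut everything down:

// At the end of main, after submitTopology:
Utils.sleep(10 * 1000);   // let the topology run for 10 seconds (org.apache.storm.utils.Utils)
topology01.close();       // stop the topology (LocalTopology is AutoCloseable)
cluster.close();          // tear down the local cluster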