Topology
package com.zxf.strom; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; /** * strom */ public class MyTopology { public static void main(String[] args) { TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("myspout",new MySpout(),2); topologyBuilder.setBolt("mybolt",new MyBolt(),2).shuffleGrouping("myspout"); try { if(args.length==0) { //这个是本地运行 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("submitTopology",new Config(),topologyBuilder.createTopology()); }else { Config config = new Config(); config.setNumWorkers(2); StormSubmitter.submitTopology("wordcount1", config, topologyBuilder.createTopology()); } }catch (Exception e){ System.out.println(e.getStackTrace()); } } }
Spout
package com.zxf.strom;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Spout that emits an increasing counter, one value per {@link #nextTuple()}
 * call, in a single output field named {@code "num"}.
 *
 * <p>The counter is an instance field, so each spout instance counts
 * independently, starting from 1.
 */
public class MySpout extends BaseRichSpout {

    /** Collector handed over in {@link #open}; used to emit tuples. */
    private SpoutOutputCollector collector;

    /**
     * Last value emitted by this instance. Kept as a primitive so that each
     * increment does not unbox and re-box a {@code Long}.
     */
    private long num = 0L;

    /**
     * Initialization hook: stores the collector for later use by
     * {@link #nextTuple()} and logs that the spout was opened.
     *
     * @param map                  configuration map (not used here)
     * @param topologyContext      topology context (not used here)
     * @param spoutOutputCollector collector used to emit tuples
     */
    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
        System.out.println("RichSpout open 初始化了 ");
    }

    /**
     * Increments the counter and emits it as a one-field tuple. The value is
     * boxed to a {@code Long} when placed into {@link Values}.
     */
    public void nextTuple() {
        num++;
        collector.emit(new Values(num));
        System.out.println("RichSpout nextTuple 被调用了 ");
    }

    /**
     * Declares the single output field {@code "num"}.
     *
     * @param outputFieldsDeclarer declarer that receives the output schema
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("num"));
    }
}
Bolt
package com.zxf.strom;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

/**
 * Bolt that keeps a running sum of the {@code "num"} field of every tuple it
 * receives and prints the sum after each tuple. It emits nothing.
 */
public class MyBolt extends BaseBasicBolt {

    /** Running total of all {@code "num"} values seen by this instance. */
    private long count = 0;

    /**
     * Initialization hook; only logs that the bolt was prepared.
     *
     * @param topoConf topology configuration (not used here)
     * @param context  topology context (not used here)
     */
    public void prepare(Map<String, Object> topoConf, TopologyContext context) {
        System.out.println("BaseBasicBolt 被初始化了");
    }

    /**
     * Adds the tuple's {@code "num"} value to the running total and prints
     * the updated total.
     *
     * @param tuple                incoming tuple; its {@code "num"} field is read as a {@code Long}
     * @param basicOutputCollector output collector (not used; this bolt emits nothing)
     */
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        count += tuple.getLongByField("num");
        System.out.println("BaseBasicBolt execute" + count);
    }

    /**
     * Declares no output fields.
     *
     * @param outputFieldsDeclarer declarer for the output schema (left untouched)
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Intentionally empty: no output stream is declared.
    }
}