pom.xml
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.2.2</version> <!-- 本机模式,需要storm-core,集群模式不需求--> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.2</version> <scope>provided</scope> </dependency> ... <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin>
HelloWorldSpout.java
package com.ebc.spout; import lombok.extern.slf4j.Slf4j; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.Map; /** * @author yaoyuan2 * @date 2019/4/11 */ @Slf4j public class HelloWorldSpout extends BaseRichSpout { private SpoutOutputCollector collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 重复调用,包含spout整个逻辑 * @return void */ @Override public void nextTuple() { Utils.sleep(100); String sendStr = "Hello World"; collector.emit(new Values(sendStr)); //log.info(sendStr); } /** * 告诉storm集群,spout发送了那些字段 * @param declarer * @return void */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } }
HelloWorldBolt.java
package com.ebc.blot; import lombok.extern.slf4j.Slf4j; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import java.util.Map; /** * 读取已产生的Tuple并实现必要的统计 * @author yaoyuan2 * @date 2019/4/11 */ @Slf4j public class HelloWorldBolt extends BaseRichBolt { private int myCount = 0; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } @Override public void execute(Tuple input) { String test = input.getStringByField("sentence"); if (test == "Hello World") { myCount++; log.info("发现了1个Hello World!现在的计数值="+Integer.toString(myCount)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("myCount")); } }
HelloWorldTopology.java
package com.ebc; import com.ebc.blot.HelloWorldBolt; import com.ebc.spout.HelloWorldSpout; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; /** * @author yaoyuan2 * @date 2019/4/11 */ public class HelloWorldTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("helloWorld",new HelloWorldSpout(),2); builder.setBolt("HelloWorldBolt",new HelloWorldBolt(),4).shuffleGrouping("helloWorld"); Config conf = new Config(); conf.setDebug(true); if(args!=null && args.length > 0) { conf.setNumWorkers(20); StormSubmitter.submitTopology(args[0],conf,builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test",conf,builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown(); } } }
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?> <Configuration monitorInterval="1" status="ERROR" strict="true" name="LogConfig"> <Properties> <Property name="log.layout">%date{HH:mm:ss.SSS} [%thread] %-5level %logger{20}:%line - %msg%n</Property> </Properties> <Appenders> <Appender type="Console" name="STDOUT"> <Target>SYSTEM_OUT</Target> <Layout type="PatternLayout" pattern="${log.layout}"/> </Appender> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="STDOUT"/> </Root> <Logger name="org.apache.storm" level="error" /> </Loggers> </Configuration>
输出:
13:21:53.792 [Thread-22-HelloWorldBolt-executor[3 3]] INFO com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=36 13:21:53.855 [Thread-32-HelloWorldBolt-executor[4 4]] INFO com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=41 13:21:53.892 [Thread-24-HelloWorldBolt-executor[1 1]] INFO com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=35 13:21:53.955 [Thread-32-HelloWorldBolt-executor[4 4]] INFO com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=42 13:21:53.992 [Thread-32-HelloWorldBolt-executor[4 4]] INFO com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=43 13:21:54.055 [Thread-18-HelloWorldBolt-executor[2 2]] INFO com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=45 13:21:57.532 [Thread-14] ERROR org.apache.storm.daemon.supervisor.ReadClusterState:182 - Failed to Sync Supervisor java.lang.RuntimeException: java.lang.InterruptedException
blot 有4个线程执行计算。
将HelloWorldSpout.nextTuple方法中的//log.info(sendStr);放开,将HelloWorldBolt.execute方法中的log.info("发现了1个Hello World!现在的计数值="+Integer.toString(myCount));注释,输出:
13:26:48.969 [Thread-20-helloWorld-executor[7 7]] INFO com.ebc.spout.HelloWorldSpout:35 - Hello World 13:26:49.056 [Thread-26-helloWorld-executor[6 6]] INFO com.ebc.spout.HelloWorldSpout:35 - Hello World 13:26:52.512 [Thread-14] ERROR org.apache.storm.daemon.supervisor.ReadClusterState:182 - Failed to Sync Supervisor java.lang.RuntimeException: java.lang.InterruptedException
spout有2个线程同时向blot发送消息。
抛出的异常是因为:
Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown();
休眠10秒后,自动关闭storm服务了。