storm(3)-本机模式-helloworld

pom.xml

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <!-- Local mode needs storm-core on the classpath; cluster mode does not need it bundled. -->
    <!-- NOTE(review): presumably the provided scope below should be re-enabled when packaging for a cluster; confirm. -->
    <!--<scope>provided</scope>-->
</dependency>
<!-- Lombok provides the @Slf4j annotation used by the spout and bolt classes. -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.2</version>
    <scope>provided</scope>
</dependency>

...
<!-- Runs the assembly plugin's "single" goal during the package phase, using the jar-with-dependencies descriptor. -->
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <!-- NOTE(review): mainClass is left empty here; presumably the main class is supplied when the topology is launched. Confirm. -->
                <mainClass></mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>

</plugin>
HelloWorldSpout.java
package com.ebc.spout;

import lombok.extern.slf4j.Slf4j;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * Spout that emits the fixed sentence "Hello World" as a single-field tuple,
 * pausing 100 ms before every emit.
 *
 * @author yaoyuan2
 * @date 2019/4/11
 */
@Slf4j
public class HelloWorldSpout extends BaseRichSpout {
    /** Pause, in milliseconds, taken before each emit. */
    private static final int EMIT_PAUSE_MS = 100;
    /** Name of the only field carried by the emitted tuples. */
    private static final String OUTPUT_FIELD = "sentence";

    private SpoutOutputCollector collector;

    /**
     * Declares the single output field of this spout.
     *
     * @param declarer receives the field declaration
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields(OUTPUT_FIELD);
        declarer.declare(outputFields);
    }

    /**
     * Keeps the collector so that {@link #nextTuple()} can emit through it.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used for emitting tuples
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called repeatedly; contains the whole spout logic: sleep briefly, then
     * emit one "Hello World" tuple.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(EMIT_PAUSE_MS);
        String sendStr = "Hello World";
        Values tuple = new Values(sendStr);
        collector.emit(tuple);
        // Uncomment the next line to trace every emitted sentence.
        //log.info(sendStr);
    }
}
HelloWorldBolt.java
package com.ebc.blot;

import lombok.extern.slf4j.Slf4j;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

/**
 * Reads the tuples produced by the spout and keeps a running count of the
 * "Hello World" sentences it has seen, logging the count on every match.
 *
 * <p>The counter is a plain instance field, so it only reflects the tuples
 * passed to this particular instance.
 *
 * @author yaoyuan2
 * @date 2019/4/11
 */
@Slf4j
public class HelloWorldBolt extends BaseRichBolt {
    private int myCount = 0;

    /**
     * No setup is required; the collector is not retained because
     * {@link #execute(Tuple)} never emits.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector output collector (unused)
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    /**
     * Increments and logs the counter when the tuple's "sentence" field
     * equals "Hello World".
     *
     * @param input tuple carrying a "sentence" string field
     */
    @Override
    public void execute(Tuple input) {
        String test = input.getStringByField("sentence");
        // Compare by value: == only checks reference identity, which stops
        // holding as soon as the string is a different object (for example
        // after the tuple has been serialized and deserialized). Putting the
        // literal first also keeps a null field from throwing.
        if ("Hello World".equals(test)) {
            myCount++;
            log.info("发现了1个Hello World!现在的计数值="+Integer.toString(myCount));
        }
    }

    /**
     * Declares a single output field named "myCount". Note that
     * {@link #execute(Tuple)} does not currently emit any tuple.
     *
     * @param declarer receives the field declaration
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("myCount"));
    }
}
HelloWorldTopology.java
package com.ebc;

import com.ebc.blot.HelloWorldBolt;
import com.ebc.spout.HelloWorldSpout;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

/**
 * Entry point that wires {@link HelloWorldSpout} to {@link HelloWorldBolt}
 * and runs the topology either on a cluster or in an in-process local cluster.
 *
 * @author yaoyuan2
 * @date 2019/4/11
 */
public class HelloWorldTopology {
    /**
     * Builds and launches the topology.
     *
     * <p>When at least one argument is given, {@code args[0]} is used as the
     * topology name and the topology is submitted through
     * {@link StormSubmitter}. Otherwise it runs in a {@link LocalCluster}
     * under the name "test" for ten seconds and is then shut down.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     * @throws Exception if submitting the topology fails
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Parallelism hints: 2 for the spout, 4 for the bolt.
        builder.setSpout("helloWorld", new HelloWorldSpout(), 2);
        builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 4).shuffleGrouping("helloWorld");
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(20);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("test", conf, builder.createTopology());
                // Let the topology run for a while before tearing it down.
                Utils.sleep(10000);
                cluster.killTopology("test");
            } finally {
                // Always stop the in-process cluster, even if submit or kill
                // throws, so it is not left running.
                cluster.shutdown();
            }
        }
    }
}

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Logging setup for the example: everything goes to the console. -->
<Configuration monitorInterval="1" status="ERROR" strict="true" name="LogConfig">
    <Properties>
        <!-- Shared line pattern: time, thread, level, shortened logger name with line number, message. -->
        <Property name="log.layout">%date{HH:mm:ss.SSS} [%thread] %-5level %logger{20}:%line - %msg%n</Property>
    </Properties>

    <Appenders>
        <!-- Console appender targeting SYSTEM_OUT, formatted with the log.layout pattern. -->
        <Appender type="Console" name="STDOUT">
            <Target>SYSTEM_OUT</Target>
            <Layout type="PatternLayout" pattern="${log.layout}"/>
        </Appender>
    </Appenders>

    <Loggers>
        <!-- Root logger: info level, written to STDOUT. -->
        <Root level="info">
            <AppenderRef ref="STDOUT"/>
        </Root>
        <!-- The org.apache.storm logger is limited to error level. -->
        <Logger name="org.apache.storm" level="error" />
    </Loggers>

</Configuration>

 

输出:

13:21:53.792 [Thread-22-HelloWorldBolt-executor[3 3]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=36
13:21:53.855 [Thread-32-HelloWorldBolt-executor[4 4]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=41
13:21:53.892 [Thread-24-HelloWorldBolt-executor[1 1]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=35
13:21:53.955 [Thread-32-HelloWorldBolt-executor[4 4]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=42
13:21:53.992 [Thread-32-HelloWorldBolt-executor[4 4]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=43
13:21:54.055 [Thread-18-HelloWorldBolt-executor[2 2]] INFO  com.ebc.blot.HelloWorldBolt:30 - 发现了1个Hello World!现在的计数值=45
13:21:57.532 [Thread-14] ERROR org.apache.storm.daemon.supervisor.ReadClusterState:182 - Failed to Sync Supervisor
java.lang.RuntimeException: java.lang.InterruptedException

bolt 有 4 个线程(executor)执行计算,对应 setBolt 时设置的并行度 4。

将HelloWorldSpout.nextTuple方法中的//log.info(sendStr);放开,将HelloWorldBolt.execute方法中的log.info("发现了1个Hello World!现在的计数值="+Integer.toString(myCount));注释,输出:

13:26:48.969 [Thread-20-helloWorld-executor[7 7]] INFO  com.ebc.spout.HelloWorldSpout:35 - Hello World
13:26:49.056 [Thread-26-helloWorld-executor[6 6]] INFO  com.ebc.spout.HelloWorldSpout:35 - Hello World
13:26:52.512 [Thread-14] ERROR org.apache.storm.daemon.supervisor.ReadClusterState:182 - Failed to Sync Supervisor
java.lang.RuntimeException: java.lang.InterruptedException

spout 有 2 个线程(executor)同时向 bolt 发送消息,对应 setSpout 时设置的并行度 2。

抛出的异常是因为:

Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

休眠 10 秒后,程序会主动杀掉拓扑并关闭本地 Storm 集群;关闭过程中后台线程(如上面日志里的 Supervisor 同步线程)被中断,所以会打印 InterruptedException,并不代表拓扑本身运行出错。

 


上一篇:5、Storm集成Kafka


下一篇:零基础大数据学习路线指南,做个不秃头的大数据工程师!