用实例的方式去理解storm的并行度

什么是storm的并发度

一个topology(拓扑)在storm集群上最总是以executor和task的形式运行在suppervisor管理的worker节点上。而worker进程都是运行在jvm虚拟机上面的,每个拓扑都会被拆开多个组件分布式的运行在worker节点上。
1.worker
2.executor
3.task
这三个简单关系图:
用实例的方式去理解storm的并行度

一个worker工作进程运行一个拓扑的子集(其实就是拓扑的组件),每个组件的都会以executor(线程)在worker进程上执行,一个worker进程可以同时运行多个拓扑的组件也就是线程。

一个executor线程可以运行同一个组件的一个或者多个tasks

task是实际处理数据的执行者,每一个spout或者bolt会在集群上执行很多个task。在拓扑的生命周期内拓扑结构相同的拓扑的组件任务task数量总是相同的。但是每个组件的执行的线程(executor)数是可以变化的。这就意味着以下条件总是成立的:#threads ≤ #tasks 也就是task的数量总是大于线程数,一般情况下,任务task的数量往往设置成和线程(executor)的数量一致,这样,每个线程执行一个task。

在storm拓扑的并发度其实就是集群上拓扑组件在集群上运行的executor(线程)的数量。

如何设置拓扑的并发度

“并行度”如何配置?其实不仅仅是设置executor线程的数量,同时也要从worker工作进程和task任务的数量的方面考虑。
可以用以下几种方式配置并发度:
1.通过storm的配置文件配置。storm配置文件的加载优先级是:defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration.
工作进程数
描述:为群集中的计算机上的拓扑创建多少个工作进程。
配置选项:TOPOLOGY_WORKERS
如何设置代码(示例):
配置#setNumWorkers
执行者数(线程数)
描述:每个组件生成多少个执行程序。
配置选项:无(将parallelism_hint参数传递给setSpout或setBolt)
如何设置代码(示例):
TopologyBuilder#setSpout()
TopologyBuilder#setBolt()
请注意,从Storm 0.8开始,parallelism_hint参数现在指定该螺栓的执行者的初始数量(不是任务!)。
任务数量
描述:每个组件创建多少个任务。
配置选项:TOPOLOGY_TASKS
如何设置代码(示例):
ComponentConfigurationDeclarer#setNumTasks()
以下是在实践中显示这些设置的示例代码段:

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");

在上面的代码中,我们配置了Storm来运行GreenBolt带有初始数量为两个执行器和四个相关任务的bolt 。Storm将为每个执行程序(线程)运行两个任务。如果您没有明确配置任务数,Storm将默认运行每个执行程序一个任务。

官方例子

下图显示了简单拓扑在操作中的外观。拓扑结构由三个部分组成:一个叫做spout BlueSpout,两个叫做GreenBolt和YellowBolt。组件被链接,以便BlueSpout将其输出发送到GreenBolt,然后将其自己的输出发送到YellowBolt。
用实例的方式去理解storm的并行度
在GreenBolt被配置为每代码段以上而BlueSpout和YellowBolt仅设置并行提示(执行人数)。这是相关代码:

Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology()
    );

当然,Storm附带了额外的配置设置来控制拓扑的并行性,包括:

TOPOLOGY_MAX_TASK_PARALLELISM:此设置为可以为单个组件生成的执行程序数量设置上限。它通常在测试期间用于限制在本地模式下运行拓扑时产生的线程数。您可以通过例如Config#setMaxTaskParallelism()设置此选项。

从实际运行的拓扑的角度理解storm的并发度

自己写一个拓扑

实现一个可以设置worker数量,设置spout 、bolt 的Parallelism Hint的拓扑然后打包上传到storm集群运行。

package com.sonly.storm.demo1;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <b>package:com.sonly.storm.demo1</b>
 * <b>project(项目):stormstudy</b>
 * <b>class(类)HelloToplogy</b>
 * <b>creat date(创建时间):2019-05-09 21:55</b>
 * <b>author(作者):</b>xxydliuyss</br>
 * <b>note(备注)):</b>
 * If you want to change the file header,please modify zhe File and Code Templates.
 */
public class HelloToplogy {
    public static final Logger LOGGER = LoggerFactory.getLogger(HelloToplogy.class);
    //Topology Name
    //component prefix
    //workers
    //spout executor (parallelism_hint)
    //spout task size
    //bolt executor (parallelism_hint)
    //bolt task size
    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        Config conf = new Config();
        conf.setDebug(true);
        if (args==null || args.length < 7) {
            conf.setNumWorkers(3);
            builder.setSpout("spout", new HellowordSpout(), 4).setNumTasks(4);

            builder.setBolt("split-bolt", new SplitBolt(),  4).shuffleGrouping("spout").setNumTasks(8);
            builder.setBolt("count-bolt", new HellowordBolt(), 8).fieldsGrouping("split-bolt", new Fields("word")).setNumTasks(8);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());

            Thread.sleep(10000);
            cluster.killTopology("word-count");
            cluster.shutdown();
        }
        else {
            Options options = Options.builder(args);
            conf.setNumWorkers(options.getWorkers());
            builder.setSpout(options.getPrefix()+"-spout", new HellowordSpout(), options.getSpoutParallelismHint()).setNumTasks(options.getSpoutTaskSize());

            builder.setBolt(options.getPrefix()+"-split-bolt", new SplitBolt(),  options.getBoltParallelismHint()).shuffleGrouping(options.getPrefix()+"-spout").setNumTasks(options.getBoltTaskSize());
            builder.setBolt(options.getPrefix()+"-count-bolt", new HellowordBolt(), options.getBoltParallelismHint()).fieldsGrouping(options.getPrefix()+"-split-bolt", new Fields("word")).setNumTasks(options.getBoltTaskSize());
            try {
                StormSubmitter.submitTopologyWithProgressBar(options.getTopologyName(), conf, builder.createTopology());
                LOGGER.warn("===========================================================");
                LOGGER.warn("The Topology {} is Submited ",options.getTopologyName());
                LOGGER.warn("===========================================================");
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }

        }
    }
    public static class Options{
        private String topologyName;
        private String prefix;
        private Integer workers;
        private Integer spoutParallelismHint;
        private Integer spoutTaskSize;
        private Integer boltParallelismHint;
        private Integer boltTaskSize;

        public Options(String topologyName, String prefix, Integer workers, Integer spoutParallelismHint, Integer spoutTaskSize, Integer boltParallelismHint, Integer boltTaskSize) {
            this.topologyName = topologyName;
            this.prefix = prefix;
            this.workers = workers;
            this.spoutParallelismHint = spoutParallelismHint;
            this.spoutTaskSize = spoutTaskSize;
            this.boltParallelismHint = boltParallelismHint;
            this.boltTaskSize = boltTaskSize;
        }
        public static Options builder(String[] args){
            return new Options(args[0],args[1],Integer.parseInt(args[2])
            ,Integer.parseInt(args[3]),Integer.parseInt(args[4]),Integer.parseInt(args[5]),Integer.parseInt(args[6])
            );
        }
        public String getTopologyName() {
            return topologyName;
        }

        public void setTopologyName(String topologyName) {
            this.topologyName = topologyName;
        }

        public String getPrefix() {
            return prefix;
        }

        public void setPrefix(String prefix) {
            this.prefix = prefix;
        }

        public Integer getWorkers() {
            return workers;
        }

        public void setWorkers(Integer workers) {
            this.workers = workers;
        }

        public Integer getSpoutParallelismHint() {
            return spoutParallelismHint;
        }

        public void setSpoutParallelismHint(Integer spoutParallelismHint) {
            this.spoutParallelismHint = spoutParallelismHint;
        }

        public Integer getSpoutTaskSize() {
            return spoutTaskSize;
        }

        public void setSpoutTaskSize(Integer spoutTaskSize) {
            this.spoutTaskSize = spoutTaskSize;
        }

        public Integer getBoltParallelismHint() {
            return boltParallelismHint;
        }

        public void setBoltParallelismHint(Integer boltParallelismHint) {
            this.boltParallelismHint = boltParallelismHint;
        }

        public Integer getBoltTaskSize() {
            return boltTaskSize;
        }

        public void setBoltTaskSize(Integer boltTaskSize) {
            this.boltTaskSize = boltTaskSize;
        }
    }
}

spout 类:

package com.sonly.storm.demo1;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Currency;
import java.util.Map;
import java.util.Random;

/**
 * <b>package:com.sonly.storm.demo1</b>
 * <b>project(项目):stormstudy</b>
 * <b>class(类)${HellowordSpout}</b>
 * <b>creat date(创建时间):2019-05-09 20:27</b>
 * <b>author(作者):</b>xxydliuyss</br>
 * <b>note(备注)):</b>
 * If you want to change the file header,please modify zhe File and Code Templates.
 */
public class HellowordSpout extends BaseRichSpout {
    public static final Logger LOGGER = LoggerFactory.getLogger(HellowordSpout.class);
    //拓扑上下文
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private Map config;
    private Random random;
    public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.config = conf;
        this.context = topologyContext;
        this.collector = collector;
        this.random = new Random();
        LOGGER.warn("HellowordSpout->open:hashcode:{}->ThreadId:{},TaskId:{}",this.hashCode(),Thread.currentThread().getId(),context.getThisTaskId());
    }

    public void nextTuple() {
        String[] sentences = new String[]{"hello world !", "hello Storm !",
                "hello apache flink !", "hello apache kafka stream !", "hello apache spark !"};
        final String sentence = sentences[random.nextInt(sentences.length)];
        collector.emit(new Values(sentence));
        LOGGER.warn("HellowordSpout->nextTuple:hashcode:{}->ThreadId:{},TaskId:{},Values:{}",this.hashCode(),Thread.currentThread().getId(),context.getThisTaskId(),sentence);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void close() {
        LOGGER.warn("HellowordSpout->close:hashcode:{}->ThreadId:{},TaskId:{}",this.hashCode(),Thread.currentThread().getId(),context.getThisTaskId());
        super.close();
    }
}

实现两个bolt一个用来统计单词出现个数,一个用来拆分语句。

package com.sonly.storm.demo1;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * <b>package:com.sonly.storm.demo1</b>
 * <b>project(项目):stormstudy</b>
 * <b>class(类)${CLASS_NAME}</b>
 * <b>creat date(创建时间):2019-05-09 21:19</b>
 * <b>author(作者):</b>xxydliuyss</br>
 * <b>note(备注)):</b>
 * If you want to change the file header,please modify zhe File and Code Templates.
 */
public class HellowordBolt extends BaseRichBolt {
    public static final Logger LOGGER = LoggerFactory.getLogger(HellowordBolt.class);
    private TopologyContext context;
    private Map conf;
    private OutputCollector collector;
    private Map<String,Integer> counts = new HashMap(16);
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.conf=map;
        this.context = topologyContext;
        this.collector = outputCollector;
        LOGGER.warn("HellowordBolt->prepare:hashcode:{}->ThreadId:{},TaskId:{}",this.hashCode(),Thread.currentThread().getId(),context.getThisTaskId());
    }

    public void execute(Tuple tuple) {
        LOGGER.warn("HellowordBolt->execute:hashcode:{}->ThreadId:{},TaskId:{}",this.hashCode(),Thread.currentThread().getId(),context.getThisTaskId());
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
package com.sonly.storm.demo1;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * <b>package:com.sonly.storm.demo1</b>
 * <b>project(项目):stormstudy</b>
 * <b>class(类)${CLASS_NAME}</b>
 * <b>creat date(创建时间):2019-05-09 21:29</b>
 * <b>author(作者):</b>xxydliuyss</br>
 * <b>note(备注)):</b>
 * If you want to change the file header,please modify zhe File and Code Templates.
 */
public class SplitBolt extends BaseRichBolt {
    public static final Logger LOGGER = LoggerFactory.getLogger(SplitBolt.class);
    private TopologyContext context;
    private Map conf;
    private OutputCollector collector;
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.conf=map;
        this.context = topologyContext;
        this.collector = outputCollector;
        LOGGER.warn("SplitBolt->prepare:hashcode:{}->ThreadId:{},TaskId:{}",this.hashCode(),Thread.currentThread().getId(),context.getThisTaskId());
    }

    public void execute(Tuple tuple) {
        String words = tuple.getStringByField("sentence");
        String[] contents = words.split(" +");
        for (String content : contents) {
            collector.emit(new Values(content));
            collector.ack(tuple);
        }
        LOGGER.warn("SplitBolt->execute:hashcode:{}->ThreadId:{},TaskId:{}",this.hashCode(),Thread.currentThread().getId(),context.getThisTaskId());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

local模式启动运行
用实例的方式去理解storm的并行度

在pom文件中添加打包插件

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass>com.sonly.storm.demo1.HelloToplogy</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

同时修改dependency 的scope为provide

<scope>provide</scope>

原因是服务器上storm相关包都已经存在了,防止重复打包导致冲突。

//Topology Name
//component prefix
//workers
//spout executor (parallelism_hint)
//spout task size
//bolt executor (parallelism_hint)
//bolt task size

在storm集群提交拓扑

修改日志级别

修改worker的工作进程的日志级别,修改成只输出warn日志,避免其他日志对我的干扰。进入${your_storm_path}/log4j2/目录修改worker.xml文件。先把worker.xml备份把Info级别改成warn

$ cp worker.xml worker.xml.bak

修改成:

<loggers>
    <root level="warn"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
    </root>
    <Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <Logger name="STDERR" level="INFO">
        <appender-ref ref="STDERR"/>
        <appender-ref ref="syslog"/>
    </Logger>
    <Logger name="STDOUT" level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="syslog"/>
    </Logger>
</loggers>

同步到另外两台supervisor的工作节点服务器。
为了跟清晰的理解并发度,我会通过这个demo 拓扑,修改参数观察stormUI的exector数量和tasks数量。

参数说明

// topologyName='count' ## Topology Name 拓扑的名字
// prefix='tp1' ## component prefix 即为每个spout,bolt的前缀名称
// workers=1  ## worker number 即为工作进程jvm数量
// spoutParallelismHint=2  ## spout executor (parallelism_hint) 即spout的线程数量
// spoutTaskSize=1 ## spout task size 即spout的运行实例数
// boltParallelismHint=2  ## bolt executor (parallelism_hint) 即bolt的线程数量
// boltTaskSize=1  ##bolt task size 即bolt的运行实例数

根据样例分析理解storm的并发度

例子1

执行:

storm jar storm-demo1.jar com.sonly.storm.demo1.HelloToplogy tp1 tp1 1 2 0 2 0

参数详情:

{topologyName='tp1', prefix='tp1', workers=1, spoutParallelismHint=2, spoutTaskSize=0, boltParallelismHint=2, boltTaskSize=0}

这时候task都被设置成0了。如下图:excutors为1,task为1。
用实例的方式去理解storm的并行度
接着往下看:此时我们的bolt的task都被设置0了,所以我们是没有创建spout,bolt的,但是你会发现一个_acker的bolt,这是storm的acker机制,storm自己给我们创建的bolt,并且每一个worker都会必须有一个_acker的bolt,如果我们没有取消ack机制的话。所以worker上只用了一个excutor来跑这个_acker的bolt。
用实例的方式去理解storm的并行度

例子2

storm jar storm-demo1.jar com.sonly.storm.demo1.HelloToplogy tp2 tp2 1 2 1 2 1

参数详情:

{topologyName='tp2', prefix='tp2', workers=1, spoutParallelismHint=2, spoutTaskSize=1, boltParallelismHint=2, boltTaskSize=1}

此时 task的值都被设置成1了。如下图:excutors为4,task为4。
用实例的方式去理解storm的并行度
接着看一下spout,bolt 以及组件的分布情况见下图:
用实例的方式去理解storm的并行度
此时已经我们的有tp2-spout 一个spout,除了系统的acker 还有我们自己创建的两个bolt。因为只有一个worker所以全部分布在一个worker里面。
尽管我们设置了spout的线程数为2,bolt的线程数为2,但是task都被设置成1,只有一个任务需要被两个excutor执行,所以有一个线程实际上是没有任务执行的。所以线程数,就是这几个task的值的和,
一个spout,两个自己的创建的bolt以及acker的task数量的和。

例子3

storm jar storm-demo1.jar com.sonly.storm.demo1.HelloToplogy tp3 tp3 2 2 1 2 1

参数详情:

{topologyName='tp3', prefix='tp3', workers=2, spoutParallelismHint=2, spoutTaskSize=1, boltParallelismHint=2, boltTaskSize=1}

此时worker已经被设置成2了,如下图:executor为5,task为5.
用实例的方式去理解storm的并行度
接着看一下spout,bolt以及组件的分布情况如下图:
用实例的方式去理解storm的并行度

此时,task任务数依然是1,spout和bolt都是1份,acker每个worker都必须有一份的,所以,executor的数就是task实例数也就是:一个spout 两个系统acker bolt,和两个我们自己的bolt。也就是5.这个5个task不均匀的分配到了两个worker进程上。

例子4

storm jar storm-demo1.jar com.sonly.storm.demo1.HelloToplogy tp4 tp4 2 2 2 2 2
    

参数详情:

{topologyName='tp4', prefix='tp4', workers=2, spoutParallelismHint=2, spoutTaskSize=2, boltParallelismHint=2, boltTaskSize=2}

此时参数已经taks 数量被设置成2了,如下图:executor为8,task为8.
用实例的方式去理解storm的并行度
再看一下spout,bolt的分布情况:如下图:
用实例的方式去理解storm的并行度
此时,我们task都被设置成了2,那spout实例和bolt的实例都是2,也就是2+2+2=6 这个是我们自己的创建的task,再加上acker两个task,所以task参数就是8.而这里设置时executor也是8个 被均分到两个worker上面。

例子5

storm jar storm-demo1.jar com.sonly.storm.demo1.HelloToplogy tp5 tp5 2 2 4 2 4

参数详情:

{topologyName='tp5', prefix='tp5', workers=2, spoutParallelismHint=2, spoutTaskSize=4, boltParallelismHint=2, boltTaskSize=4}

此时task设置成4,excutor设置成2,那这样的话,一个excutor会跑两个task ,executor=8,task=14 如下图:
用实例的方式去理解storm的并行度
继续看一下spout和bolt的分布情况:
用实例的方式去理解storm的并行度
此时task设置成4,executor是2,那就是bolt和spout实例就是 4+4+4=12 再加上两个worker的Acker就是14个task。exector 是bolt的设置的值2+2+2=6个再加上两个acker的值,就是8个。同时,一个executor执行了两个task。8个executor平均分配到两个worker上面了。

总结

exector和task的值,和拓扑结构有关系,拓扑中的spout 和bolt设置的parallelism_hint都会影响到exector和task的数量。task和exectuor之间的关系在设置上就已经确定了,最好exector和task之间,task 的数量最好设置成executor的倍数,这样每个executor执行的task才是一样的。
说到这里,相信大家对并发度,有了比较清晰的理解。

上一篇:flink专题


下一篇:Storm学习笔记