Storm框架入门
1 Topology构成
和同样是计算框架的Mapreduce相比,Mapreduce集群上运行的是Job,而Storm集群上运行的是Topology。但是Job在运行结束之后会自行结束,Topology却只能被手动的kill掉,否则会一直运行下去。
Storm集群中有两种节点,一种是控制节点(Nimbus节点),另一种是工作节点(Supervisor节点)。所有Topology任务的提交必须在Storm客户端节点上进行(需要配置~/.storm/storm.yaml文件),由Nimbus节点分配给其他Supervisor节点进行处理。Nimbus节点首先将提交的Topology进行分片,分成一个个的Task,并将Task和Supervisor相关的信息提交到zookeeper集群上,Supervisor会去zookeeper集群上认领自己的Task,通知自己的Worker进程进行Task的处理。总体的Topology处理流程图为:
每个Topology都由Spout和Bolt组成,在Spout和Bolt传递信息的基本单位叫做Tuple,由Spout发出的连续不断的Tuple及其在相应Bolt上处理的子Tuple连起来称为一个Steam,每个Stream的命名是在其首个Tuple被Spout发出的时候,此时Storm会利用内部的Ackor机制保证每个Tuple可靠的被处理。
而Tuple可以理解成键值对,其中,键就是在定义在declareStream方法中的Fields字段,而值就是在emit方法中发送的Values字段。
2 Configuration
在运行Topology之前,可以通过一些参数的配置来调节运行时的状态,参数的配置是通过Storm框架部署目录下的conf/storm.yaml文件来完成的。在次文件中可以配置运行时的Storm本地目录路径、运行时Worker的数目等。
在代码中,也可以设置Config的一些参数,但是优先级是不同的,不同位置配置Config参数的优先级顺序为:
default.yaml<storm.yaml<topology内部的configuration<内部组件的special configuration<外部组件的special configuration
在storm.yaml中常用的几个选项为:
配置选项名称 |
配置选项作用 |
topology.max.task.parallelism |
每个Topology运行时最大的executor数目 |
topology.workers |
每个Topology运行时的worker的默认数目,若在代码中设置,则此选项值被覆盖 |
storm.zookeeper.servers |
zookeeper集群的节点列表 |
storm.local.dir |
Storm用于存储jar包和临时文件的本地存储目录 |
storm.zookeeper.root |
Storm在zookeeper集群中的根目录,默认是“/” |
ui.port |
Storm集群的UI地址端口号,默认是8080 |
nimbus.host: |
Nimbus节点的host |
supervisor.slots.ports |
Supervisor节点的worker占位槽,集群中的所有Topology公用这些槽位数,即使提交时设置了较大数值的槽位数,系统也会按照当前集群中实际剩余的槽位数来进行分配,当所有的槽位数都分配完时,新提交的Topology只能等待,系统会一直监测是否有空余的槽位空出来,如果有,就再次给新提交的Topology分配 |
supervisor.worker.timeout.secs |
Worker的超时时间,单位为秒,超时后,Storm认为当前worker进程死掉,会重新分配其运行着的task任务 |
drpc.servers |
在使用drpc服务时,drpc server的服务器列表 |
drpc.port |
在使用drpc服务时,drpc server的服务端口 |
3 Spouts
Spout是Stream的消息产生源, Spout组件的实现可以通过继承BaseRichSpout类或者其他*Spout类来完成,也可以通过实现IRichSpout接口来实现。
需要根据情况实现Spout类中重要的几个方法有:
3.1 open方法
当一个Task被初始化的时候会调用此open方法。一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。
示例如下:
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; }
3.2 declareOutputFields方法
此方法用于声明当前Spout的Tuple发送流。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的,其中的参数包括了发送的域Fields。
示例如下:
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }
3.3 getComponentConfiguration方法
此方法用于声明针对当前组件的特殊的Configuration配置。
示例如下:
public Map<String, Object> getComponentConfiguration() { if(!_isDistributed) { Map<String, Object> ret = new HashMap<String, Object>(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3); return ret; } else { return null; } }
这里便是设置了Topology中当前Component的线程数量上限。
3.4 nextTuple方法
这是Spout类中最重要的一个方法。发射一个Tuple到Topology都是通过这个方法来实现的。
示例如下:
public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"twitter","facebook","google"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }
这里便是从一个数组中随机选取一个单词作为Tuple,然后通过_collector发送到Topology。
另外,除了上述几个方法之外,还有ack、fail和close方法等。Storm在监测到一个Tuple被成功处理之后会调用ack方法,处理失败会调用fail方法,这两个方法在BaseRichSpout类中已经被隐式的实现了。
4 Bolts
Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口来完成。
Bolt类需要实现的主要方法有:
4.1 prepare方法
此方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。Bolt中Tuple的发送可以在prepare方法中、execute方法中、cleanup等方法中进行,一般都是些在execute中。
示例如下:
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; }
4.2 declareOutputFields方法
用于声明当前Bolt发送的Tuple中包含的字段,和Spout中类似。
示例如下:
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds")); }
此例说明当前Bolt类发送的Tuple包含了三个字段:"obj", "count", "actualWindowLengthInSeconds"。
4.3 getComponentConfiguration方法
和Spout类一样,在Bolt中也可以有getComponentConfiguration方法。
示例如下:
public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = new HashMap<String, Object>(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); return conf; }
此例定义了从系统组件“_system”的“_tick”流中发送Tuple到当前Bolt的频率,当系统需要每隔一段时间执行特定的处理时,就可以利用这个系统的组件的特性来完成。
4.4 execute方法
这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的。此时,有两种情况,一种是emit方法中有两个参数,另一个种是有一个参数。
(1)emit有一个参数:此唯一的参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。
(2)emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即,如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性。
这两种情况要根据自己的场景来确定。
示例如下:
public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } public void execute(Tuple tuple) { _collector.emit(new Values(tuple.getString(0) + "!!!")); }
此外还有ack方法、fail方法、cleanup方法等。其中cleanup方法和Spout中的close方法类似,都是在当前Component关闭时调用,但是针对实时计算来说,除非一些特殊的场景要求以外,这两个方法一般都很少用到。
5 Stream grouping
上文中介绍了Topology的基本组件Spout和Bolt,在Topology中,数据流Tuple的处理就是不断的通过调用不同的Spout和Bolt来完成的。不同的Bolt和Spout的上下游关系是通过在入口类中定义的。示例如下:
builder = new TopologyBuilder(); builder.setSpout(spoutId, new TestWordSpout(), 5); builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));
builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
9
此例中的builder是TopologyBuilder对象,通过它的createTopology方法可以创建一个Topology对象,同时此builder还要定义当前Topology中用到的Spout和Bolt对象,分别通过setSpout方法和setBolt方法来完成。
setSpout方法和setBolt方法中的第一个参数是当前的Component组件的Stream流ID号;第二个参数是具体的Component实现类的构造;第三个参数是当前Component的并行执行的线程数目,Storm会根据这个数字的累加和来确定Topology的Task数目。最后的小尾巴*Grouping是指的一个Stream应如何分配数据给Bolt上面的Task。目前Storm的Stream Grouping有如下几种:
(1)ShuffleGrouping:随机分组,随机分发Stream中的tuple,保证每个Bolt的Task接收Tuple数量大致一致;
(2)FieldsGrouping:按照字段分组,保证相同字段的Tuple分配到同一个Task中;
(3)AllGrouping:广播发送,每一个Task都会受到所有的Tuple;
(4)GlobalGrouping:全局分组,所有的Tuple都发送到同一个Task中,此时一般将当前Component的并发数目设置为1;
(5)NonGrouping:不分组,和ShuffleGrouping类似,当前Task的执行会和它的被订阅者在同一个线程中执行;
(6)DirectGrouping:直接分组,直接指定由某个Task来执行Tuple的处理,而且,此时必须有emitDirect方法来发送;
(7) localOrShuffleGrouping:和ShuffleGrouping类似,若Bolt有多个Task在同一个进程中,Tuple会随机发给这些Task。
不同的的Grouping,需要根据不同的场景来具体设定,不一而论。
6 Topology运行
6.1 Topology运行方式
Topology的运行可以分为本地模式和分布式模式,模式的设置可以在配置文件中设定,也可以在代码中设置。
(1)本地运行的提交方式:
LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf, topology); cluster.killTopology(topologyName); cluster.shutdown();
(2)分布式提交方式:
StormSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
需要注意的是,在Storm代码编写完成之后,需要打包成jar包放到Nimbus中运行,打包的时候,不需要把依赖的jar都打进去,否则如果把依赖的storm.jar包打进去的话,运行时会出现重复的配置文件错误导致Topology无法运行。因为Topology运行之前,会加载本地的storm.yaml配置文件。
在Nimbus运行的命令如下:
storm jar StormTopology.jar maincalss args
6.2 Topology运行流程
有几点需要说明的地方:
(1)Storm提交后,会把代码首先存放到Nimbus节点的inbox目录下,之后,会把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化之后的Topology代码文件;
(2)在设定Topology所关联的Spouts和Bolts时,可以同时设置当前Spout和Bolt的executor数目和task数目,默认情况下,一个Topology的task的总和是和executor的总和一致的。之后,系统根据worker的数目,尽量平均的分配这些task的执行。worker在哪个supervisor节点上运行是由storm本身决定的;
(3)任务分配好之后,Nimbes节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,这里存储了当前Topology的所有worker进程的心跳信息;
(4)Supervisor节点会不断的轮询zookeeper集群,在zookeeper的assignments节点中保存了所有Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor通过轮询此节点的内容,来领取自己的任务,启动worker进程运行;
(5)一个Topology运行之后,就会不断的通过Spouts来发送Stream流,通过Bolts来不断的处理接收到的Stream流,Stream流是*的。
最后一步会不间断的执行,除非手动结束Topology。
6.3 Topology方法调用流程
Topology中的Stream处理时的方法调用过程如下:
有几点需要说明的地方:
(1)每个组件(Spout或者Bolt)的构造方法和declareOutputFields方法都只被调用一次。
(2)open方法、prepare方法的调用是多次的。入口函数中设定的setSpout或者setBolt里的并行度参数指的是executor的数目,是负责运行组件中的task的线程 的数目,此数目是多少,上述的两个方法就会被调用多少次,在每个executor运行的时候调用一次。相当于一个线程的构造方法。
(3)nextTuple方法、execute方法是一直被运行的,nextTuple方法不断的发射Tuple,Bolt的execute不断的接收Tuple进行处理。只有这样不断地运行,才会产 生*的Tuple流,体现实时性。相当于线程的run方法。
(4)在提交了一个topology之后,Storm就会创建spout/bolt实例并进行序列化。之后,将序列化的component发送给所有的任务所在的机器(即Supervisor节 点),在每一个任务上反序列化component。
(5)Spout和Bolt之间、Bolt和Bolt之间的通信,是通过zeroMQ的消息队列实现的。
(6)上图没有列出ack方法和fail方法,在一个Tuple被成功处理之后,需要调用ack方法来标记成功,否则调用fail方法标记失败,重新处理这个Tuple。
6.4 Topology并行度
在Topology的执行单元里,有几个和并行度相关的概念。
(1)worker:每个worker都属于一个特定的Topology,每个Supervisor节点的worker可以有多个,每个worker使用一个单独的端口,它对Topology中的每个component运行一个或者多个executor线程来提供task的运行服务。
(2)executor:executor是产生于worker进程内部的线程,会执行同一个component的一个或者多个task。
(3)task:实际的数据处理由task完成,在Topology的生命周期中,每个组件的task数目是不会发生变化的,而executor的数目却不一定。executor数目小于等于task的数目,默认情况下,二者是相等的。
在运行一个Topology时,可以根据具体的情况来设置不同数量的worker、task、executor,而设置的位置也可以在多个地方。
(1)worker设置:
(1.1)可以通过设置yaml中的topology.workers属性
(1.2)在代码中通过Config的setNumWorkers方法设定
(2)executor设置:
通过在Topology的入口类中setBolt、setSpout方法的最后一个参数指定,不指定的话,默认为1;
(3)task设置:
(3.1) 默认情况下,和executor数目一致;
(3.2)在代码中通过TopologyBuilder的setNumTasks方法设定具体某个组件的task数目;
6.5 终止Topology
通过在Nimbus节点利用如下命令来终止一个Topology的运行:
storm kill topologyName
kill之后,可以通过UI界面查看topology状态,会首先变成KILLED状态,在清理完本地目录和zookeeper集群中的和当前Topology相关的信息之后,此Topology就会彻底消失了。
7 Topology跟踪
Topology提交后,可以在Nimbus节点的web界面查看,默认的地址是http://NimbusIp:8080。
8 Storm应用
上面给出了如何编写Storm框架任务Topology的方法,那么在哪些场景下能够使用Storm框架呢?下面介绍Storm框架的几个典型的应用场景。
(1)利用Storm框架的DRPC进行大量的函数并行调用,即实现分布式的RPC;
(2)利用Storm框架的Transaction Topology,可以进行实时性的批量更新或者查询数据库操作或者应用需要同一批内的消息以及批与批之间的消息并行处理这样的场景,此时Topology中只能有一个TrasactionalSpout;
(3)利用滑动窗口的逻辑结合Storm框架来计算得出某段时间内的售出量最多的产品、购买者最多的TopN地区等;
(4)精确的广告推送,在用户浏览产品的时候,将浏览记录实时性的搜集,发送到Bolt,由Bolt来根据用户的账户信息(如果有的话)完成产品的分类统计,产品的相关性查询等逻辑计算之后,将计算结果推送给用户;
(5)实时日志的处理,Storm可以和一个分布式存储结合起来,实时性的从多个数据源发送数据到处理逻辑Bolts,Bolts完成一些逻辑处理之后,交给分布式存储框架进行存储,此时,Spout可以是多个;
(6)实时性的监控舆论热点,比如针对某个关键词,在用户查询的时候,产生数据源Spout,结合语义分析等,由Bolt来完成查询关键词的统计分析,汇总当前的舆论热点;
(7)数据流的实时聚合操作。
9 参考网址
http://xumingming.sinaapp.com/138/twitter-storm%E5%85%A5%E9%97%A8/
https://github.com/nathanmarz/storm/wiki
http://nathanmarz.github.io/storm/doc/index-all.html
有理解不到位的地方,欢迎批评指正,一起交流~