Storm命令行操作
1)nimbus:启动nimbus守护进程
storm nimbus
2)supervisor:启动supervisor守护进程
storm supervisor
3)ui:启动UI守护进程。
storm ui
4)list:列出正在运行的拓扑及其状态
storm list
5)logviewer:Logviewer提供一个web接口查看Storm日志文件。
storm logviewer
6)jar:
storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
7)kill:杀死名为Topology-name的拓扑
storm kill topology-name [-w wait-time-secs]
-w:等待多久后杀死拓扑
8)active:激活指定的拓扑spout。
storm activate topology-name
9)deactivate:禁用指定的拓扑Spout。
storm deactivate topology-name
10)help:打印一条帮助消息或者可用命令的列表。
storm help
storm help
常用API
component
1)基本接口
(1)IComponent接口
(2)ISpout接口
(3)IRichSpout接口
(4)IStateSpout接口
(5)IRichStateSpout接口
(6)IBolt接口
(7)IRichBolt接口
(8)IBasicBolt接口
2)基本抽象类
(1)BaseComponent抽象类
(2)BaseRichSpout抽象类
(3)BaseRichBolt抽象类
(4)BaseTransactionalBolt抽象类
(5)BaseBasicBolt抽象类
spout水龙头
Spout最顶层抽象是ISpout接口
(1)Open()
是初始化方法
(2)close()
在该spout关闭前执行,但是并不能得到保证一定被执行,kill -9 时不执行,Storm kill {topoName} 时执行
(3)activate()
当Spout已经从失效模式中激活时被调用。该Spout的nextTuple()方法很快就会被调用。
(4)deactivate ()
当Spout已经失效时被调用。在Spout失效期间,nextTuple不会被调用。Spout将来可能会也可能不会被重新激活。
(5)nextTuple()
当调用nextTuple()方法时,Storm要求Spout发射元组到输出收集器(OutputCollectot),NextTuple方法应该是非阻塞的,所以如果Spout没有元组可以发射,该方法应该返回,nextTuple(),ack()和fail方法都在Spout任务的单一线程内紧密循环被调用,当没有元组可以发射时,可以让nextTuple去sleep很短的时间,比如1ms,这样就不会浪费太多的cpu资源
(6)ack()
成功处理tuple回调方法
(7)fail()
处理失败tuple回调方法
原则:通常情况下(Shell和事务除外),实现一个Spout,可以直接实现接口IRichSpout ,如果不想写多余的代码,可以直接继承BaseRichSpout
bolt转接头
bolt的最顶层抽象是IBolt接口
(1)prepare()
prepare()方法在集群的工作进程内被初始化时调用,提供了Bolt执行所需要的环境
(2) execute()
接受一个tuple进行处理,也可emit数据到下一级组件
(3)cleanup()
cleanup方法当一个IBolt即将关闭时被调用不能保证cleanup()方法一定会被调用,因为Supervisor可以对集群的工作进程使用kill -9命令强制杀死进程命令。
如果在本地模式运行Storm,当拓扑被杀死的时候,可以保证cleanup()方法一定会被调用
实现一个Bolt,可以实现IRichBolt接口或者继承BaseRichBolt,实际上相当于自动做了prepare方法和collector.emit.ack(inputTuple)。
spout的tail特性
Storm可以实时监测文件数据,当文件数据变化时,Storm自动读取。