Storm命令以及常用API

Storm命令行操作

1)nimbus:启动nimbus守护进程

​ storm nimbus

2)supervisor:启动supervisor守护进程

​ storm supervisor

3)ui:启动UI守护进程。

​ storm ui

4)list:列出正在运行的拓扑及其状态

​ storm list

5)logviewer:Logviewer提供一个web接口查看Storm日志文件。

​ storm logviewer

6)jar:

storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】

7)kill:杀死名为Topology-name的拓扑

​ storm kill topology-name [-w wait-time-secs]

​ -w:等待多久后杀死拓扑

8)active:激活指定的拓扑spout。

storm activate topology-name

9)deactivate:禁用指定的拓扑Spout。

​ storm deactivate topology-name

10)help:打印一条帮助消息或者可用命令的列表。

​ storm help

​ storm help

常用API

component

1)基本接口

​ (1)IComponent接口

​ (2)ISpout接口

(3)IRichSpout接口

​ (4)IStateSpout接口

​ (5)IRichStateSpout接口

​ (6)IBolt接口

(7)IRichBolt接口

​ (8)IBasicBolt接口

2)基本抽象类

​ (1)BaseComponent抽象类

(2)BaseRichSpout抽象类

(3)BaseRichBolt抽象类

​ (4)BaseTransactionalBolt抽象类

​ (5)BaseBasicBolt抽象类

spout水龙头

Spout最顶层抽象是ISpout接口
Storm命令以及常用API
(1)Open()

是初始化方法

(2)close()

在该spout关闭前执行,但是并不能得到保证一定被执行,kill -9 时不执行,Storm kill {topoName} 时执行

(3)activate()

​ 当Spout已经从失效模式中激活时被调用。该Spout的nextTuple()方法很快就会被调用。

(4)deactivate ()

​ 当Spout已经失效时被调用。在Spout失效期间,nextTuple不会被调用。Spout将来可能会也可能不会被重新激活。

(5)nextTuple()

当调用nextTuple()方法时,Storm要求Spout发射元组到输出收集器(OutputCollectot),NextTuple方法应该是非阻塞的,所以如果Spout没有元组可以发射,该方法应该返回,nextTuple(),ack()和fail方法都在Spout任务的单一线程内紧密循环被调用,当没有元组可以发射时,可以让nextTuple去sleep很短的时间,比如1ms,这样就不会浪费太多的cpu资源

(6)ack()

成功处理tuple回调方法

(7)fail()

处理失败tuple回调方法

原则:通常情况下(Shell和事务除外),实现一个Spout,可以直接实现接口IRichSpout ,如果不想写多余的代码,可以直接继承BaseRichSpout

bolt转接头

bolt的最顶层抽象是IBolt接口

Storm命令以及常用API
(1)prepare()

prepare()方法在集群的工作进程内被初始化时调用,提供了Bolt执行所需要的环境

(2) execute()

接受一个tuple进行处理,也可emit数据到下一级组件

(3)cleanup()

cleanup方法当一个IBolt即将关闭时被调用不能保证cleanup()方法一定会被调用,因为Supervisor可以对集群的工作进程使用kill -9命令强制杀死进程命令。

如果在本地模式运行Storm,当拓扑被杀死的时候,可以保证cleanup()方法一定会被调用

实现一个Bolt,可以实现IRichBolt接口或者继承BaseRichBolt,实际上相当于自动做了prepare方法和collector.emit.ack(inputTuple)。

spout的tail特性

Storm可以实时监测文件数据,当文件数据变化时,Storm自动读取。

上一篇:Storm模拟将接收到日志的会话id打印在控制台


下一篇:Storm搭建