Storm入门教程
1. Storm基础
Storm
Storm主要特点
Storm基本概念
Storm调度器
Storm配置
Guaranteeing Message Processing(消息处理保障机制)
Daemon Fault Tolerance(守护线程容错机制)
理解Storm拓扑的并行
Tutorial
Local模式
本地模式的通用配置:
在生产环境中运行Topologies
通用配置
杀死topology
更新运行中的topology
监控topology
Local模式
本地模式模拟Storm集群用于测试和开发topologies。在本地模式运行topologies 类似于在集群中运行topologies 。
为了创建进程内的集群,可用LocalCluster类实现,如下:
import org.apache.storm.LocalCluster;
LocalCluster cluster = new LocalCluster();
你可以用LocalCluster 的submitTopology 方法提交topologies 。对于用StormSubmitter类中的方法,该类用于向Storm集群中提交topologies ,submitTopology 方法参数列表为topology名称,topology配置和topology对象,你可以用killTopology 方法根据Topology 的名称结束Topology ,如下所示:
cluster.shutdown();
本地模式的通用配置:
1.Config.TOPOLOGY_MAX_TASK_PARALLELISM:这个配置项是单个组件执行线程的最大值,生产环境中的topologies 并行度(成千上百个线程)较大导致负载过度,所以需要在本地模式中尽量测试。这个配置项允许你容易控制并行度。
2.Config.TOPOLOGY_DEBUG:当设置为true时,Storm将记录任一spout或bolt发送的tuple。这对于调试尤其有用。
在生产环境中运行Topologies
在生产集群中运行Topologies和本地模式一样,步骤如下:
1)定义topology(如果用Java那么用类TopologyBuilder来定义topology)
2)用类StormSubmitter 将topology提交到集群中。StormSubmitter 接受topology的名称、topology的配置和topology本身。如下:
Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);
3)将你的代码及其依赖打成jar包(除了Storm jars,因为所有的worker节点上已存在类路径上)
如果你用Maven,那么 Maven Assembly Plugin插件能为你打jar包。在pom.xml中添加如下内容;
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.path.to.main.Class</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
然后运行mvn assembly:assembly得到合适的jar包。确认jar包中排除了Storm jars因为集群中的类路径上已经存在。
4)用storm 客户端程序提交topology 到集群,指定jar路径、运行的类以及参数,如下:
storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
storm jar将提交jar到集群中并配置类StormSubmitter和集群通信。在本例中,在上述例子中上传了jar或会调用org.me.MyTopology类的主函数,参数列表为"arg1", "arg2", and "arg3".
通用配置
topology的配置项有很多,你可以在这里找到。前缀为“TOPOLOGY”的配置项可以被覆写(集群中的其他配置项不能被覆写)。下面是TOPOLOGY的常用配置:
1.Config.TOPOLOGY_WORKERS:改项设置了用于执行topology的worker进程数。例如,如果你设置为25,那么集群中将会有25个Java进程用于执行所有的任务。如果topology中所有的并行度为150,那么每个worker进程将有6个线程。
2.Config.TOPOLOGY_ACKER_EXECUTORS:该项配置了executors的数目,它用于跟踪元组树和监控spout元组已经被完全处理。Ackers 是Storm可靠性模型的一部分。如果没有设置该项或设置为空,Storm将设置acker executor和worker的数目相等。如果这个变量设置为0,Storm将立马响应元组只要他们从spout发送出来,不能保证可靠性。
3.Config.TOPOLOGY_MAX_SPOUT_PENDING:该项设置了单个spout 任务所能接受的元组数目(只要没有acked的元组或失败的都算)。强烈推荐你配置该项防止队列持续增大。
4.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS:spout元组被认定失败的超时时间。该项默认是30s,对于大多数topologies是有效的。
5.Config.TOPOLOGY_SERIALIZATIONS:你可以给Storm注册更多的序列化器,这样你就可以自定义元组类型。
杀死topology
杀死topology,如下命令:
storm kill {stormname}
stormname和你提交时的topology保持一致。
Storm不会立即杀死topology。相反,它会使所有的spouts失效,这样它们不会发送更多的元组,然后等Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 秒后杀死所有的workers,这允许topology有足够的时间处理正在执行的tuple。
更新运行中的topology
为了更新运行中的topology,当前唯一的选择是杀死当前的topology然后重新提交。一个计划中的特性是用storm swap命令切换运行中的topology为新的topology,确保最小的故障时间。
监控topology
监控topology最好用Storm UI。Storm UI提供了任务中的错误发生信息和关于每个组件的吞吐量和延迟性能的细粒度的状态。当然,你也可以看集群中worker的日志。