Storm特性
1. 低延迟和高性能
在一个小集群中,每个节点每秒可以处理数以百万计的消息。
2. 可扩展
在Storm集群中主要有三个实体:工作进程、线程和任务。Storm集群中每台机器上都可以运行多个工作进程,每个工作进程又可以创建多个线程,每个线程可以执行多个任务,任务是真正进行数据处理的实体。
3. 高可靠性
Storm可以保证Spout发出的每条消息都能被完全处理,Spout发出的消息后续可能会触发产生成千上万条消息,可以形象的理解为一棵消息树,只有当这颗消息树中的所有消息都被处理了才叫“完全处理”,这种特殊的策略会在后面详细介绍。
4. 高容错性
如果消息在处理过程中出现了一些异常,Storm会重新部署这个出问题的处理单元,Storm保证一个处理单元永远运行(除非你显式结束这个处理单元)
5. 编程模型简单
Storm为大数据的实时计算提供了一些简单优美的原语,大大降低了开发并行实时处理任务的复杂性。
6. 支持多种编程语言
7. 支持本地模式
Storm与Hadoop的比较
Hadoop架构简介
Hadoop架构的核心组成部分是HDFS(Hadoop Distributed File System,Hadoop分布式文件系统)和MapReduce分布式计算框架。HDFS采用Master/Slave体系结构,在集群中由一个主节点充当NameNode,负责文件系统元数据的管理,其它多个子节点充当Datanode,负责存储实际的数据块。
MapReduce分布式计算模型由JobTracker和TaskTracker两类服务进程实现,JobTracker负责任务的调度和管理,TaskTracker负责实际任务的执行。
Hadoop架构的瓶颈
在手机阅读BI大屏时延项目中,业务需求为处理业务平台产生的海量用户数据,展现业务中PV(Page View,页面浏览量)、UV(Unique Visitor,独立访客)、营收和付费用户数等关键运营指标,供领导层实时了解运营状况,做出经营决策。在一期项目的需求描述中,允许的计算时延是15分钟。
根据需求,在一期项目的实施中,搭建了Hadoop平台与Hive数据仓库,通过编写Hive存储过程,来完成数据的处理,相当于是一个离线的批处理过程。不同的运营指标拥有不同的算法公式,各公式的复杂程度不同导致各运营指标算法复杂度不同,因此所需要的计算时延也各不相同,如PV指标的计算公式相对简单,可以在5分钟内完成计算,而页面访问成功率指标的计算公式相对复杂,需要10分钟以上才能完成计算。项目到达二期阶段时,对实时性的要求有了进一步提高,允许的计算时延减少到5分钟。在这种应用场景下,Hadoop架构已经不能满足需要,无法在指定的时延内完成所有运营指标的计算。
在以上的应用场景中,Hadoop的瓶颈主要体现在以下两点:
- MapReduce计算框架初始化较为耗时,并不适合小规模的批处理计算。因为MapReduce框架并非轻量级框架,在运行一个作业时,需要进行很多初始化的工作,主要包括检查作业的输入输出路径,将作业的输入数据分块,建立作业统计信息以及将作业代码的Jar文件和配置文件拷贝到HDFS上。当输入数据的规模很大时,框架初始化所耗费的时间远远小于计算所耗费的时间,所以初始化的时间可以忽略不计;而当输入数据的规模较小时,初始化所耗费的时间甚至超过了计算所耗费的时间,导致计算效率低下,产生了性能上的瓶颈。
- Reduce任务的计算速度较慢。有的运营指标计算公式较为复杂,为之编写的Hive存储过程经Hive解释器解析后产生了Reduce任务,导致无法在指定的时延内完成计算。这是由于Reduce任务的计算过程分为三个阶段,分别是copy阶段,sort阶段和reduce阶段。其中copy阶段要求每个计算节点从其它所有计算节点上抽取其所需的计算结果,copy操作需要占用大量的网络带宽,十分耗时,从而造成Reduce任务整体计算速度较慢。
Storm架构简介
与Hadoop主从架构一样,Storm也采用Master/Slave体系结构,分布式计算由Nimbus和Supervisor两类服务进程实现,Nimbus进程运行在集群的主节点,负责任务的指派和分发,Supervisor运行在集群的从节点,负责执行任务的具体部分。
- Nimbus:负责资源分配和任务调度。
- Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。
- Worker:运行具体处理组件逻辑的进程。
- Task:worker中每一个spout/bolt的线程称为一个task。同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。
Storm架构中使用Spout/Bolt编程模型来对消息进行流式处理。消息流是Storm中对数据的基本抽象,一个消息流是对一条输入数据的封装,源源不断输入的消息流以分布式的方式被处理。Spout组件是消息生产者,是Storm架构中的数据输入源头,它可以从多种异构数据源读取数据,并发射消息流。Bolt组件负责接收Spout组件发射的信息流,并完成具体的处理逻辑。在复杂的业务逻辑中可以串联多个Bolt组件,在每个Bolt组件中编写各自不同的功能,从而实现整体的处理逻辑。
Storm架构与Hadoop架构对比
在Hadoop架构中,主从节点分别运行JobTracker和TaskTracker进程,在Storm架构中,主从节点分别运行Nimbus和Supervisor进程。在Hadoop架构中,应用程序的名称是Job,Hadoop将一个Job解析为若干Map和Reduce任务,每个Map或Reduce任务都由一个Child进程来运行,该Child进程是由TaskTracker在子节点上产生的子进程。
在Storm架构中,应用程序的名称是Topology,Storm将一个Topology划分为若干个部分,每部分由一个Worker进程来运行,该Worker进程是Supervisor在子节点上产生的子进程,在每个Worker进程中存在着若干Spout和Bolt线程,分别负责Spout和Bolt组件的数据处理过程。
从应用程序的比较中可以看明显地看到Hadoop和Storm架构的主要不同之处。在Hadoop架构中,应用程序Job代表着这样的作业:输入是确定的,作业可以在有限时间内完成,当作业完成时Job的生命周期走到终点,输出确定的计算结果。而在Storm架构中,Topology代表的并不是确定的作业,而是持续的计算过程。在确定的业务逻辑处理框架下,输入数据源源不断地进入系统,经过流式处理后以较低的延迟产生输出。如果不主动结束这个Topology或者关闭Storm集群,那么数据处理的过程就会持续地进行下去。
通过以上的分析,我们可以看到,Storm架构是如何解决Hadoop架构瓶颈的:
- Storm的Topology只需初始化一次。在将Topology提交到Storm集群的时候,集群会针对该Topology做一次初始化的工作。此后,在Topology运行过程中,对于输入数据而言,是没有计算框架初始化耗时的,有效避免了计算框架初始化的时间损耗。
- Storm使用ZeroMQ作为底层的消息队列来传递消息,保证消息能够得到快速的处理。同时Storm采用内存计算模式,无需借助文件存储,直接通过网络直传中间计算结果,避免了组件之间传输数据的大量时间损耗。
Storm基本概念
Storm集群和Hadoop集群表面上看很类似。但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。
在Storm的集群里面有两种节点: 控制节点(master node)和工作节点(worker node)。控制节点上面运行一个叫Nimbus后台程序,它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器,并且监控状态。
每一个工作节点上面运行一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程worker。每一个工作进程执行一个topology的一个子集;一个运行的topology由运行在很多机器上的很多工作进程worker组成。
Nimbus和Supervisor之间的所有协调工作都是通过Zookeeper集群完成。另外,Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。所有的状态要么在zookeeper里面, 要么在本地磁盘上。这也就意味着你可以用kill -9来杀死Nimbus和Supervisor进程, 然后再重启它们,就好像什么都没有发生过。这个设计使得Storm异常的稳定。
Topology
计算任务Topology是由不同的Spouts和Bolts,通过数据流(Stream)连接起来的图。下面是一个Topology的结构示意图:
其中包含有:
Spout:Storm中的消息源,用于为Topology生产消息(数据),一般是从外部数据源(如Message Queue、RDBMS、NoSQL、Realtime Log)不间断地读取数据并发送给Topology消息(tuple元组)。Spout可以是可靠的,也可以是不可靠的。如果这个tuple没有被Storm完全处理,可靠的消息源可以重新发射一个tuple,但是不可靠的消息源一旦发出一个tuple就不能重发了。(可靠性会在下面介绍)
Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回(如果已经没有新的tuple)。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。
另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。Bolt:Storm中的消息处理者,用于为Topology进行消息的处理,Bolt可以执行过滤, 聚合, 查询数据库等操作,而且可以一级一级的进行处理。
下图是Topology的提交流程图:
下图是Storm的数据交互图。可以看出两个模块Nimbus和Supervisor之间没有直接交互。状态都是保存在Zookeeper上。Worker之间通过ZeroMQ传送数据。(storm所有的元数据信息保存在Zookeeper中。)
Worker (进程)
一个topology可能会在一个或者多个worker(工作进程)里面执行,每个worker是一个物理JVM并且执行整个topology的一部分。比如,对于并行度是300的topology来说,如果我们使用50个工作进程worker来执行,那么每个工作进程会处理其中的6个tasks。Storm会尽量均匀的工作分配给所有的worker。setBolt 的最后一个参数是你想为bolts的并行量。
一个topology的worker数量是conf设置的,这个设置是说给这个topo多少个worker资源给他。
Tasks
每一个spout和bolt会被当作很多task在整个集群里执行。每一个executor对应到一个线程,在这个线程上运行多个task,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。你可以调用TopologyBuilder类的setSpout和setBolt来设置并行度。