storm知识点学习总结(一)

storm --流式处理框架

   storm是个实时的、分布式以及具备高容错的计算系统

   - storm 进程常驻内存

  - storm 数据不经过磁盘,在内存中处理

  Twitter开源的分布式实时大数据处理框架,最早开源于github

  storm 架构   -Nimbus  -Supervisor  -Worker

  编程模型: - DAG      -Spout  -Bolt

  数据传输: - ZMQ (Twitter早起的产品)

                     - ZeroMQ 开源的消息传递框架,并不是一个MessageQueue

                    -Netty Netty是基于NIO的网络框架,更加高效 (之所以storm0.9版本之后使用netty,是因为ZMQ的license和storm的licemse不兼容)

  高可靠性: -异常处理    -消息可靠性保障机制(ACK)

  可维护性: -stormUI图形化监控接口

  流式处理(同步与异步):客户端提交数据进行计算,并不会等待数据计算结果 

  逐条处理:例如ETL(数据清洗)

  统计分析: 例如计算pv、uv、访问热点以及某些数据的聚合、加和、平均等

  --客户端提交数据之后,计算完成结果存储到redis、Hbase、Mysql或者其他的MQ当中

   客户端并不关心最终的计算结果是多少

  实时请求应答服务(同步) -客户端提交数据请求之后,立刻取得计算结果并返回给客户端

 DRPC:

 实时请求处理:

storm : 进程 、线程常驻内存运行,数据不仅如此磁盘,数据通过网络进行传递

MapReduce: 为TB、PB级别数据设计 的批处理计算框架

storm与mapreduce的比较:

storm: 流式处理、毫秒级、DAG模型、常驻运行

MapReduce: 批处理、分钟级、map+reduce模型 、反复启停

storm:纯流式处理 

           - 专门为流式处理设计

          - 数据传输模式更为简单,很多地方也更为高效

         -并不是不能做批处理,它也可以用来做微批处理,来提高吞吐

 Spark Streaming :微批处理  

       -- 将RDD做的很小来用小的批处理来接近流式处理

      --基于内存和DAG处理任务做的很快

 storm: 流式处理,毫秒级,已经很稳定,独立系统专门流式计算设计

 SparkStreaming: 微批处理、秒级、稳定性改进中、spark核心之上的一种计算模型,能与其他的组件进行很好的结合

 storm计算模型: 

          Topology-DAG 有向无环图的实现

          --对于strom实时计算逻辑的封装

          --即、由一系列通过数据流相互关联的spout、bolt所组成的拓扑结构

         --生命周期:此拓扑只要启动就会一直在集群中运行,直到手动将其kill,否则不会终止

       tuple  --元祖

   ---storm中最小的数据组成单元

      stream --数据流

      --从spout中源源不断传递数据给bolt、以及上一个bolt传递给下一个bolt,所形成的这些数据通道即叫做stream

      --stream声明时需给其指定一个ID

      spout -数据源 

      -拓扑中数据流的来源。一般会从指定外部的数据源读取元祖(tuple)发送到拓扑(Topology)中

     -一个spout可以发送多个数据流(stream)

    --可先通过OutputFieldsDeclear中的declear方法声明定义的不同数据流,发送数据时SpoutOutPutCollector中的emit方法指定数据流的参数将数据发送出去

   --spout中最核心的方法是nextyouple,该方法会被storm线程不断对的调用、主动从数据源拉取数据,在通过emit方法将数据生成元祖(tuple)发送给只有的bolt计算

  

    -bolt  数据流处理组件

    - 拓扑中数据处理均有bolt完成。对于简单的任务或者数据流转换,单个bolt可以简单的实现;更加复杂的场景往往需要多个bolt分多个步骤处理完成

    -一个bolt可以发送多个数据流(Stream)

   --可以先通过outputFiledDeclear中的declear方法生命定义的不同数据流,发送数据时通过spoutOutputcollector中的emit方法指定数据流id参数将数据发送出去

   --bolt最核心的方法是executor方法,该方法负责接收一个元祖数据、真正实现核心的业务逻辑

  stream Grouping --数据流分组

用storm 实现wordcount单词统计

 数据发送类

package com.storm.spout;

import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class Wdcspout extends BaseRichSpout{

    private SpoutOutputCollector collector;

    String[] text = {
            "nihao hello ok",
            "nice to meet hello",
            "where are you ok",
            "where is you home"
            
    };
    
    Random r = new Random();
    @Override
    public void nextTuple() {
        // TODO Auto-generated method stub
        
        List line = new Values(text[r.nextInt(3)]);
        this.collector.emit(line);
    
        System.out.println("line==============="+line);
        Utils.sleep(1000);
        
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
        
        
        
    }

    /**
     * 
     * 
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("line"));
    }

}

数据处理类:

package com.storm.bolt;

import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class Wcdbolt extends BaseRichBolt{

    private OutputCollector collector;

    @Override
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        //1、获取数据,并对获取的数据进行切分
        String[] words =  input.getString(0).split(" ");
        //2、发送数据
        for(String word: words) {
            
            List tuple = new Values(word);
            this.collector.emit(tuple);
            
        }
        
        
        
        
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("tuple"));
    }

}

第二个处理数据的bolt

package com.storm.bolt;

import java.util.HashMap;
import java.util.Map;


import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class wcdsbolt extends BaseRichBolt{

    private OutputCollector collector;

    
    Map<String, Integer> map = new HashMap<>();
    @Override
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        //对接受到的数据进行处理 
        String word = input.getStringByField("tuple");
        int count = 1;
        //如果单词不存在,则把单词的统计数添加到map中,否则,在原址value的基础之上加1 
        if(map.containsKey(word)) {
            count = map.get(word)+1;
        }
        
        map.put(word, count);
        
        System.err.println(word+"----------------------------"+count);
        
        
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // TODO Auto-generated method stub
        
    }

}

对处理结果提交到本地运行

package com.storm.test;

import org.jgrapht.alg.TarjanLowestCommonAncestor.LcaRequestResponse;

import com.storm.bolt.Wcdbolt;
import com.storm.bolt.wcdsbolt;
import com.storm.spout.Wdcspout;

import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.Config;;

public class test1 {

    public static void main(String[] args) {
        
        TopologyBuilder tm = new TopologyBuilder();
        
        //单线程处理
        /*tm.setSpout("wcdspout", new Wdcspout());
        tm.setBolt("wcdbolt", new Wcdbolt()).shuffleGrouping("wcdspout");
        tm.setBolt("wcdsbolt", new wcdsbolt()).shuffleGrouping("wcdbolt");*/
        //多线程处理
        tm.setSpout("wcdspout", new Wdcspout());
        tm.setBolt("wcdbolt", new Wcdbolt(),3).shuffleGrouping("wcdspout");
        tm.setBolt("wcdsbolt", new wcdsbolt(),3).fieldsGrouping("wcdbolt", new Fields("tuple"));
        
        
        
        LocalCluster lm = new LocalCluster();
        
        lm.submitTopology("w", new Config(), tm.createTopology());    
    }
}

注: 当用多线程处理的时候,注意对于分发策略的选择。   否则会发生数据统计异常的错误 。分发策略主要由grouping方法来进行处理的。

 

storm grouping --数据流分组;(即数据的分发策略)

 

1、 shuffle grouping

      --随机分组,随机派发stream里面的tuple,保证每个bolt task 接受到的tuple数目大致相同 。

     --轮询,平均分配。

  2、 Fields grouping

     --按字段分组,比如,按“user-id”这个字段来进行分组,那么具有同样“user-id”的tuple会被分到相同的bolt里面的一个task,而不同的"user-id"则可能会被分到不同的task

     --

 3、 All grouping

   -- 广播分发,对于每一个tuple,所有的bolt都会收到

4、Global Grouping

   --全局分组,把tuple分配给task id 最低的task

  5、None grouping

    -- 不分组,这个分组的意思是说storm不关心到底是怎样进行分组的,目前这种分组和shufflegrouping 的分组效果是一样的。有一点不同的地方就是storm会把使用none grouping 的这个bolt放到这个bolt的订阅者的同一个线程中区,(未来storm如果可能的话,会进行这样的设计)

  6、direct grouping

   --指向型分组,这是一种比较特殊的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接受者的那个task处理这个消息。只要被声明为Direct stream 的消息流可以声明这种分组方法。而且这种消息的tuple必须使用emitDirect方式来发送。消息处理着可以通过TopologyContext来获取处理它的消息的task的id

 

7、Local or shuffle grouping 

   --本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的task,否则,和普通的shuffle grouping 行为一致

8、 customgrouping

   --自定义,相当于mapreduce哪里自己去实现一个partition一样。

 

storm 架构设计

     storm知识点学习总结(一)

Nimbus: --资源调度  --任务分配  --接受jar包

Supervisor: --接受Nimbus分配的任务

-- 启动、停止自己管理的worker进程(当前supervisor上worker数量由配置文件设定)

--Worker

   --运行具体处理运算组件的进程(每个worker对应执行一个Topology的子集)

   --worker任务类型,即spout,bolt任务两种

   --启动executor(executor即worker JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务 )

--zookeeper:

storm 提交任务流程:  1、将提交的jar包上传至nimbus服务器numbus/inbox目录下  2、对topology进行检验处理   3、建立Topology在本地的存放目录nimbus\stormdist\topology-id(该目录下包含三个文件)

       stormjar.jar:  --从nimbus/inbox目录下移动来的Topology的jar包

       stormcode.ser:   --对Topology对象序列化法

       stormconf.ser:  --topology的运行配置

  nimbus任务分配:即根据代码初始化spout/bolt的task数目,并分配给对应的task-id,最后将这些信息写入到zookeeper的/task节点下

  nimbus在zookeeper上创建taskbeats节点,监控task的心跳

  将任务分配信息写入到assignment/topology-id节点中,此时即可认为任务提交完毕

  在zookeeper的/storms/topology-id节点下存放任务运行的时间、状态等信息

  定期检查zookeeper上storm节点,是否有新任务提交,删除本地不在运行的任务

  根据nimbus指定的任务信息启动该节点上的worker

   查看需要执行的task任务信息,获取到相应的task信息,即spout/bolt任务信息,执行具体的运算,并根据IP以及端口发送消息数据

   storm知识点学习总结(一)

 

  

storm的安装:

伪分布式的安装:

1、 上传安装压缩包   2、将安装压缩包解压  3、配置环境变量  4、启动storm相关命令

storm dev-zookeeper >> ./logs/zk.out 2>&1 & (将启动日志重定向到logs目录中,2>&1代表将标准的错误输出重定向到标准的正确输出中)

 

上一篇:流式计算的代表:Storm、Flink、SparkStreaming


下一篇:Storm框架最显著的应用