storm源码分析研究(二)

2021SC@SDUSC

spout源码分析(一)

2021SC@SDUSC

文章目录


2021SC@SDUSC

核心概念介绍

1、结构:
Spout是storm的核心组件之一,最源头的接口是IComponent。

2、发送:
当Spout从外部获取数据后,向Topology中发出的Tuple可以是可靠的,也可以是不可靠的。Spout可以发射多个流,可以定义多个流(即定义多个stream),也可以使用方法来发射指定的流。

3、重要结构:
Spout的重要方法是nextTuple,nextTuple方法发射一个新的元组到Topology,如果没有新的元组发射,则直接返回。注意任务Spout的nextTuple方法都不要实现成阻塞的,因为storm是在相同的线程中调用spout的方法。
Spout的另外两个重要方法是ack和fail方法,当spout发射的元组被拓扑成功处理时,调用ack方法,当处理失败时,调用fail方法,此外,ack和fail方法仅被可靠的spout调用。

ISpout.java

ISpout接口:
storm实现主要依靠以下几个函数,全局代码如下:

package org.apache.storm.spout;

import java.io.Serializable;
import java.util.Map;
import org.apache.storm.task.TopologyContext;
public interface ISpout extends Serializable {
 
    void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);

    void close();

    void activate();

    void deactivate();

    void nextTuple();

    void ack(Object msgId);

    void fail(Object msgId);
}

Strom支持所有的基本类型,当它使用元组作为数据模型,元组中的每个字段都可以是任何类型的对象。而如果要使用自己定义的类型,需要为自己定义的类型实现并且注册一个serializer。每个节点还必须要为输出的元组定义字段名称。
部分函数解释:
open():
当该组件的任务在集群上初始化时调用。它为spout提供了执行spout的环境。
close():
当ISpout即将关闭时调用。不能保证会调用close,因为supervisor会杀死集群上的的worker进程。
activate():
当spout从非激活模式被激活时调用。
deactivate():
当spout失效时调用。

ShellSpout.java

重载函数如下:

    public void open(Map<String, Object> topoConf, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;

        if (topoConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
            workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
        } else {
            workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
        }

        process = new ShellProcess(command);
        if (!env.isEmpty()) {
            process.setEnv(env);
        }

        Number subpid = process.launch(topoConf, context, changeDirectory);
        LOG.info("Launched subprocess with pid " + subpid);

        logHandler = ShellUtils.getLogHandler(topoConf);
        logHandler.setUpContext(ShellSpout.class, process, this.context);

        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }

    @Override
    public void close() {
        heartBeatExecutorService.shutdownNow();
        process.destroy();
        running = false;
    }

    @Override
    public void nextTuple() {
        this.sendSyncCommand("next", "");
    }

    @Override
    public void ack(Object msgId) {
        this.sendSyncCommand("ack", msgId);
    }

    @Override
    public void fail(Object msgId) {
        this.sendSyncCommand("fail", msgId);
    }

 @Override
    public void activate() {
        LOG.info("Start checking heartbeat...");
        // prevent timer to check heartbeat based on last thing before activate
        setHeartbeat();
        if (heartBeatExecutorService.isShutdown()) {
            //In case deactivate was called before
            heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
        }
        heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
        this.sendSyncCommand("activate", "");
    }

    @Override
    public void deactivate() {
        this.sendSyncCommand("deactivate", "");
        heartBeatExecutorService.shutdownNow();
    }



void open(Map<String, Object> topoConf, TopologyContext context,SpoutOutputCollector collector)
参数:
topoconf :
Storm关于这个Spout的配置
context :
用来获取该Spout任务的信息,包括任务id,组件id,输入输出信息等等
collector :
用来从这个Spout里发送元组,元组可以在任何时间里发送,包括open和close函数里。collector是线程安全的,应该被作为一个实例对象保存到Spout对象里。

void ack(Object msgId):
以msgId消息告诉Storm这个Spout已经成功输出了该元组
void activate():
激活Spout,Spout从deactivate模式转化为activate模式,Spout开始调用nextTuple输出数据。
void close():
关闭Spout
void deactivate():
解除激活Spout,Spout从activate模式转化为deactivate模式,Spout停止调用nextTuple输出数据
void fail(Object msgId):
以msgId消息告诉Storm这个Spout输出该元组失败,主要用于将该元组重新放回消息队列,以在一段时间后重发该元组
void nextTuple():
调用该函数请求Storm发送元组到Output Collector,这个函数不应该是阻塞的,当没有元组发送时,一般调用sleep,以充分利用CPU。

参考链接:
https://blog.csdn.net/wdasdaw/article/details/48896321
https://xlucas.blog.csdn.net/article/details/55301577

上一篇:实时流处理框架之Storm的安装与部署


下一篇:大数据学习路线图