storm源码分析研究(九)

2021SC@SDUSC

bolt源码分析(四)

2021SC@SDUSC

本文主要介绍一下Bolt输出收集器,Bolt处理好的消息都是通过输出收集器发送出去的,不同类型的Bolt所使用的输出收集器也是不同的。

上一篇文章中介绍了几个bolt端口,它们分别使用的输出收集器如下:
IRichBolt:
它使用OutputCollector输出收集器,该收集器实现IOutputCollector接口,实际上是一个代理类。
IBasicBolt:
它使用BasicOutputCollector输出收集器,该收集器实际上是OutputCollector
的封装类,实现的是IBasicOutputCollector接口。
IBatchBolt:
它使用BatchOutputCollector输出收集器,该收集器是一个虚基类,Storm提
供了它的默认实现类BatchOutputCollectorImpl , 这个类实际上也是通过封装OutputCollector类来实现消息发送的。

IOutputCollector.java

public interface IOutputCollector extends IErrorReporter {
    /**
     * Returns the task ids that received the tuples.
     */
    List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void ack(Tuple input);

    void fail(Tuple input);

    void resetTimeout(Tuple input);

    void flush();
}

接口IErrorReporter定义了reportError方法,其输入为一个Throwable对象,用户可以在该方法中处理异常。而接口IOutputCollector扩展了接口IErrorReporter,并且定义了如上的一些基本方法。

emit方法:
用来向外发送数据,它的返回值是该消息所有发送目标的Taskld集合,其输入参数的含义如下所示:
streamld:消息将被输出到的流。
anchors:输出消息的标记,通常代表该条消息是由哪些消息产生的,主要用于消息的Ack系统。
tuple:要输出的消息,为一个Object列表。

emitDirect方法:
其输入列表与emit方法相似,主要区别在于,emitDirect发送的消息只有
指定的Task才可以接收。这个方法要求streamld对应的流必须被定义为直接流,同时接收端的Task必须通过直接分组的方式来接收消息,否则会抛出异常。如果没有下游节点接收该消息,那么此类消息其实也就没有被真正发送。

fail和ack方法:
用来表示消息是否被成功处理。

Storm提供了IOutputCollector接口的默认实现类OutputCollector,它实际上也是一个代理。它包含一个真正工作的IOutputCollector实例,这个对象是在Clojure代码中定义的。OutputCollector主要用于从IRichBolt向外发送数据。在OutputCollector的实现中,所有操作都由代理对象完成。

IBasicOutputCollector.java

package org.apache.storm.topology;

import java.util.List;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.tuple.Tuple;

public interface IBasicOutputCollector extends IErrorReporter {
    List<Integer> emit(String streamId, List<Object> tuple);

    void emitDirect(int taskId, String streamId, List<Object> tuple);

    void resetTimeout(Tuple tuple);
}



如果使用IBasicBolt, Storm框架会自动帮用户进行Ack、Fail和Anchor操作。

BatchOutputCollector.java

public abstract class BatchOutputCollector {

    public List<Integer> emit(List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple);
    }

    public abstract List<Integer> emit(String streamId, List<Object> tuple);

    public void emitDirect(int taskId, List<Object> tuple) {
        emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
    }

    public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);

    public abstract void flush();

    public abstract void reportError(Throwable error);
}

BatchOutputCollector是Storm中用于数据批处理的输出收集器。它的方法跟IBasicOutputCollector中定义的接口方法基本一致。Storm提供了BatchOutputCollector的默认实现类BatchOutputCollectorlmpl,它实际上是一个代理类,内部封装了OutputCollector变量,所有的方法都通过调用OutputCollector的方法来实现。

通过了解bolt输出收集器,可以更好的了解bolt接口,对此总结一下:

IRichBolt:Storm中最常用来定义Topology组件的接口。它十分灵活,用户可以通过其实现各种控制逻辑,并且能控制何时进行Ack、 Fail和Anchor操作。

IBasicBolt:Storm中提供的定义简单逻辑的Topology组件接口。对于这种Bolt, Storm内置实现了Ack、Fail和Anchor的机制,用户基于它实现自己的Bolt也比较简单。但是它的使用是有限制的,基于收到的某条消息衍生出来的所有消息必须在一次execute中发送出(或者需要对消息进行缓存并且编号),否则内置的Ack机制将不能保证Bolt的正常工作。所以,用户应该避免使用该类型的Bolt来做诸如聚集或者连接的操作。

IBatchBolt:它是Storm提供的用来处理批量数据的接口。目前,它只用于事务Topology中,它是Storm实现事务Topology的基础

上一篇:storm源码分析研究(十一)


下一篇:培训机构python大纲