Flink运行时之结果分区消费端

Flink运行时之结果分区消费端

结果分区消费端

在前一篇,我们讲解了生产者分区,生产者分区是生产者任务生产中间结果数据的过程。消费者任务在获得结果分区可用的通知之后,会发起对数据的请求。我们仍然以生产者分区的例子作为假设,其在消费端示意图如下:

Flink运行时之结果分区消费端

可以看到在生产端和消费端存在对等的模型,具体ResultSubpartition中的数据如何被消费,我们将在本篇进行深入剖析。

输入网关

输入网关(InputGate)用于消费中间结果(IntermediateResult)在并行执行时由子任务生产的一个或多个结果分区(ResultPartition)。

可以认为生产端的ResultPartition跟消费端的InputGate是对等的。

Flink当前提供了两个输入网关的实现,分别是:

  • SingleInputGate:常规输入网关;
  • UnionInputGate:联合输入网关,它允许将多个输入网关联合起来;

我们主要分析SingleInputGate,因为它是消费ResultPartition的实体,而UnionInputGate主要充当InputGate容器的角色。

SingleInputGate主要的初始化逻辑被封装在其静态的create方法中,当一个Task被实例化时在其构造器中会调用该create方法初始化它对应的SingleInputGate实例。create方法根据传递进来的InputGateDeploymentDescriptor完成对其包含的所有InputChannel的实例化。因为InputChannel记录是按照跟生产端任务的位置来分类的(我们会在下面进行具体分析),所以其实例化也是按照InputGateDeploymentDescriptor中每个InputChannelDeploymentDescriptor包含的ResultPartitionLocation属性来初始化的。

不同的ResultPartitionLocation,消费端任务其请求结果子分区的方式也不同,这一点我们在讲解生产者分区是有所说明。

作为数据的消费者,InputGate最关键的方法自然是获取生产者所生产的缓冲区,提供该功能的方法为getNextBufferOrEvent,它返回的对象是我们之前谈到的统一的数据交换对象BufferOrEvent。

BufferOrEvent的直接消费对象是通信层API中的记录读取器,它会将Buffer中的数据反序列化为记录供上层任务使用。

我们以getNextBufferOrEvent方法为主线来分析SingleInputGate类。

public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
    //如果已接收到所有EndOfPartitionEvent事件,则说明每个ResultSubpartition中的数据都被消费完成
    if (hasReceivedAllEndOfPartitionEvents) {
        return null;
    }

    //触发所有的输入通道向ResultSubpartition发起请求
    requestPartitions();

    InputChannel currentChannel = null;
    //阻塞并循环等待有可获取数据的通道可用
    while (currentChannel == null) {
        if (isReleased) {
            throw new IllegalStateException("Released");
        }

        //从阻塞队列中请求(并删除)队首的输入通道,阻塞两秒钟,如果没有获取到则不断请求,直到获取到一个输入通道位置
        currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
    }

    //从输入通道中获得下一个Buffer
    final Buffer buffer = currentChannel.getNextBuffer();

    if (buffer == null) {
        throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
            "notified by channel about available data, but none was available.");
    }

    //如果该Buffer是用户数据,则构建BufferOrEvent对象并返回
    if (buffer.isBuffer()) {
        return new BufferOrEvent(buffer, currentChannel.getChannelIndex());
    }
    //否则把它当作事件来处理
    else {
        final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());

        //如果获取到的是标识某ResultSubpartition已经生产完数据的事件
        if (event.getClass() == EndOfPartitionEvent.class) {
            //对获取该ResultSubpartition的通道进行标记
            channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
            //如果所有信道都被标记了,置全部通道获取数据完成
            if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
                hasReceivedAllEndOfPartitionEvents = true;
            }
            //对外发出ResultSubpartition已被消费的通知同时释放资源
            currentChannel.notifySubpartitionConsumed();
            currentChannel.releaseAllResources();
        }
        //以事件来构建BufferOrEvent对象
        return new BufferOrEvent(event, currentChannel.getChannelIndex());
    }
}

以上代码段中,第一个关键调用是requestPartitions方法。它会触发所有InputChannel发起对requestSubpartition方法的调用以请求生产端的ResultSubpartition。

有一种占位目的的UnknowInputChannel不响应该方法,因为它最终会被确定为是LocalInputChannel还是RemoteInputChannel,确定的时机通常是JobManager通知器其可消费,TaskManager调用当前SingleInputGate的updateInputChannel方法,确定UnknowInputChannel会转变的具体的通道类型后再调用requestSubpartition方法。

由于requestPartitions只是起到触发其内部的InputChannel去请求的作用,这个调用可能并不会阻塞等待远程数据被返回。因为不同的InputChannel其请求的机制并不相同,RemoteChannel就是利用Netty异步请求的。所以SingleInputGate采用阻塞等待以及事件回调的方式来等待InputChannel上的数据可用。具体而言,它在while代码块中循环阻塞等待有可获取数据的InputChannel。而可用的InputChannel则由它们自己通过回调SingleInputGate的onAvailableBuffer添加到阻塞队列inputChannelsWithData中来。当有可获取数据的InputChannel之后,即可获取到Buffer。

Flink除了提供了SingleInputGate这种常规的输入网关之外,还提供了UnionInputGate,它更像一个包含SingleInputGate的容器,同时可以这些SingleInputGate拥有的InputChannel联合起来。并且多数InputGate约定的接口方法的实现,都被委托给了每个SingleInputGate。

那么它在实现getNextBufferOrEvent方法的时候,到底从哪个InputGate来获得缓冲区呢。它采用的是事件通知机制,所有加入UnionInputGate的InputGate都会将自己注册到InputGateListener。当某个InputGate上有数据可获取,该InputGate将会被加入一个阻塞队列。接着我们再来看getNextBufferOrEvent方法的实现:

public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
    if (inputGatesWithRemainingData.isEmpty()) {
        return null;
    }

    //遍历每个InputGate,依次调用其requestPartitions方法
    requestPartitions();

    //阻塞等待输入网关队列中有可获取数据的输入网关
    final InputGate inputGate = inputGateListener.getNextInputGateToReadFrom();

    //从输入网关中获得数据
    final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();

    //如果获取到的是事件且该事件为EndOfPartitionEvent且输入网关已完成
    if (bufferOrEvent.isEvent()
        && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
        && inputGate.isFinished()) {
        //尝试将该输入网关从仍然可消费数据的输入网关集合中删除
        if (!inputGatesWithRemainingData.remove(inputGate)) {
            throw new IllegalStateException("Couldn't find input gate in set of remaining " +
                "input gates.");
        }
    }

    //获得通道索引偏移
    final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
    //计算真实通道索引
    bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());

    return bufferOrEvent;
}

基本上这个机制跟SingleInputGate是一致的,只不过在UnionInputGate中它是从InputGate中而非从InputChannel中罢了。

输入通道

一个InputGate包含多个输入通道(InputChannel),输入通道用于请求ResultSubpartitionView,并从中消费数据。

所谓的ResultSubpartitionView是由ResultSubpartition所创建的用于供消费者任务消费数据的视图对象。

对于每个InputChannel,消费的生命周期会经历如下的方法调用过程:

  1. requestSubpartition:请求ResultSubpartition;
  2. getNextBuffer:获得下一个Buffer;
  3. releaseAllResources:释放所有的相关资源;

InputChannel根据ResultPartitionLocation提供了三种实现:

  • LocalInputChannel:用于请求同实例中生产者任务所生产的ResultSubpartitionView的输入通道;
  • RemoteInputChannel:用于请求远程生产者任务所生产的ResultSubpartitionView的输入通道;
  • UnknownInputChannel:一种用于占位目的的输入通道,需要占位通道是因为暂未确定相对于生产者任务位置,但最终要么被替换为RemoteInputChannel,要么被替换为LocalInputChannel。

LocalInputChannel会从相同的JVM实例中消费生产者任务所生产的Buffer。因此,这种模式是直接借助于方法调用和对象共享的机制完成消费,无需跨节点网络通信。具体而言,它是通过ResultPartitionManager来直接创建对应的ResultSubpartitionView的实例,这种通道相对简单。

RemoteInputChannel是我们重点关注的输入通道,因为它涉及到远程请求结果子分区。远程数据交换的通信机制建立在Netty框架的基础之上,因此会有一个主交互对象PartitionRequestClient来衔接通信层跟输入通道。

我们以请求子分区的requestSubpartition为入口来进行分析。首先,通过一个ConnectionManager根据连接编号(对应着目的主机)来创建PartitionRequestClient实例。接着具体的请求工作被委托给PartitionRequestClient的实例:

partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);

因为Netty以异步的方式处理请求。因此,上面的代码段中会看到将代表当前RemoteChannel实例的this对象作为参数注入到Netty的特定的ChannelHandler中去,在处理时根据特定的处理逻辑会触发RemoteChannel中相应的回调方法。

在RemoteChannel中定义了多个“onXXX”回调方法来衔接Netty的事件回调。其中,较为关键的自然是接收到数据的onBuffer方法:

public void onBuffer(Buffer buffer, int sequenceNumber) {
    boolean success = false;

    try {
        synchronized (receivedBuffers) {
            if (!isReleased.get()) {
                //如果实际的序列号跟所期待的序列号相等
                if (expectedSequenceNumber == sequenceNumber) {
                    //将数据加入接收队列同时将预期序列号计数器加一
                    receivedBuffers.add(buffer);
                    expectedSequenceNumber++;

                    //发出有可用Buffer的通知,该通知随后会被传递给其所归属的SingleInputGate,
                    //以通知其订阅者,有可用数据
                    notifyAvailableBuffer();

                    success = true;
                }
                else {
                    //如果实际序列号跟所期待的序列号不一致,则会触发onError回调,并相应以一个特定的异常对象
                    //该方法调用在成功设置完错误原因后,同样会触发notifyAvailableBuffer方法调用
                    onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                }
            }
        }
    }
    finally {
        //如果不成功,则该Buffer会被回收
        if (!success) {
            buffer.recycle();
        }
    }
}

从代码段可以看出,消费时首先会进行序列号比对,这可以看作是一种“校验”机制。服务端每响应客户端一个Buffer都会将序列号加一并随响应数据一起发回给客户端,而客户端则会在消费时也同时累加本地的序列号计数器。在消费的过程中,两个序列号必须一致才能保证消费的顺利进行,否则InputChannel将会抛出IOException异常。

onBuffer方法的执行处于Netty的I/O线程上,但RemoteInputChannel中getNextBuffer却不会在Netty的I/O线程上被调用,所以必须有一个数据共享的容器,这个容器就是receivedBuffers队列。getNextBuffer就是直接从receivedBuffers队列中出队一条数据然后返回。



原文发布时间为:2017-01-05

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

上一篇:H3C路由器上配置远程端口镜像(3种配置方式之1)


下一篇:跟我学交换机配置(四)