结果分区消费端
在前一篇,我们讲解了生产者分区,生产者分区是生产者任务生产中间结果数据的过程。消费者任务在获得结果分区可用的通知之后,会发起对数据的请求。我们仍然以生产者分区的例子作为假设,其在消费端示意图如下:
可以看到在生产端和消费端存在对等的模型,具体ResultSubpartition中的数据如何被消费,我们将在本篇进行深入剖析。
输入网关
输入网关(InputGate)用于消费中间结果(IntermediateResult)在并行执行时由子任务生产的一个或多个结果分区(ResultPartition)。
可以认为生产端的ResultPartition跟消费端的InputGate是对等的。
Flink当前提供了两个输入网关的实现,分别是:
- SingleInputGate:常规输入网关;
- UnionInputGate:联合输入网关,它允许将多个输入网关联合起来;
我们主要分析SingleInputGate,因为它是消费ResultPartition的实体,而UnionInputGate主要充当InputGate容器的角色。
SingleInputGate主要的初始化逻辑被封装在其静态的create方法中,当一个Task被实例化时在其构造器中会调用该create方法初始化它对应的SingleInputGate实例。create方法根据传递进来的InputGateDeploymentDescriptor完成对其包含的所有InputChannel的实例化。因为InputChannel记录是按照跟生产端任务的位置来分类的(我们会在下面进行具体分析),所以其实例化也是按照InputGateDeploymentDescriptor中每个InputChannelDeploymentDescriptor包含的ResultPartitionLocation属性来初始化的。
不同的ResultPartitionLocation,消费端任务其请求结果子分区的方式也不同,这一点我们在讲解生产者分区是有所说明。
作为数据的消费者,InputGate最关键的方法自然是获取生产者所生产的缓冲区,提供该功能的方法为getNextBufferOrEvent,它返回的对象是我们之前谈到的统一的数据交换对象BufferOrEvent。
BufferOrEvent的直接消费对象是通信层API中的记录读取器,它会将Buffer中的数据反序列化为记录供上层任务使用。
我们以getNextBufferOrEvent方法为主线来分析SingleInputGate类。
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
//如果已接收到所有EndOfPartitionEvent事件,则说明每个ResultSubpartition中的数据都被消费完成
if (hasReceivedAllEndOfPartitionEvents) {
return null;
}
//触发所有的输入通道向ResultSubpartition发起请求
requestPartitions();
InputChannel currentChannel = null;
//阻塞并循环等待有可获取数据的通道可用
while (currentChannel == null) {
if (isReleased) {
throw new IllegalStateException("Released");
}
//从阻塞队列中请求(并删除)队首的输入通道,阻塞两秒钟,如果没有获取到则不断请求,直到获取到一个输入通道位置
currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
}
//从输入通道中获得下一个Buffer
final Buffer buffer = currentChannel.getNextBuffer();
if (buffer == null) {
throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
"notified by channel about available data, but none was available.");
}
//如果该Buffer是用户数据,则构建BufferOrEvent对象并返回
if (buffer.isBuffer()) {
return new BufferOrEvent(buffer, currentChannel.getChannelIndex());
}
//否则把它当作事件来处理
else {
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
//如果获取到的是标识某ResultSubpartition已经生产完数据的事件
if (event.getClass() == EndOfPartitionEvent.class) {
//对获取该ResultSubpartition的通道进行标记
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
//如果所有信道都被标记了,置全部通道获取数据完成
if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
hasReceivedAllEndOfPartitionEvents = true;
}
//对外发出ResultSubpartition已被消费的通知同时释放资源
currentChannel.notifySubpartitionConsumed();
currentChannel.releaseAllResources();
}
//以事件来构建BufferOrEvent对象
return new BufferOrEvent(event, currentChannel.getChannelIndex());
}
}
以上代码段中,第一个关键调用是requestPartitions方法。它会触发所有InputChannel发起对requestSubpartition方法的调用以请求生产端的ResultSubpartition。
有一种占位目的的UnknowInputChannel不响应该方法,因为它最终会被确定为是LocalInputChannel还是RemoteInputChannel,确定的时机通常是JobManager通知器其可消费,TaskManager调用当前SingleInputGate的updateInputChannel方法,确定UnknowInputChannel会转变的具体的通道类型后再调用requestSubpartition方法。
由于requestPartitions只是起到触发其内部的InputChannel去请求的作用,这个调用可能并不会阻塞等待远程数据被返回。因为不同的InputChannel其请求的机制并不相同,RemoteChannel就是利用Netty异步请求的。所以SingleInputGate采用阻塞等待以及事件回调的方式来等待InputChannel上的数据可用。具体而言,它在while代码块中循环阻塞等待有可获取数据的InputChannel。而可用的InputChannel则由它们自己通过回调SingleInputGate的onAvailableBuffer添加到阻塞队列inputChannelsWithData中来。当有可获取数据的InputChannel之后,即可获取到Buffer。
Flink除了提供了SingleInputGate这种常规的输入网关之外,还提供了UnionInputGate,它更像一个包含SingleInputGate的容器,同时可以这些SingleInputGate拥有的InputChannel联合起来。并且多数InputGate约定的接口方法的实现,都被委托给了每个SingleInputGate。
那么它在实现getNextBufferOrEvent方法的时候,到底从哪个InputGate来获得缓冲区呢。它采用的是事件通知机制,所有加入UnionInputGate的InputGate都会将自己注册到InputGateListener。当某个InputGate上有数据可获取,该InputGate将会被加入一个阻塞队列。接着我们再来看getNextBufferOrEvent方法的实现:
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
if (inputGatesWithRemainingData.isEmpty()) {
return null;
}
//遍历每个InputGate,依次调用其requestPartitions方法
requestPartitions();
//阻塞等待输入网关队列中有可获取数据的输入网关
final InputGate inputGate = inputGateListener.getNextInputGateToReadFrom();
//从输入网关中获得数据
final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
//如果获取到的是事件且该事件为EndOfPartitionEvent且输入网关已完成
if (bufferOrEvent.isEvent()
&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
&& inputGate.isFinished()) {
//尝试将该输入网关从仍然可消费数据的输入网关集合中删除
if (!inputGatesWithRemainingData.remove(inputGate)) {
throw new IllegalStateException("Couldn't find input gate in set of remaining " +
"input gates.");
}
}
//获得通道索引偏移
final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
//计算真实通道索引
bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
return bufferOrEvent;
}
基本上这个机制跟SingleInputGate是一致的,只不过在UnionInputGate中它是从InputGate中而非从InputChannel中罢了。
输入通道
一个InputGate包含多个输入通道(InputChannel),输入通道用于请求ResultSubpartitionView,并从中消费数据。
所谓的ResultSubpartitionView是由ResultSubpartition所创建的用于供消费者任务消费数据的视图对象。
对于每个InputChannel,消费的生命周期会经历如下的方法调用过程:
- requestSubpartition:请求ResultSubpartition;
- getNextBuffer:获得下一个Buffer;
- releaseAllResources:释放所有的相关资源;
InputChannel根据ResultPartitionLocation提供了三种实现:
- LocalInputChannel:用于请求同实例中生产者任务所生产的ResultSubpartitionView的输入通道;
- RemoteInputChannel:用于请求远程生产者任务所生产的ResultSubpartitionView的输入通道;
- UnknownInputChannel:一种用于占位目的的输入通道,需要占位通道是因为暂未确定相对于生产者任务位置,但最终要么被替换为RemoteInputChannel,要么被替换为LocalInputChannel。
LocalInputChannel会从相同的JVM实例中消费生产者任务所生产的Buffer。因此,这种模式是直接借助于方法调用和对象共享的机制完成消费,无需跨节点网络通信。具体而言,它是通过ResultPartitionManager来直接创建对应的ResultSubpartitionView的实例,这种通道相对简单。
RemoteInputChannel是我们重点关注的输入通道,因为它涉及到远程请求结果子分区。远程数据交换的通信机制建立在Netty框架的基础之上,因此会有一个主交互对象PartitionRequestClient来衔接通信层跟输入通道。
我们以请求子分区的requestSubpartition为入口来进行分析。首先,通过一个ConnectionManager根据连接编号(对应着目的主机)来创建PartitionRequestClient实例。接着具体的请求工作被委托给PartitionRequestClient的实例:
partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
因为Netty以异步的方式处理请求。因此,上面的代码段中会看到将代表当前RemoteChannel实例的this对象作为参数注入到Netty的特定的ChannelHandler中去,在处理时根据特定的处理逻辑会触发RemoteChannel中相应的回调方法。
在RemoteChannel中定义了多个“onXXX”回调方法来衔接Netty的事件回调。其中,较为关键的自然是接收到数据的onBuffer方法:
public void onBuffer(Buffer buffer, int sequenceNumber) {
boolean success = false;
try {
synchronized (receivedBuffers) {
if (!isReleased.get()) {
//如果实际的序列号跟所期待的序列号相等
if (expectedSequenceNumber == sequenceNumber) {
//将数据加入接收队列同时将预期序列号计数器加一
receivedBuffers.add(buffer);
expectedSequenceNumber++;
//发出有可用Buffer的通知,该通知随后会被传递给其所归属的SingleInputGate,
//以通知其订阅者,有可用数据
notifyAvailableBuffer();
success = true;
}
else {
//如果实际序列号跟所期待的序列号不一致,则会触发onError回调,并相应以一个特定的异常对象
//该方法调用在成功设置完错误原因后,同样会触发notifyAvailableBuffer方法调用
onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
}
}
}
}
finally {
//如果不成功,则该Buffer会被回收
if (!success) {
buffer.recycle();
}
}
}
从代码段可以看出,消费时首先会进行序列号比对,这可以看作是一种“校验”机制。服务端每响应客户端一个Buffer都会将序列号加一并随响应数据一起发回给客户端,而客户端则会在消费时也同时累加本地的序列号计数器。在消费的过程中,两个序列号必须一致才能保证消费的顺利进行,否则InputChannel将会抛出IOException异常。
onBuffer方法的执行处于Netty的I/O线程上,但RemoteInputChannel中getNextBuffer却不会在Netty的I/O线程上被调用,所以必须有一个数据共享的容器,这个容器就是receivedBuffers队列。getNextBuffer就是直接从receivedBuffers队列中出队一条数据然后返回。
原文发布时间为:2017-01-05
本文作者:vinoYang
本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。