1.概述
转载:http://www.voidcn.com/article/p-tbmjvzhq-bkg.html
这篇文章不是很完善,这点我也不是很懂,以后补充
分区请求客户端(PartitionRequestClient)用于发起远程PartitionRequest请求,它也是RemoteChannel跟Netty通信层之间进行衔接的对象。
对单一的TaskManager而言只存在一个NettyClient实例。但处于同一TaskManager中不同的任务实例可能会跟不同的远程TaskManager上的任务之间交换数据,不同的TaskManager实例会有不同的ConnectionID(用于标识不同的IP地址)。因此,Flink采用PartitionRequestClient来对应ConnectionID,并提供了分区请求客户端工厂(PartitionRequestClientFactory)来创建PartitionRequestClient并保存ConnectionID与之的对应关系。
接下来,我们重点分析一下其请求ResultPartition的requestSubpartition方法:
/**
* Requests a remote intermediate result partition queue.
*
* <p>The request goes to the remote producer, for which this partition
* request client instance has been created.
*
* 请求一个远程中间结果分区队列。
*
* 请求被发送到远程生产者,这个分区请求客户端实例已经被创建。
*/
@Override
public void requestSubpartition(
final ResultPartitionID partitionId,
final int subpartitionIndex,
final RemoteInputChannel inputChannel,
int delayMs) throws IOException {
checkNotClosed();
LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.",
subpartitionIndex, partitionId, delayMs);
//将当前请求数据的RemoteInputChannel的实例注入到NettyClient的ChannelHandler管道的
//PartitionRequestClientHandler实例中
clientHandler.addInputChannel(inputChannel);
//构建PartitionRequest请求对象
final PartitionRequest request = new PartitionRequest(
partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());
//构建一个ChannelFutureListener的实例,当I/O操作执行失败后,会触发相关的错误处理逻辑
final ChannelFutureListener listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
clientHandler.removeInputChannel(inputChannel);
SocketAddress remoteAddr = future.channel().remoteAddress();
inputChannel.onError(
new LocalTransportException(
String.format("Sending the partition request to '%s' failed.", remoteAddr),
future.channel().localAddress(), future.cause()
));
}
}
};
//立即发送该请求,并注册listener
if (delayMs == 0) {
ChannelFuture f = tcpChannel.writeAndFlush(request);
f.addListener(listener);
} else {
//如果请求需要延迟一定的时间,则延迟发送请求
final ChannelFuture[] f = new ChannelFuture[1];
tcpChannel.eventLoop().schedule(new Runnable() {
@Override
public void run() {
f[0] = tcpChannel.writeAndFlush(request);
f[0].addListener(listener);
}
}, delayMs, TimeUnit.MILLISECONDS);
}
}