【Flink】Flink netty 通讯 PartitionRequestClient NettyPartitionRequestClient

【Flink】Flink netty 通讯 PartitionRequestClient NettyPartitionRequestClient

1.概述

转载:http://www.voidcn.com/article/p-tbmjvzhq-bkg.html

这篇文章不是很完善,这点我也不是很懂,以后补充

分区请求客户端(PartitionRequestClient)用于发起远程PartitionRequest请求,它也是RemoteChannel跟Netty通信层之间进行衔接的对象。

对单一的TaskManager而言只存在一个NettyClient实例。但处于同一TaskManager中不同的任务实例可能会跟不同的远程TaskManager上的任务之间交换数据,不同的TaskManager实例会有不同的ConnectionID(用于标识不同的IP地址)。因此,Flink采用PartitionRequestClient来对应ConnectionID,并提供了分区请求客户端工厂(PartitionRequestClientFactory)来创建PartitionRequestClient并保存ConnectionID与之的对应关系。

接下来,我们重点分析一下其请求ResultPartition的requestSubpartition方法:

/**
	 * Requests a remote intermediate result partition queue.
	 *
	 * <p>The request goes to the remote producer, for which this partition
	 * request client instance has been created.
	 *
	 * 请求一个远程中间结果分区队列。
	 *
	 * 请求被发送到远程生产者,这个分区请求客户端实例已经被创建。
	 */
	@Override
	public void requestSubpartition(
			final ResultPartitionID partitionId,
			final int subpartitionIndex,
			final RemoteInputChannel inputChannel,
			int delayMs) throws IOException {

		checkNotClosed();

		LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.",
				subpartitionIndex, partitionId, delayMs);

		//将当前请求数据的RemoteInputChannel的实例注入到NettyClient的ChannelHandler管道的
		//PartitionRequestClientHandler实例中
		clientHandler.addInputChannel(inputChannel);

		//构建PartitionRequest请求对象
		final PartitionRequest request = new PartitionRequest(
				partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());

		//构建一个ChannelFutureListener的实例,当I/O操作执行失败后,会触发相关的错误处理逻辑
		final ChannelFutureListener listener = new ChannelFutureListener() {
			@Override
			public void operationComplete(ChannelFuture future) throws Exception {
				if (!future.isSuccess()) {
					clientHandler.removeInputChannel(inputChannel);
					SocketAddress remoteAddr = future.channel().remoteAddress();
					inputChannel.onError(
							new LocalTransportException(
								String.format("Sending the partition request to '%s' failed.", remoteAddr),
								future.channel().localAddress(), future.cause()
							));
				}
			}
		};

		//立即发送该请求,并注册listener
		if (delayMs == 0) {
			ChannelFuture f = tcpChannel.writeAndFlush(request);
			f.addListener(listener);
		} else {
			//如果请求需要延迟一定的时间,则延迟发送请求
			final ChannelFuture[] f = new ChannelFuture[1];
			tcpChannel.eventLoop().schedule(new Runnable() {
				@Override
				public void run() {
					f[0] = tcpChannel.writeAndFlush(request);
					f[0].addListener(listener);
				}
			}, delayMs, TimeUnit.MILLISECONDS);
		}
	}

上一篇:NSTimer、performSelector 函数没有被调用的原因


下一篇:NiiOS-iOS复习及在学习进度更新