Flink 1.12 Source Code Walkthrough: The Netty Communication Model

Apache Flink is one of the most popular big data compute engines, offering high throughput, low latency, exactly-once semantics, and stateful stream processing. Reading its source code helps deepen one's understanding of the framework.

In the earlier posts on Flink's physical execution graph (part one) and the checkpoint process we already saw data being stored and exchanged along the way, and the upcoming analysis of task execution will also involve frequent data access and exchange. This chapter therefore walks through Flink's Netty-based communication model (the related memory management and backpressure mechanisms follow in the next chapter) to deepen our understanding of it.

 

Flink's internal components talk to each other via Akka (see my earlier article https://blog.csdn.net/ws0owws0ow/article/details/113991593), while the exchange of user records, checkpoint events, and similar data is built on Netty. Inside Flink, Netty mainly serves two roles: RemoteInputChannel and ResultSubpartition. When Flink builds the physical execution graph it creates the finest-grained Task instances together with the MailBox that drives the lifecycle of buffer/event transfers. While running, the MailBox loop repeatedly drives the input/output exchange: a ResultSubpartition notifies the downstream RemoteInputChannel that data is available so it can consume the buffers queued for it. The whole exchange is credit based; each message carries the number of buffers still queued on the sender side (the backlog), and once a threshold is exceeded Flink's backpressure kicks in. The memory management and backpressure mechanisms involved along the way are left for the next chapter.
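Before diving into the source, here is a minimal sketch of what "credit based" means in this context. All names below (CreditBasedSketch, Sender, Receiver, and this simplified BufferResponse) are made up for illustration and are not Flink's actual classes: the sender may only push a buffer while the receiver has announced credit, every message piggy-backs the sender's remaining backlog, and the receiver hands credit back as it frees buffers.

import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Minimal, self-contained sketch of the credit-based idea (illustrative only,
 * not Flink's classes).
 */
public class CreditBasedSketch {

	/** What the producer sends: a payload plus the number of buffers still queued (the backlog). */
	static class BufferResponse {
		final String payload;
		final int backlog;

		BufferResponse(String payload, int backlog) {
			this.payload = payload;
			this.backlog = backlog;
		}
	}

	static class Sender {
		private final Deque<String> queue = new ArrayDeque<>();
		private int receiverCredit; // credits announced by the receiver

		void enqueue(String payload) { queue.add(payload); }

		void addCredit(int credit) { receiverCredit += credit; }

		/** Emit at most one buffer, and only if the receiver still has credit. */
		BufferResponse pollNext() {
			if (receiverCredit == 0 || queue.isEmpty()) {
				return null; // no credit or nothing to send; unsent buffers pile up as backlog
			}
			receiverCredit--;
			String payload = queue.poll();
			return new BufferResponse(payload, queue.size()); // piggy-back the backlog
		}
	}

	static class Receiver {
		/** Consume a buffer and decide how many new credits to grant upstream. */
		int onBuffer(BufferResponse msg) {
			System.out.println("received " + msg.payload + ", sender backlog = " + msg.backlog);
			// a real receiver recycles the network buffer asynchronously; here the freed
			// buffer is immediately handed back as one new credit
			return 1;
		}
	}

	public static void main(String[] args) {
		Sender sender = new Sender();
		Receiver receiver = new Receiver();
		sender.addCredit(2); // initial credit announced when the channel registers

		for (int i = 0; i < 5; i++) {
			sender.enqueue("record-" + i);
		}

		BufferResponse msg;
		while ((msg = sender.pollNext()) != null) {
			sender.addCredit(receiver.onBuffer(msg)); // credits flow back after consumption
		}
	}
}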

 

Let's analyze this from the perspective of Netty's client and server sides. Flink's core logic for receiving data lives mainly in CreditBasedPartitionRequestClientHandler (client) and PartitionRequestQueue (server), both of which are ChannelInboundHandler implementations.

  • CreditBasedPartitionRequestClientHandler: receives notifications from PartitionRequestQueue and consumes the buffers or events queued in the deque of the corresponding ResultSubpartitionView
  • PartitionRequestQueue: when data becomes available, notifies the downstream input channel so that CreditBasedPartitionRequestClientHandler can consume the buffers or events sent over
  • PartitionRequestServerHandler: handles messages arriving from the client side, e.g. PartitionRequest registrations

 

Let's start with how the server side is built.

When the TaskManager instance is initialized (see my earlier article https://blog.csdn.net/ws0owws0ow/article/details/114368079), a series of TaskManager services is created, including the NettyShuffleEnvironment. The NettyShuffleEnvironment initializes a ServerChannelInitializer, which produces the server-side handlers, PartitionRequestQueue among them.

 

public static TaskManagerServices fromConfiguration(
			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
			PermanentBlobService permanentBlobService,
			MetricGroup taskManagerMetricGroup,
			ExecutorService ioExecutor,
			FatalErrorHandler fatalErrorHandler) throws Exception {

		// pre-start checks
		checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

		final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

		// start the I/O manager, it will create some temp directories.
		final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

		//create the NettyShuffleEnvironment with its three core components:
		//networkBufferPool
		//resultPartitionFactory
		//inputGateFactory
		final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
			taskManagerServicesConfiguration,
			taskEventDispatcher,
			taskManagerMetricGroup,
			ioExecutor);

		//create the Netty server-side ChannelHandlers
		final int listeningDataPort = shuffleEnvironment.start();


		.....
	}

int init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException {
		return init(
			nettyBufferPool,
			//initialize and create the server-side ChannelHandlers
			sslHandlerFactory -> new ServerChannelInitializer(protocol, sslHandlerFactory));
	}


static class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
		private final NettyProtocol protocol;
		private final SSLHandlerFactory sslHandlerFactory;

		public ServerChannelInitializer(
			NettyProtocol protocol, SSLHandlerFactory sslHandlerFactory) {
			this.protocol = protocol;
			this.sslHandlerFactory = sslHandlerFactory;
		}

		@Override //invoked by the Netty framework
		public void initChannel(SocketChannel channel) throws Exception {
			if (sslHandlerFactory != null) {
				channel.pipeline().addLast("ssl",
					sslHandlerFactory.createNettySSLHandler(channel.alloc()));
			}

			//create the ChannelHandlers and add them to the pipeline
			channel.pipeline().addLast(protocol.getServerChannelHandlers());
		}
	}

public ChannelHandler[] getServerChannelHandlers() {
		//ChannelInboundHandlerAdapter; sends BufferResponse messages to the client side, notifying it to consume them
		PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
		//SimpleChannelInboundHandler; handles messages from the client side, e.g. subpartition registration requests
		PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
			partitionProvider,
			taskEventPublisher,
			queueOfPartitionQueues);

		//wrap everything into the Netty ChannelHandler array
		return new ChannelHandler[] {
			messageEncoder,
			new NettyMessage.NettyMessageDecoder(),
			serverHandler,
			queueOfPartitionQueues
		};
	}

Let's focus on how PartitionRequestQueue notifies the downstream side to consume data. Whenever data becomes available, for example a checkpoint event as discussed earlier, or a buffer of user records that the current operator needs to ship to a downstream operator, notifyReaderNonEmpty is called, which in turn triggers the channel's writeAndFlush to push data downstream.

 

//triggers the userEventTriggered method of this PartitionRequestQueue
	void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
		// The notification might come from the same thread. For the initial writes this
		// might happen before the reader has set its reference to the view, because
		// creating the queue and the initial notification happen in the same method call.
		// This can be resolved by separating the creation of the view and allowing
		// notifications.

		// TODO This could potentially have a bad performance impact as in the
		// worst case (network consumes faster than the producer) each buffer
		// will trigger a separate event loop task being scheduled.
		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
	}

The NetworkSequenceViewReader above corresponds to a single subpartition and is created when that subpartition registers with the Netty server (more on that later). fireUserEventTriggered is a Netty API; it fires the userEventTriggered callback of the current ChannelInboundHandler, here PartitionRequestQueue.

 

public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
		// The user event triggered event loop callback is used for thread-safe
		// hand over of reader queues and cancelled producers.

		if (msg instanceof NetworkSequenceViewReader) {
			enqueueAvailableReader((NetworkSequenceViewReader) msg);
		} else if (msg.getClass() == InputChannelID.class) {
			....
	}

private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
		if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
			return;
		}
		// Queue an available reader for consumption. If the queue is empty,
		// we try trigger the actual write. Otherwise this will be handled by
		// the writeAndFlushNextMessageIfPossible calls.

		//availableReaders holds the NetworkSequenceViewReaders that have data to write downstream
		//registerAvailableReader below adds the current reader to it
		//when data is written downstream later, readers are polled from availableReaders
		boolean triggerWrite = availableReaders.isEmpty();
		//add the reader to availableReaders
		registerAvailableReader(reader);

		//if the queue was empty before, we can trigger the write ourselves
		if (triggerWrite) {
			//write data downstream
			writeAndFlushNextMessageIfPossible(ctx.channel());
		}
	}

The NetworkSequenceViewReader that was just added to availableReaders is polled out; the highest-priority entry is fetched from its deque as a BufferAndBacklog (carrying the backlog), wrapped into a BufferResponse, and sent to the client side, where channelRead is invoked to receive and process the buffer.

 

private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
		if (fatalError || !channel.isWritable()) {
			return;
		}

		// The logic here is very similar to the combined input gate and local
		// input channel logic. You can think of this class acting as the input
		// gate and the consumed views as the local input channels.

		BufferAndAvailability next = null;
		try {
			while (true) {
				//poll the NetworkSequenceViewReader that was just added to availableReaders
				NetworkSequenceViewReader reader = pollAvailableReader();

				// No queue with available data. We allow this here, because
				// of the write callbacks that are executed after each write.
				if (reader == null) {
					return;
				}

				//poll the next buffer from the NetworkSequenceViewReader
				next = reader.getNextBuffer();

				if (next == null) {
					if (!reader.isReleased()) {
						continue;
					}

					Throwable cause = reader.getFailureCause();
					if (cause != null) {
						ErrorResponse msg = new ErrorResponse(
							new ProducerFailedException(cause),
							reader.getReceiverId());

						ctx.writeAndFlush(msg);
					}
				} else {
					// This channel was now removed from the available reader queue.
					// We re-add it into the queue if it is still available
					if (next.moreAvailable()) {
						registerAvailableReader(reader);
					}

					//build the BufferResponse to be written
					BufferResponse msg = new BufferResponse(
						next.buffer(),
						next.getSequenceNumber(),
						reader.getReceiverId(),
						next.buffersInBacklog());

					// Write and flush and wait until this is done before
					// trying to continue with the next buffer.
					
					//write the BufferResponse to the channel and attach a future listener
					//the listener reacts according to the outcome of the returned future
					channel.writeAndFlush(msg).addListener(writeListener);

					return;
				}
			}
		} catch (Throwable t) {
			if (next != null) {
				next.buffer().recycleBuffer();
			}

			throw new IOException(t.getMessage(), t);
		}
	}

Let's now look at how buffers are dequeued.

User records and events such as checkpoints are stored as BufferConsumerWithPartialRecordLength objects in the ResultSubpartition's double-ended queue (a simplified sketch of such a priority-aware deque follows the list below).

  • After the input channel receives data, the (processed) result is emitted into the task's own ResultSubpartition deque for downstream consumption
  • When data needs to be sent downstream, the ResultSubpartition takes the first BufferConsumerWithPartialRecordLength from the deque, turns it into a buffer, and writes it downstream
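To make the priority behavior of that deque concrete, here is a simplified, illustrative sketch in the spirit of Flink's PrioritizedDeque. The class name PriorityDequeSketch and its implementation are made up for this post and are not the actual Flink class: priority elements (e.g. certain checkpoint barriers) stay at the head, regular elements are appended at the tail.

import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified, illustrative priority-aware deque (not Flink's PrioritizedDeque). */
public class PriorityDequeSketch<T> {

	private final Deque<T> deque = new ArrayDeque<>();
	private int numPriorityElements; // how many elements at the head are priority elements

	/** Append a regular element at the tail. */
	public void add(T element) {
		deque.add(element);
	}

	/** Insert a priority element right after the existing priority section at the head. */
	public void addPriorityElement(T element) {
		// rotate the existing priority elements out, add the new one, rotate them back
		Deque<T> priorityHead = new ArrayDeque<>();
		for (int i = 0; i < numPriorityElements; i++) {
			priorityHead.add(deque.poll());
		}
		deque.addFirst(element);
		while (!priorityHead.isEmpty()) {
			deque.addFirst(priorityHead.pollLast());
		}
		numPriorityElements++;
	}

	/** Poll from the head; priority elements are consumed before regular ones. */
	public T poll() {
		T element = deque.poll();
		if (element != null && numPriorityElements > 0) {
			numPriorityElements--;
		}
		return element;
	}

	public int getNumPriorityElements() {
		return numPriorityElements;
	}

	public static void main(String[] args) {
		PriorityDequeSketch<String> q = new PriorityDequeSketch<>();
		q.add("record-1");
		q.add("record-2");
		q.addPriorityElement("barrier"); // jumps ahead of the regular records
		System.out.println(q.poll());    // barrier
		System.out.println(q.poll());    // record-1
	}
}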

 

public BufferAndBacklog getNextBuffer() {
		return parent.pollBuffer();
	}

BufferAndBacklog pollBuffer() {
		//buffers is a PrioritizedDeque<BufferConsumerWithPartialRecordLength>
		//it stores the buffers or events that have not been consumed yet
		synchronized (buffers) {
			if (isBlockedByCheckpoint) {
				return null;
			}

			Buffer buffer = null;

			if (buffers.isEmpty()) {
				flushRequested = false;
			}

			while (!buffers.isEmpty()) {
				//peek at the highest-priority BufferConsumer in the deque without removing it
				BufferConsumer bufferConsumer = buffers.peek().getBufferConsumer();
				//build the buffer
				buffer = bufferConsumer.build();

				checkState(bufferConsumer.isFinished() || buffers.size() == 1,
					"When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.");

				if (buffers.size() == 1) {
					// turn off flushRequested flag if we drained all of the available data
					flushRequested = false;
				}

				//if the BufferConsumer is finished, poll it from buffers to remove it
				if (bufferConsumer.isFinished()) {
					buffers.poll().getBufferConsumer().close();
					//buffersInBacklog - 1
					decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
				}

				//in the normal case the freshly built buffer contains data
				if (buffer.readableBytes() > 0) {
					//readable bytes > 0: we have a buffer to send, break out of the loop
					break;
				}

				//the built buffer was empty: recycle it to release its memory segment
				buffer.recycleBuffer();
				buffer = null;
				if (!bufferConsumer.isFinished()) {
					break;
				}
			}

			if (buffer == null) {
				return null;
			}

			if (buffer.getDataType().isBlockingUpstream()) {
				isBlockedByCheckpoint = true;
			}

			//update the subpartition's byte/buffer statistics (the backlog was already decremented above)
			updateStatistics(buffer);
			// Do not report last remaining buffer on buffers as available to read (assuming it's unfinished).
			// It will be reported for reading either on flush or when the number of buffers in the queue
			// will be 2 or more.
			return new BufferAndBacklog(
				buffer,
				//buffersInBacklog: the number of buffers still queued (not yet consumed) in this ResultSubpartition
				getBuffersInBacklog(),
				isDataAvailableUnsafe() ? getNextBufferTypeUnsafe() : Buffer.DataType.NONE,
				sequenceNumber++);
		}
	}

Flink's data exchange is credit based, so each message sent downstream carries the number of buffers still left in the deque, i.e. buffersInBacklog. Flink's backpressure mechanism is also built on this backlog: when too many buffers pile up, backpressure is triggered.

 

//events such as checkpoint barriers also end up here; older versions used an ArrayDeque, which did not support event priorities
	private final PrioritizedDeque<BufferConsumerWithPartialRecordLength> buffers = new PrioritizedDeque<>();

//tracks how many buffers remain in the buffers deque
	//incremented on add
	//decremented on poll
	//the downstream side inspects this backlog when fetching buffers; when a threshold is reached backpressure kicks in
	private int buffersInBacklog;
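To see what the receiver does with this number, here is a hypothetical, simplified sketch of the reaction to the piggy-backed backlog. The class BacklogReactionSketch and the exact formula are assumptions for illustration; the real logic lives in RemoteInputChannel.onSenderBacklog (called in onBuffer further below) and is more involved. The idea: keep roughly backlog + initialCredit buffers on hand and request the missing floating buffers from the local pool.

/** Hypothetical, simplified sketch (not the actual RemoteInputChannel code). */
public class BacklogReactionSketch {

	static int floatingBuffersToRequest(int senderBacklog, int initialCredit, int available) {
		int target = senderBacklog + initialCredit; // buffers we would like to have ready
		return Math.max(0, target - available);     // floating buffers still missing
	}

	public static void main(String[] args) {
		// the sender reports 7 queued buffers; the channel holds 2 exclusive + 1 floating buffer
		System.out.println(floatingBuffersToRequest(7, 2, 3)); // prints 6
	}
}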

That covers the construction of the server-side ChannelHandlers and the write path. Before analyzing how the InputChannel receives data, let's first look at how the client-side ChannelHandlers are built.

Before the task's runnable thread starts, Flink calls beforeInvoke to initialize all InputChannels and establishes, based on the subpartition index, channel connections between each InputChannel and its ResultSubpartition; each InputChannel then sends a subpartition registration request to the upstream ResultSubpartition.

 

public void requestPartitions() {
		synchronized (requestLock) {
			if (!requestedPartitionsFlag) {
				if (closeFuture.isDone()) {
					throw new IllegalStateException("Already released.");
				}

				// Sanity checks
				if (numberOfInputChannels != inputChannels.size()) {
					throw new IllegalStateException(String.format(
						"Bug in input gate setup logic: mismatch between " +
						"number of total input channels [%s] and the currently set number of input " +
						"channels [%s].",
						inputChannels.size(),
						numberOfInputChannels));
				}

				//initialize the input channels: LocalInputChannel for local exchange, RemoteInputChannel for remote exchange
				convertRecoveredInputChannels();
				//e.g. for RemoteInputChannel: iterate over each input channel, connect to the remote partition's channel and create a PartitionRequestClient
				//the PartitionRequestClient then asks the server side to register the subpartition
				internalRequestPartitions();
			}

			requestedPartitionsFlag = true;
		}
	}

private void internalRequestPartitions() {
		for (InputChannel inputChannel : inputChannels.values()) {
			try {
				//the input channel requests the subpartition at consumedSubpartitionIndex (the subpartition index)
				inputChannel.requestSubpartition(consumedSubpartitionIndex);
			} catch (Throwable t) {
				inputChannel.setError(t);
				return;
			}
		}
	}

public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
		if (partitionRequestClient == null) {
			// Create a client and request the partition
			try { //connect to the remote partition identified by connectionId and create the handlers for the remote channel
				partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
			} catch (IOException e) {
				// IOExceptions indicate that we could not open a connection to the remote TaskExecutor
				throw new PartitionConnectionException(partitionId, e);
			}

			//the InputChannel sends a subpartition registration request to the upstream ResultSubpartition
			partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
		}
	}

The requestSubpartition call above consists of two main steps:

  • Connect to the given connectionId (the location of the remote partition), obtain the connected Netty channel, and create the client-side ChannelHandler (CreditBasedPartitionRequestClientHandler)
  • Build a PartitionRequest and write this requestSubpartition event over the channel to the upstream ResultSubpartition (the server side)

 

private NettyPartitionRequestClient connect(ConnectionID connectionId) throws RemoteTransportException, InterruptedException {
		try { //connect to the remote partition identified by connectionId and obtain the connected Netty channel
			Channel channel = nettyClient.connect(connectionId.getAddress()).await().channel();
			NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
			return new NettyPartitionRequestClient(channel, clientHandler, connectionId, this);
		} catch (InterruptedException e) {
			throw e;
		} catch (Exception e) {
			throw new RemoteTransportException(
				"Connecting to remote task manager '" + connectionId.getAddress() +
					"' has failed. This might indicate that the remote task " +
					"manager has been lost.",
				connectionId.getAddress(), e);
		}
	}

public ChannelHandler[] getClientChannelHandlers() {
		//responsible for handling data received from the remote side
		NetworkClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler();

		return new ChannelHandler[]{
			messageEncoder,
			new NettyMessageClientDecoderDelegate(networkClientHandler),
			networkClientHandler};
	}


public void requestSubpartition(
		final ResultPartitionID partitionId,
		final int subpartitionIndex,
		final RemoteInputChannel inputChannel,
		int delayMs) throws IOException {

		checkNotClosed();

		LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.",
			subpartitionIndex, partitionId, delayMs);

		//register the input channel in the ConcurrentMap<InputChannelID, RemoteInputChannel> of CreditBasedPartitionRequestClientHandler (the Netty client)
		//when a message arrives, channelRead looks the channel up in this map so it can consume the data
		clientHandler.addInputChannel(inputChannel);

		//build a PartitionRequest NettyMessage, used to register this channel with the server side
		final PartitionRequest request = new PartitionRequest(
			partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());

		//attach a listener to the request; once the remote side completes, the listener callback is invoked
		final ChannelFutureListener listener = new ChannelFutureListener() {
			@Override //invoked when the write operation completes
			public void operationComplete(ChannelFuture future) throws Exception {
				//on failure, remove the current input channel
				if (!future.isSuccess()) {
					clientHandler.removeInputChannel(inputChannel);
					SocketAddress remoteAddr = future.channel().remoteAddress();
					inputChannel.onError(
						new LocalTransportException(
							String.format("Sending the partition request to '%s' failed.", remoteAddr),
							future.channel().localAddress(), future.cause()
						));
				}
			}
		};

		//send the PartitionRequest over the previously established Netty channel to the remote partition
		//the server side handles it in channelRead0 and creates a CreditBasedSequenceNumberingViewReader
		if (delayMs == 0) {
			ChannelFuture f = tcpChannel.writeAndFlush(request);
			f.addListener(listener);
		} else {
			final ChannelFuture[] f = new ChannelFuture[1];
			tcpChannel.eventLoop().schedule(new Runnable() {
				@Override
				public void run() {
					f[0] = tcpChannel.writeAndFlush(request);
					f[0].addListener(listener);
				}
			}, delayMs, TimeUnit.MILLISECONDS);
		}
	}

Once the upstream ResultSubpartition receives the downstream InputChannel's requestSubpartition request, it creates a CreditBasedSequenceNumberingViewReader, which is exactly the reader object Flink passes into the consumption notification shown at the beginning:

void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) 

 

protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
		try {
			Class<?> msgClazz = msg.getClass();

			// ----------------------------------------------------------------
			// Intermediate result partition requests
			// ----------------------------------------------------------------

			//a PartitionRequest (subpartition registration) sent from the client side
			if (msgClazz == PartitionRequest.class) {
				PartitionRequest request = (PartitionRequest) msg;

				LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);

				try {
					NetworkSequenceViewReader reader;
					reader = new CreditBasedSequenceNumberingViewReader(
						request.receiverId,
						request.credit,
						outboundQueue);

					reader.requestSubpartitionView(
						partitionProvider,
						request.partitionId,
						request.queueIndex);

					//register the reader with the outboundQueue (the PartitionRequestQueue)
					outboundQueue.notifyReaderCreated(reader);
				} catch (PartitionNotFoundException notFound) {
					respondWithError(ctx, notFound, request.receiverId);
				}
			}
			// ----------------------------------------------------------------
			// Task events
			// ----------------------------------------------------------------
			else if (msgClazz == TaskEventRequest.class) {
				.......
	}

With all the client-side Netty components in place, let's return to the writeAndFlushNextMessageIfPossible method analyzed earlier (the ResultSubpartition pushing data to the InputChannel for consumption). At this point the channelRead method of CreditBasedPartitionRequestClientHandler on the InputChannel side is triggered and starts receiving and processing the buffer.

 

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try {
			decodeMsg(msg);
		} catch (Throwable t) {
			notifyAllChannelsOfErrorAndClose(t);
		}
	}

//there are two main message types:
	//BufferResponse and ErrorResponse
	private void decodeMsg(Object msg) throws Throwable {
		final Class<?> msgClazz = msg.getClass();

		// ---- Buffer --------------------------------------------------------
		if (msgClazz == NettyMessage.BufferResponse.class) {
			NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
			
			RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
			if (inputChannel == null || inputChannel.isReleased()) {
				bufferOrEvent.releaseBuffer();

				cancelRequestFor(bufferOrEvent.receiverId);

				return;
			}

			try {
				//normal case: start processing the buffer
				decodeBufferOrEvent(inputChannel, bufferOrEvent);
			} catch (Throwable t) {
				inputChannel.onError(t);
			}


		} else if (msgClazz == NettyMessage.ErrorResponse.class) {
			// ---- Error ---------------------------------------------------------
			.....
	}


public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
		

            .....

			//receivedBuffers is a PrioritizedDeque<SequenceBuffer>, a double-ended queue of buffers waiting to be consumed
			synchronized (receivedBuffers) {
				// Similar to notifyBufferAvailable(), make sure that we never add a buffer
				// after releaseAllResources() released all buffers from receivedBuffers
				// (see above for details).
				if (isReleased.get()) {
					return;
				}

				wasEmpty = receivedBuffers.isEmpty();

				//check whether the buffer is marked as a priority element
				if (buffer.getDataType().hasPriority()) {
					//if so, add the buffer to the priority section at the head of receivedBuffers
					receivedBuffers.addPriorityElement(new SequenceBuffer(buffer, sequenceNumber));
					if (channelStatePersister.checkForBarrier(buffer)) {
						// checkpoint was not yet started by task thread,
						// so remember the numbers of buffers to spill for the time when it will be started
						numBuffersOvertaken = receivedBuffers.getNumUnprioritizedElements();
					}
					firstPriorityEvent = receivedBuffers.getNumPriorityElements() == 1;
				} else {
					//otherwise append the buffer to receivedBuffers in order
					receivedBuffers.add(new SequenceBuffer(buffer, sequenceNumber));
					channelStatePersister.maybePersist(buffer);
				}

				++expectedSequenceNumber;
			}
			recycleBuffer = false;

			if (firstPriorityEvent) {
				notifyPriorityEvent(sequenceNumber);
			}
			if (wasEmpty) {
				notifyChannelNonEmpty();
			}

			//react to the backlog reported by the upstream ResultSubpartition
			if (backlog >= 0) {
				onSenderBacklog(backlog);
			}
		} finally {
			//finally, recycle the buffer if it was not handed over
			if (recycleBuffer) {
				buffer.recycleBuffer();
			}
		}
	}

At this point the InputChannel has received the buffer sent by the ResultSubpartition and added it to the receivedBuffers queue. That completes Flink's Netty-based communication path. Later, when the Task starts running, the MailBox loop drives the data exchange between upstream and downstream: the InputChannel polls buffers out of receivedBuffers and handles each one according to its type, for example matching a checkpoint event, or matching a buffer of user data, invoking the user operators (map, filter, etc.), serializing the result, and handing it to the current operator's ResultSubpartition, after which the steps above repeat until the data reaches the sink. I will cover this end-to-end data flow in the upcoming post on the physical execution graph (part two). Thanks.
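As a rough illustration of that consumption loop, the sketch below polls a receivedBuffers-like queue and dispatches on the type of each element. It is illustrative only: ConsumeLoopSketch, this BufferOrEvent, and this DataType are simplified stand-ins, not Flink's classes, and the real logic is spread across the task mailbox, the network input, and the checkpoint barrier handling.

import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative sketch only; not the actual Flink task loop. */
public class ConsumeLoopSketch {

	enum DataType { DATA_BUFFER, CHECKPOINT_BARRIER, END_OF_PARTITION }

	static class BufferOrEvent {
		final DataType type;
		final String payload;

		BufferOrEvent(DataType type, String payload) {
			this.type = type;
			this.payload = payload;
		}
	}

	public static void main(String[] args) {
		Deque<BufferOrEvent> receivedBuffers = new ArrayDeque<>();
		receivedBuffers.add(new BufferOrEvent(DataType.DATA_BUFFER, "record-1"));
		receivedBuffers.add(new BufferOrEvent(DataType.CHECKPOINT_BARRIER, "barrier-17"));
		receivedBuffers.add(new BufferOrEvent(DataType.END_OF_PARTITION, ""));

		boolean running = true;
		while (running) {
			BufferOrEvent next = receivedBuffers.poll();
			if (next == null) {
				break; // the real mailbox loop would suspend here until more data arrives
			}
			switch (next.type) {
				case DATA_BUFFER:
					// deserialize the record, run the user operators (map, filter, ...) and
					// emit the result to this task's own ResultSubpartition
					System.out.println("process " + next.payload);
					break;
				case CHECKPOINT_BARRIER:
					System.out.println("handle checkpoint barrier " + next.payload);
					break;
				case END_OF_PARTITION:
					running = false;
					break;
			}
		}
	}
}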
