Apache Flink is one of the most popular big data compute engines. It natively supports high throughput, low latency, exactly-once semantics and stateful stream processing, and reading its source code helps deepen our understanding of the framework.
In the earlier analyses of Flink's physical execution graph (Part 1) and the checkpoint process we already saw data being stored and exchanged, and the later analysis of running Tasks will involve frequent data usage and exchange as well. This chapter therefore walks through Flink's built-in memory management and its Netty-based communication model to build a deeper understanding of both.
For communication between its internal components Flink uses Akka (see my earlier article https://blog.csdn.net/ws0owws0ow/article/details/113991593), while the exchange of user data, checkpoint events and similar payloads is built on Netty. Inside Flink, Netty mainly serves two roles: RemoteInputChannel and ResultSubpartition. When Flink builds the physical execution graph it creates the finest-grained Task instances together with the MailBox that drives the lifecycle of buffer/event transfers. While running, the MailBox keeps looping over input and output exchanges: the ResultSubpartition notifies the downstream RemoteInputChannel that data is available so it can consume the buffers queued for it. The whole exchange is credit-based; every transfer carries the number of buffers still queued on the sender side (the backlog), and once that exceeds a threshold Flink's backpressure kicks in. The memory management and backpressure mechanisms involved here are covered in the next chapter.
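Before diving into the source, here is a minimal, self-contained sketch of the credit-based idea. All class and method names are illustrative, not Flink's actual code: the sender only pushes data while it holds credit granted by the receiver, and every message advertises how many buffers are still queued (the backlog).
import java.util.ArrayDeque;
// Minimal sketch of credit-based flow control; every name here is illustrative.
class CreditBasedSenderSketch {
    private int credit;                                   // credit granted by the receiver
    private final ArrayDeque<byte[]> queue = new ArrayDeque<>();
    void onCreditGranted(int additionalCredit) {          // receiver announces new credit
        credit += additionalCredit;
        drain();
    }
    void enqueue(byte[] buffer) {                         // producer adds a buffer
        queue.add(buffer);
        drain();
    }
    private void drain() {
        while (credit > 0 && !queue.isEmpty()) {
            byte[] next = queue.poll();
            credit--;
            // in Flink the equivalent message is a BufferResponse carrying buffersInBacklog
            send(next, queue.size());
        }
        // if credit is exhausted while the queue keeps growing, the producer is
        // effectively backpressured: nothing leaves until more credit arrives
    }
    private void send(byte[] buffer, int buffersInBacklog) {
        // the actual network write is omitted in this sketch
    }
}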
Let's analyze this from the Netty client and server perspective. Flink's core data-receiving logic lives in CreditBasedPartitionRequestClientHandler (client) and PartitionRequestQueue (server), both implementing Netty's ChannelInboundHandler:
- CreditBasedPartitionRequestClientHandler: mainly receives what PartitionRequestQueue sends and consumes the buffers or events taken from the ResultSubpartitionView's deque
- PartitionRequestQueue: when data becomes available, notifies the downstream InputChannel so that it can consume the buffers or events sent over via CreditBasedPartitionRequestClientHandler
- PartitionRequestServerHandler: mainly handles data received from the client, such as PartitionRequest messages
Let's start with how the server side is built:
When the TaskManager instance is initialized (see my earlier article https://blog.csdn.net/ws0owws0ow/article/details/114368079), a series of TaskManager services is created, including the NettyShuffleEnvironment. The NettyShuffleEnvironment initializes a ServerChannelInitializer, which produces the server-side handlers, among them PartitionRequestQueue:
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
PermanentBlobService permanentBlobService,
MetricGroup taskManagerMetricGroup,
ExecutorService ioExecutor,
FatalErrorHandler fatalErrorHandler) throws Exception {
// pre-start checks
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
//create the three core components of the NettyShuffleEnvironment:
//networkBufferPool
//resultPartitionFactory
//inputGateFactory
final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
taskManagerServicesConfiguration,
taskEventDispatcher,
taskManagerMetricGroup,
ioExecutor);
//start the Netty server and build its server-side ChannelHandlers
final int listeningDataPort = shuffleEnvironment.start();
.....
}
int init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException {
return init(
nettyBufferPool,
//create the server-side ChannelInitializer
sslHandlerFactory -> new ServerChannelInitializer(protocol, sslHandlerFactory));
}
static class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
private final NettyProtocol protocol;
private final SSLHandlerFactory sslHandlerFactory;
public ServerChannelInitializer(
NettyProtocol protocol, SSLHandlerFactory sslHandlerFactory) {
this.protocol = protocol;
this.sslHandlerFactory = sslHandlerFactory;
}
@Override //invoked internally by the Netty framework
public void initChannel(SocketChannel channel) throws Exception {
if (sslHandlerFactory != null) {
channel.pipeline().addLast("ssl",
sslHandlerFactory.createNettySSLHandler(channel.alloc()));
}
//create the ChannelHandlers and add them to the pipeline
channel.pipeline().addLast(protocol.getServerChannelHandlers());
}
}
public ChannelHandler[] getServerChannelHandlers() {
//a ChannelInboundHandlerAdapter that sends BufferResponse messages to the client and notifies it to consume them
PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
//a SimpleChannelInboundHandler that handles data from the client, e.g. subpartition registration requests
PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
partitionProvider,
taskEventPublisher,
queueOfPartitionQueues);
//assemble them into the Netty ChannelHandler pipeline
return new ChannelHandler[] {
messageEncoder,
new NettyMessage.NettyMessageDecoder(),
serverHandler,
queueOfPartitionQueues
};
}
Let's focus on how PartitionRequestQueue notifies the downstream side to consume data. Whenever data becomes available, e.g. a checkpoint event discussed earlier, or a buffer produced when user data has to travel from the current operator to a downstream operator, notifyReaderNonEmpty is called, which triggers writeAndFlush on the channel to write the data downstream:
//triggers the userEventTriggered method of this PartitionRequestQueue
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
// The notification might come from the same thread. For the initial writes this
// might happen before the reader has set its reference to the view, because
// creating the queue and the initial notification happen in the same method call.
// This can be resolved by separating the creation of the view and allowing
// notifications.
// TODO This could potentially have a bad performance impact as in the
// worst case (network consumes faster than the producer) each buffer
// will trigger a separate event loop task being scheduled.
ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
}
The NetworkSequenceViewReader above corresponds to a single subpartition and is created when the subpartition is registered with the Netty server (covered later). fireUserEventTriggered is a Netty-internal API; it invokes userEventTriggered on the current ChannelInboundHandler (PartitionRequestQueue):
public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
// The user event triggered event loop callback is used for thread-safe
// hand over of reader queues and cancelled producers.
if (msg instanceof NetworkSequenceViewReader) {
enqueueAvailableReader((NetworkSequenceViewReader) msg);
} else if (msg.getClass() == InputChannelID.class) {
....
}
}
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
return;
}
// Queue an available reader for consumption. If the queue is empty,
// we try trigger the actual write. Otherwise this will be handled by
// the writeAndFlushNextMessageIfPossible calls.
//availableReaders holds the NetworkSequenceViewReaders that are ready to write downstream
//registerAvailableReader below adds the current reader to it
//when data is later written downstream, readers are polled back out of availableReaders
boolean triggerWrite = availableReaders.isEmpty();
//add the reader to availableReaders
registerAvailableReader(reader);
//if the queue was empty before, we are the ones who should trigger the write
if (triggerWrite) {
//write data downstream
writeAndFlushNextMessageIfPossible(ctx.channel());
}
}
The NetworkSequenceViewReader that was just added to availableReaders is polled back out, the highest-priority BufferAndBacklog is taken from its deque, wrapped together with the backlog into a BufferResponse and sent to the client, whose channelRead method receives and processes the buffer:
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
if (fatalError || !channel.isWritable()) {
return;
}
// The logic here is very similar to the combined input gate and local
// input channel logic. You can think of this class acting as the input
// gate and the consumed views as the local input channels.
BufferAndAvailability next = null;
try {
while (true) {
//poll the NetworkSequenceViewReader that was just added to availableReaders
NetworkSequenceViewReader reader = pollAvailableReader();
// No queue with available data. We allow this here, because
// of the write callbacks that are executed after each write.
if (reader == null) {
return;
}
//poll the next buffer from the NetworkSequenceViewReader
next = reader.getNextBuffer();
if (next == null) {
if (!reader.isReleased()) {
continue;
}
Throwable cause = reader.getFailureCause();
if (cause != null) {
ErrorResponse msg = new ErrorResponse(
new ProducerFailedException(cause),
reader.getReceiverId());
ctx.writeAndFlush(msg);
}
} else {
// This channel was now removed from the available reader queue.
// We re-add it into the queue if it is still available
if (next.moreAvailable()) {
registerAvailableReader(reader);
}
//build the BufferResponse to be written
BufferResponse msg = new BufferResponse(
next.buffer(),
next.getSequenceNumber(),
reader.getReceiverId(),
next.buffersInBacklog());
// Write and flush and wait until this is done before
// trying to continue with the next buffer.
//write the BufferResponse to the channel and attach a future listener
//the listener reacts according to the outcome of the returned future
channel.writeAndFlush(msg).addListener(writeListener);
return;
}
}
} catch (Throwable t) {
if (next != null) {
next.buffer().recycleBuffer();
}
throw new IOException(t.getMessage(), t);
}
}
Now let's look at how a buffer is dequeued.
User data and events such as checkpoints are stored as BufferConsumerWithPartialRecordLength objects in the ResultSubpartition's double-ended queue:
- after data has been obtained from the InputChannel and processed, the results are emitted into the task's own ResultSubpartition deque for downstream consumption
- when data needs to be sent downstream, the ResultSubpartition takes the first BufferConsumerWithPartialRecordLength from the deque, converts it into a buffer and writes it downstream (a simplified sketch of the enqueue side follows, before the dequeue code)
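The excerpts below only show the dequeue path, so for completeness here is roughly what the enqueue side looks like. This is a simplified sketch with illustrative names (notifyDataAvailableSketch is hypothetical), not the verbatim Flink source; it uses the buffers deque and buffersInBacklog counter that appear in the code further down.
//simplified sketch of the enqueue side of a ResultSubpartition (illustrative names)
void addBufferConsumerSketch(BufferConsumerWithPartialRecordLength wrapped, boolean isPriorityEvent) {
    boolean wasEmpty;
    synchronized (buffers) {
        wasEmpty = buffers.isEmpty();
        if (isPriorityEvent) {
            buffers.addPriorityElement(wrapped);   // e.g. certain checkpoint events jump the queue
        } else {
            buffers.add(wrapped);
        }
        buffersInBacklog++;                        // mirrored by decreaseBuffersInBacklogUnsafe() on poll
    }
    if (wasEmpty) {
        // going from empty to non-empty is what eventually ends up in
        // notifyReaderNonEmpty(...), so the PartitionRequestQueue can schedule a write
        notifyDataAvailableSketch();
    }
}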
public BufferAndBacklog getNextBuffer() {
return parent.pollBuffer();
}
BufferAndBacklog pollBuffer() {
//buffers is a PrioritizedDeque<BufferConsumerWithPartialRecordLength>
//it stores the buffers or events that have not been consumed yet
synchronized (buffers) {
if (isBlockedByCheckpoint) {
return null;
}
Buffer buffer = null;
if (buffers.isEmpty()) {
flushRequested = false;
}
while (!buffers.isEmpty()) {
//peek at the highest-priority BufferConsumer in the deque without removing it
BufferConsumer bufferConsumer = buffers.peek().getBufferConsumer();
//build the buffer
buffer = bufferConsumer.build();
checkState(bufferConsumer.isFinished() || buffers.size() == 1,
"When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.");
if (buffers.size() == 1) {
// turn off flushRequested flag if we drained all of the available data
flushRequested = false;
}
//if the bufferConsumer is finished, remove it from buffers via poll
if (bufferConsumer.isFinished()) {
buffers.poll().getBufferConsumer().close();
//buffersInBacklog - 1
decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
}
//if the built buffer actually contains readable bytes, we have data to send
if (buffer.readableBytes() > 0) {
//break out of the loop and ship this buffer
break;
}
//the buffer is empty: recycle it back to the buffer pool
buffer.recycleBuffer();
buffer = null;
if (!bufferConsumer.isFinished()) {
break;
}
}
if (buffer == null) {
return null;
}
if (buffer.getDataType().isBlockingUpstream()) {
isBlockedByCheckpoint = true;
}
//update statistics (total number of buffers and bytes handed out by this subpartition)
updateStatistics(buffer);
// Do not report last remaining buffer on buffers as available to read (assuming it's unfinished).
// It will be reported for reading either on flush or when the number of buffers in the queue
// will be 2 or more.
return new BufferAndBacklog(
buffer,
//buffersInBacklog: the number of buffers still queued (not yet consumed) in this ResultSubpartition
getBuffersInBacklog(),
isDataAvailableUnsafe() ? getNextBufferTypeUnsafe() : Buffer.DataType.NONE,
sequenceNumber++);
}
}
Flink's data exchange is credit-based, so when data is sent it also carries the number of buffers still remaining in the deque, i.e. buffersInBacklog. Flink's backpressure mechanism is built on this value as well: if too many buffers pile up in the backlog, backpressure is triggered.
//checkpoint events are stored here too; older versions apparently used an ArrayDeque, which had no notion of event priority
private final PrioritizedDeque<BufferConsumerWithPartialRecordLength> buffers = new PrioritizedDeque<>();
//number of buffers currently remaining in the deque
//incremented when a buffer is added
//decremented when a buffer is polled
//the downstream side inspects buffersInBacklog when fetching buffers; once a threshold is reached, backpressure kicks in
private int buffersInBacklog;
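To illustrate how the receiving side can use that backlog value, here is a simplified, self-contained sketch with hypothetical names (the real logic lives in RemoteInputChannel and its buffer manager): the receiver tries to hold roughly one buffer per queued sender buffer; if it cannot obtain more buffers, it stops granting credit and data piles up upstream, which is exactly what surfaces as backpressure.
//simplified sketch of a backlog-driven credit grant; all names are illustrative
class ReceiverCreditSketch {
    interface FloatingBufferPool {
        boolean tryRequestBuffer();   // true if a spare buffer could be obtained
    }
    private final FloatingBufferPool floatingPool;
    private int availableBuffers;     // buffers this channel currently holds
    private int unannouncedCredit;    // credit not yet announced back to the sender
    ReceiverCreditSketch(FloatingBufferPool floatingPool, int initialExclusiveBuffers) {
        this.floatingPool = floatingPool;
        this.availableBuffers = initialExclusiveBuffers;
    }
    // called for every received BufferResponse, with the backlog the sender announced
    void onSenderBacklog(int backlog) {
        while (availableBuffers < backlog) {
            if (!floatingPool.tryRequestBuffer()) {
                // no spare memory left: credit stops growing, the sender's queue keeps
                // filling up, and backpressure propagates upstream
                return;
            }
            availableBuffers++;
            unannouncedCredit++;      // will be announced to the sender as additional credit
        }
    }
}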
That covers the construction of the server-side ChannelHandlers and the write path. Before looking at how the InputChannel receives data, let's first analyze how the client-side ChannelHandlers are built.
Before starting the Task's runnable thread, Flink calls beforeInvoke, which initializes all InputChannels, establishes the channel between each InputChannel and its ResultSubpartition based on the subpartition index, and then has each InputChannel send a subpartition registration request to the upstream ResultSubpartition:
public void requestPartitions() {
synchronized (requestLock) {
if (!requestedPartitionsFlag) {
if (closeFuture.isDone()) {
throw new IllegalStateException("Already released.");
}
// Sanity checks
if (numberOfInputChannels != inputChannels.size()) {
throw new IllegalStateException(String.format(
"Bug in input gate setup logic: mismatch between " +
"number of total input channels [%s] and the currently set number of input " +
"channels [%s].",
inputChannels.size(),
numberOfInputChannels));
}
//initialize the input channels: LocalInputChannel for local exchange, RemoteInputChannel for remote exchange
convertRecoveredInputChannels();
//e.g. for a RemoteInputChannel: iterate over the input channels, connect to the remote partition's channel and create a PartitionRequestClient
//then use the PartitionRequestClient to register the subpartition with the server side
internalRequestPartitions();
}
requestedPartitionsFlag = true;
}
}
private void internalRequestPartitions() {
for (InputChannel inputChannel : inputChannels.values()) {
try {
//the input channel requests the subpartition identified by consumedSubpartitionIndex
inputChannel.requestSubpartition(consumedSubpartitionIndex);
} catch (Throwable t) {
inputChannel.setError(t);
return;
}
}
}
public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
if (partitionRequestClient == null) {
// Create a client and request the partition
try { //connect to the remote partition identified by connectionId and create the handlers for the remote channel
partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
} catch (IOException e) {
// IOExceptions indicate that we could not open a connection to the remote TaskExecutor
throw new PartitionConnectionException(partitionId, e);
}
//the InputChannel sends the subpartition registration request to the upstream ResultSubpartition
partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
}
}
requestSubpartition above does two things:
- it connects to the given connectionId (the remote partition's location), obtains the connected Netty Channel and creates the client-side ChannelHandler (CreditBasedPartitionRequestClientHandler)
- it builds a PartitionRequest and writes the requestSubpartition event to the upstream ResultSubpartition (the server side) through that channel
private NettyPartitionRequestClient connect(ConnectionID connectionId) throws RemoteTransportException, InterruptedException {
try { //connect to the remote partition (connectionId) and obtain the connected Netty Channel
Channel channel = nettyClient.connect(connectionId.getAddress()).await().channel();
NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
return new NettyPartitionRequestClient(channel, clientHandler, connectionId, this);
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
throw new RemoteTransportException(
"Connecting to remote task manager '" + connectionId.getAddress() +
"' has failed. This might indicate that the remote task " +
"manager has been lost.",
connectionId.getAddress(), e);
}
}
public ChannelHandler[] getClientChannelHandlers() {
//mainly responsible for handling the data received from the remote side
NetworkClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler();
return new ChannelHandler[]{
messageEncoder,
new NettyMessageClientDecoderDelegate(networkClientHandler),
networkClientHandler};
}
public void requestSubpartition(
final ResultPartitionID partitionId,
final int subpartitionIndex,
final RemoteInputChannel inputChannel,
int delayMs) throws IOException {
checkNotClosed();
LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.",
subpartitionIndex, partitionId, delayMs);
//register the channel in the ConcurrentMap<InputChannelID, RemoteInputChannel> held by CreditBasedPartitionRequestClientHandler (the Netty client)
//when a message arrives, channelRead looks the channel up in this map to deliver the data for consumption
clientHandler.addInputChannel(inputChannel);
//create a PartitionRequest NettyMessage, used to register this channel with the server side
final PartitionRequest request = new PartitionRequest(
partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());
//attach a listener to the request sent to the server; once the operation completes, the listener is called back
final ChannelFutureListener listener = new ChannelFutureListener() {
@Override //called when the write operation completes
public void operationComplete(ChannelFuture future) throws Exception {
//on failure, remove the current inputChannel
if (!future.isSuccess()) {
clientHandler.removeInputChannel(inputChannel);
SocketAddress remoteAddr = future.channel().remoteAddress();
inputChannel.onError(
new LocalTransportException(
String.format("Sending the partition request to '%s' failed.", remoteAddr),
future.channel().localAddress(), future.cause()
));
}
}
};
//send the PartitionRequest over the Netty channel established earlier to the remote partition
//the server's channelRead0 will handle it and create a CreditBasedSequenceNumberingViewReader
if (delayMs == 0) {
ChannelFuture f = tcpChannel.writeAndFlush(request);
f.addListener(listener);
} else {
final ChannelFuture[] f = new ChannelFuture[1];
tcpChannel.eventLoop().schedule(new Runnable() {
@Override
public void run() {
f[0] = tcpChannel.writeAndFlush(request);
f[0].addListener(listener);
}
}, delayMs, TimeUnit.MILLISECONDS);
}
}
After the upstream ResultSubpartition receives the requestSubpartition request from the downstream InputChannel, it creates a CreditBasedSequenceNumberingViewReader. This is exactly the reader object that Flink passes into the consumption notification discussed at the beginning:
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader)
protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
try {
Class<?> msgClazz = msg.getClass();
// ----------------------------------------------------------------
// Intermediate result partition requests
// ----------------------------------------------------------------
//the subpartition registration request (PartitionRequest) sent by the downstream input channel
if (msgClazz == PartitionRequest.class) {
PartitionRequest request = (PartitionRequest) msg;
LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
try {
NetworkSequenceViewReader reader;
reader = new CreditBasedSequenceNumberingViewReader(
request.receiverId,
request.credit,
outboundQueue);
reader.requestSubpartitionView(
partitionProvider,
request.partitionId,
request.queueIndex);
//register the reader with the outboundQueue (the PartitionRequestQueue)
outboundQueue.notifyReaderCreated(reader);
} catch (PartitionNotFoundException notFound) {
respondWithError(ctx, notFound, request.receiverId);
}
}
// ----------------------------------------------------------------
// Task events
// ----------------------------------------------------------------
else if (msgClazz == TaskEventRequest.class) {
.......
}
With all client-side Netty components in place, let's return to the writeAndFlushNextMessageIfPossible method analyzed earlier (the ResultSubpartition sending data to the InputChannel for consumption). At this point channelRead of the CreditBasedPartitionRequestClientHandler on the InputChannel side is triggered and starts receiving and processing the buffer:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
decodeMsg(msg);
} catch (Throwable t) {
notifyAllChannelsOfErrorAndClose(t);
}
}
//there are mainly two message types:
//BufferResponse and ErrorResponse
private void decodeMsg(Object msg) throws Throwable {
final Class<?> msgClazz = msg.getClass();
// ---- Buffer --------------------------------------------------------
if (msgClazz == NettyMessage.BufferResponse.class) {
NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
if (inputChannel == null || inputChannel.isReleased()) {
bufferOrEvent.releaseBuffer();
cancelRequestFor(bufferOrEvent.receiverId);
return;
}
try {
//in the normal case, start processing the buffer
decodeBufferOrEvent(inputChannel, bufferOrEvent);
} catch (Throwable t) {
inputChannel.onError(t);
}
} else if (msgClazz == NettyMessage.ErrorResponse.class) {
// ---- Error ---------------------------------------------------------
.....
}
}
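The hop between decodeBufferOrEvent above and onBuffer below is not shown in the excerpt. Conceptually it just hands the decoded buffer, its sequence number and the sender's announced backlog over to the matching RemoteInputChannel; the following is a simplified sketch, where the accessor and field names on BufferResponse are assumptions and only the onBuffer signature is taken from the source shown next.
//simplified sketch of the elided hand-over step (BufferResponse accessors are assumptions)
private void decodeBufferOrEventSketch(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
    // the decoded payload, its sequence number and the announced backlog
    // all end up in the onBuffer(...) call of the matching RemoteInputChannel
    inputChannel.onBuffer(bufferOrEvent.getBuffer(), bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
}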
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
.....
//receivedBuffers is a PrioritizedDeque<SequenceBuffer>, the deque of buffers waiting to be consumed
synchronized (receivedBuffers) {
// Similar to notifyBufferAvailable(), make sure that we never add a buffer
// after releaseAllResources() released all buffers from receivedBuffers
// (see above for details).
if (isReleased.get()) {
return;
}
wasEmpty = receivedBuffers.isEmpty();
//check whether the buffer is flagged as a priority buffer
if (buffer.getDataType().hasPriority()) {
//if so, add it to the priority section at the head of receivedBuffers
receivedBuffers.addPriorityElement(new SequenceBuffer(buffer, sequenceNumber));
if (channelStatePersister.checkForBarrier(buffer)) {
// checkpoint was not yet started by task thread,
// so remember the numbers of buffers to spill for the time when it will be started
numBuffersOvertaken = receivedBuffers.getNumUnprioritizedElements();
}
firstPriorityEvent = receivedBuffers.getNumPriorityElements() == 1;
} else {
//otherwise append it to receivedBuffers in order
receivedBuffers.add(new SequenceBuffer(buffer, sequenceNumber));
channelStatePersister.maybePersist(buffer);
}
++expectedSequenceNumber;
}
recycleBuffer = false;
if (firstPriorityEvent) {
notifyPriorityEvent(sequenceNumber);
}
if (wasEmpty) {
notifyChannelNonEmpty();
}
//if the upstream ResultSubpartition reports a backlog
if (backlog >= 0) {
onSenderBacklog(backlog);
}
} finally {
//finally, if the buffer was not handed over to receivedBuffers, recycle it
if (recycleBuffer) {
buffer.recycleBuffer();
}
}
}
At this point the InputChannel has received the buffer sent over by the ResultSubpartition and added it to the receivedBuffers queue, which completes Flink's Netty-based communication mechanism. Later, once the Task is running, the MailBox loop drives the data exchange between upstream and downstream: the InputChannel polls buffers out of receivedBuffers and handles each one according to its type. A checkpoint event triggers the corresponding checkpoint handling, while a user-data buffer is passed to the user operators (map, filter, ...); once processed, the data is serialized and handed to the current operator's ResultSubpartition, and the steps above repeat until the data reaches the sink. A rough sketch of that loop follows below; I will analyze the whole low-level data flow in detail in Physical Execution Graph (Part 2). Thanks.
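As a closing illustration, here is a heavily simplified, hypothetical sketch of that per-record loop. All types and methods are illustrative placeholders, not Flink's actual API; the real path goes through the StreamTask mailbox, the record deserializers and the operator chain.
//hypothetical sketch of the consumption loop; every name here is a placeholder
class TaskLoopSketch<IN, OUT> {
    interface Input<T> {
        BufferOrEvent poll();            // e.g. backed by receivedBuffers
        T deserialize(byte[] bytes);
    }
    interface Output<T> {
        void emit(T record);             // e.g. backed by the operator's ResultSubpartition
    }
    interface UserFunction<A, B> {
        B apply(A value);                // the user operator: map, filter, ...
    }
    static class BufferOrEvent {
        final byte[] data;
        final boolean isEvent;
        BufferOrEvent(byte[] data, boolean isEvent) { this.data = data; this.isEvent = isEvent; }
    }
    void run(Input<IN> input, UserFunction<IN, OUT> udf, Output<OUT> output) {
        while (true) {
            BufferOrEvent next = input.poll();        // poll the next buffer/event
            if (next == null) {
                break;                                // nothing available (the mailbox would park here)
            }
            if (next.isEvent) {
                // e.g. a checkpoint event: trigger the corresponding checkpoint handling
                continue;
            }
            IN record = input.deserialize(next.data); // deserialize the user record
            OUT result = udf.apply(record);           // run the user function
            output.emit(result);                      // hand the result to the next operator
        }
    }
}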