初始化
Task
// Consumed intermediate result partitions
List<InputGateDeploymentDescriptor> consumedPartitions = tdd.getInputGates();

// Create one SingleInputGate per input gate deployment descriptor and index it both by
// position (inputGates) and by the ID of the result it consumes (inputGatesById).
this.inputGates = new SingleInputGate[consumedPartitions.size()];
this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();

for (int gateIndex = 0; gateIndex < this.inputGates.length; gateIndex++) {
    final SingleInputGate createdGate = SingleInputGate.create(
            taskNameWithSubtaskAndId,
            jobId,
            executionId,
            consumedPartitions.get(gateIndex),
            networkEnvironment,
            metricGroup.getIOMetricGroup());

    this.inputGates[gateIndex] = createdGate;
    this.inputGatesById.put(createdGate.getConsumedResultId(), createdGate);
}
初始化networkEnvironment
// Register this task with the network component (presumably the NetworkEnvironment whose
// registerTask method follows below — confirm the type of `network`).
network.registerTask(this);
NetworkEnvironment
registerTask
// Excerpt from registerTask: set up the buffer pool for each buffer reader (input gate).
// NOTE: the rest of the loop body and the catch/finally belonging to this try are not part
// of this excerpt.
final SingleInputGate[] inputGates = task.getAllInputGates(); for (SingleInputGate gate : inputGates) {
BufferPool bufferPool = null; try {
// One pool per gate, requested with the gate's number of input channels as first argument;
// the meaning of the `false` flag is not visible here — TODO confirm against createBufferPool.
bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
gate.setBufferPool(bufferPool);
}
SingleInputGate
create
/**
* Creates an input gate and all of its input channels.
*/
public static SingleInputGate create(
String owningTaskName,
JobID jobId,
ExecutionAttemptID executionId,
InputGateDeploymentDescriptor igdd,
NetworkEnvironment networkEnvironment,
IOMetricGroup metrics) { final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex();
checkArgument(consumedSubpartitionIndex >= 0); final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors()); final SingleInputGate inputGate = new SingleInputGate( //生成SingleInputGate对象
owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
icdd.length, networkEnvironment.getPartitionStateChecker(), metrics); // Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length]; //初始化InputChannel for (int i = 0; i < inputChannels.length; i++) { final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation(); if (partitionLocation.isLocal()) { //local
inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
networkEnvironment.getPartitionManager(),
networkEnvironment.getTaskEventDispatcher(),
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
metrics
);
}
else if (partitionLocation.isRemote()) { //remote
inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
partitionLocation.getConnectionId(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
metrics
);
}
else if (partitionLocation.isUnknown()) {
inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
networkEnvironment.getPartitionManager(),
networkEnvironment.getTaskEventDispatcher(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
metrics
);
}
else {
throw new IllegalStateException("Unexpected partition location.");
} inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]); //将inputChannel设置inputGate
} return inputGate;
}
inputGate中的每个inputChannel,对应于所消费的resultPartition中的一个resultSubPartition
------------------------------------------------------------------------------------------------------
OneInputStreamTask
// Excerpt from OneInputStreamTask: when the task has inputs, all input gates of the
// environment are handed to a StreamInputProcessor together with the checkpointing mode.
// NOTE: the closing brace of this `if` is not part of this excerpt.
if (numberOfInputs > 0) {
InputGate[] inputGates = getEnvironment().getAllInputGates();
inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
getCheckpointBarrierListener(),
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
isSerializingTimestamps());
StreamInputProcessor
// Merge all input gates into one (a UnionInputGate when there is more than one gate) and
// choose the barrier handler that matches the configured checkpointing mode.
InputGate inputGate = InputGateUtil.createInputGate(inputGates);

if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
    this.barrierHandler = new BarrierTracker(inputGate);
} else if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
    // BarrierBuffer additionally gets the I/O manager.
    this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
}
StreamInputProcessor.processInput中
// Excerpt from StreamInputProcessor.processInput: fetch the next buffer or event through the
// barrier handler and feed buffers into the deserializer of the channel they came from.
// NOTE: the remaining branches and closing braces are not part of this excerpt.
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
if (bufferOrEvent != null) {
if (bufferOrEvent.isBuffer()) {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel]; // per-channel deserializer (a SpillingAdaptiveSpanningRecordDeserializer)
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); // hand the buffer to that deserializer
} // records are subsequently deserialized from the buffer that was set above
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isFullRecord()) {
StreamElement recordOrWatermark = deserializationDelegate.getInstance();
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked()
BarrierBuffer
// Excerpt from BarrierBuffer.getNextNonBlocked; the rest of the loop and method is not shown.
public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
while (true) {
// process buffered BufferOrEvents before grabbing new ones
BufferOrEvent next;
if (currentBuffered == null) { // no previously buffered data to replay, so read straight from the input gate
next = inputGate.getNextBufferOrEvent();
}
InputGateUtil.createInputGate
/**
 * Returns one input gate that reads from all of the given gates.
 *
 * <p>A single gate is returned as-is; two or more gates are wrapped in a {@link UnionInputGate},
 * so that downstream code can treat all inputs as one gate.
 *
 * @param inputGates the gates to combine; must contain at least one gate
 * @return the only gate, or a union over all gates
 * @throws IllegalArgumentException if {@code inputGates} is empty
 */
public static InputGate createInputGate(InputGate[] inputGates) {
    // Reject an empty array explicitly; otherwise inputGates[0] below would fail with an
    // ArrayIndexOutOfBoundsException that does not explain the problem.
    if (inputGates.length == 0) {
        throw new IllegalArgumentException("No input gates specified.");
    }

    if (inputGates.length == 1) {
        return inputGates[0];
    }

    return new UnionInputGate(inputGates);
}
UnionInputGate
/**
* Input gate wrapper to union the input from multiple input gates.
*
* <p> Each input gate has input channels attached from which it reads data. At each input gate, the
* input channels have unique IDs from 0 (inclusive) to the number of input channels (exclusive).
*
* <pre>
* +---+---+ +---+---+---+
* | 0 | 1 | | 0 | 1 | 2 |
* +--------------+--------------+
* | Input gate 0 | Input gate 1 |
* +--------------+--------------+
* </pre>
*
* The union input gate maps these IDs from 0 to the *total* number of input channels across all
* unioned input gates, e.g. the channels of input gate 0 keep their original indexes and the
* channel indexes of input gate 1 are set off by 2 to 2--4.
*
* <pre>
* +---+---++---+---+---+
* | 0 | 1 || 2 | 3 | 4 |
* +--------------------+
* | Union input gate |
* +--------------------+
* </pre>
*
* It is possible to recursively union union input gates.
*/
public class UnionInputGate implements InputGate { /** The input gates to union. */
private final InputGate[] inputGates; private final Set<InputGate> inputGatesWithRemainingData; //没有结束的inputGate /** Data availability listener across all unioned input gates. */
private final InputGateListener inputGateListener; /** The total number of input channels across all unioned input gates. */
private final int totalNumberOfInputChannels; //所有的inputGates的所有channels的数目 /**
* A mapping from input gate to (logical) channel index offset. Valid channel indexes go from 0
* (inclusive) to the total number of input channels (exclusive).
*/
private final Map<InputGate, Integer> inputGateToIndexOffsetMap; //每个inputGate的index的base,比如上面的gate1的base就是2 /** Flag indicating whether partitions have been requested. */
private boolean requestedPartitionsFlag; public UnionInputGate(InputGate... inputGates) { for (InputGate inputGate : inputGates) {
// The offset to use for buffer or event instances received from this input gate.
inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels); //当前InputChannels的总数就代表该inputGate的base
inputGatesWithRemainingData.add(inputGate); //加入inputGatesWithRemainingData,表示该inputGate没有结束 currentNumberOfInputChannels += inputGate.getNumberOfInputChannels(); //channel数累加
} this.totalNumberOfInputChannels = currentNumberOfInputChannels; this.inputGateListener = new InputGateListener(inputGates, this); //InputGateListener
}
将多个实际的inputGate合成一个抽象的inputGate。这样做是为了方便后续处理:多个输入对后续的处理逻辑是透明的。
这样,当有多个inputGate时,BarrierBuffer中调用的inputGate.getNextBufferOrEvent,
实际上调用的就是UnionInputGate.getNextBufferOrEvent
@Override
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { if (inputGatesWithRemainingData.isEmpty()) { //如果所有的inputgate都已经结束
return null;
} // Make sure to request the partitions, if they have not been requested before.
requestPartitions(); //从相应的resultpartition去request数据 final InputGate inputGate = inputGateListener.getNextInputGateToReadFrom(); //获取一个有数据的inputGate final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent(); //真正的取数据,SingleInputGate.getNextBufferOrEvent if (bufferOrEvent.isEvent()
&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
&& inputGate.isFinished()) { //如果是结束event,则表示该inputGate已经结束 if (!inputGatesWithRemainingData.remove(inputGate)) { //从队列内删除
throw new IllegalStateException("Couldn't find input gate in set of remaining " +
"input gates.");
}
} // Set the channel index to identify the input channel (across all unioned input gates)
final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); //取得改inputgate的baseindex bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); //baseindx + 真实的index = union index return bufferOrEvent;
}
InputGateListener
/**
* Data availability listener at all unioned input gates.
*
* <p> The listener registers itself at each input gate and is notified for *each incoming
* buffer* at one of the unioned input gates.
*/
private static class InputGateListener implements EventListener<InputGate> { private final UnionInputGate unionInputGate; private final BlockingQueue<InputGate> inputGatesWithData = new LinkedBlockingQueue<InputGate>(); //Cache所有有available buffer的inputGate @Override
public void onEvent(InputGate inputGate) { //SingleInputGate.onAvailableBuffer时被触发
// This method is called from the input channel thread, which can be either the same
// thread as the consuming task thread or a different one.
inputGatesWithData.add(inputGate); //将inputGate加入队列,等待读取 for (int i = 0; i < registeredListeners.size(); i++) {
registeredListeners.get(i).onEvent(unionInputGate);
}
} InputGate getNextInputGateToReadFrom() throws InterruptedException { //从队列头取一个inputGate
return inputGatesWithData.take();
}
先看下requestPartitions:它是如何向resultPartition请求数据的?
/**
 * Requests the partitions of every unioned input gate.
 *
 * <p>Only the first successful call has an effect; later calls return immediately.
 */
public void requestPartitions() throws IOException, InterruptedException {
    if (requestedPartitionsFlag) {
        return; // already requested
    }

    for (InputGate gate : inputGates) {
        gate.requestPartitions();
    }
    requestedPartitionsFlag = true;
}
SingleInputGate.requestPartitions
public void requestPartitions() throws IOException, InterruptedException {
synchronized (requestLock) {
if (!requestedPartitionsFlag) { //只做一次 for (InputChannel inputChannel : inputChannels.values()) {
inputChannel.requestSubpartition(consumedSubpartitionIndex); //调用inputChannel.requestSubpartition
}
} requestedPartitionsFlag = true;
}
}
RemoteInputChannel
/**
 * Requests the given subpartition through a partition request client obtained from the
 * connection manager.
 *
 * <p>The client is created on the first call; once it exists, further calls do nothing.
 */
@Override
void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
    if (partitionRequestClient != null) {
        return; // request already issued
    }

    // Create a client and request the partition
    partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
    partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
}
先创建PartitionRequestClient,它负责与resultSubPartition通信
requestSubpartition
public ChannelFuture requestSubpartition(
final ResultPartitionID partitionId,
final int subpartitionIndex,
final RemoteInputChannel inputChannel,
int delayMs) throws IOException { partitionRequestHandler.addInputChannel(inputChannel); //将inputChannel加入partitionRequestHandler final PartitionRequest request = new PartitionRequest( //生成request
partitionId, subpartitionIndex, inputChannel.getInputChannelId()); if (delayMs == 0) {
ChannelFuture f = tcpChannel.writeAndFlush(request); //发送request
f.addListener(listener);
return f;
}
else {
final ChannelFuture[] f = new ChannelFuture[1];
tcpChannel.eventLoop().schedule(new Runnable() {
@Override
public void run() {
f[0] = tcpChannel.writeAndFlush(request);
f[0].addListener(listener);
}
}, delayMs, TimeUnit.MILLISECONDS); return f[0];
}
}
PartitionRequestClientHandler
/**
 * Handles an inbound message.
 *
 * <p>The message is decoded right away only if nothing is staged: the buffer listener holds no
 * staged buffer or event and {@code stagedMessages} is empty. Otherwise it is appended to
 * {@code stagedMessages} (presumably to keep message order — confirm). Any failure is reported
 * through {@code notifyAllChannelsOfErrorAndClose}.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        final boolean mustStage = bufferListener.hasStagedBufferOrEvent() || !stagedMessages.isEmpty();

        if (mustStage) {
            stagedMessages.add(msg);
        } else {
            decodeMsg(msg);
        }
    } catch (Throwable t) {
        notifyAllChannelsOfErrorAndClose(t);
    }
}
decodeMsg
/**
 * Decodes a single inbound message. Only the buffer-response case is part of this excerpt; the
 * method continues beyond it.
 *
 * @return for a buffer response, the result of {@code decodeBufferOrEvent}
 */
private boolean decodeMsg(Object msg) throws Throwable {
    final Class<?> msgClazz = msg.getClass();

    // ---- Buffer --------------------------------------------------------
    if (msgClazz == NettyMessage.BufferResponse.class) {
        NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

        // Look up the input channel the buffer is addressed to.
        // NOTE(review): get() may return null for an unknown receiver ID, which
        // decodeBufferOrEvent would hit at inputChannel.getBufferProvider() — confirm whether
        // the full source guards against this.
        RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);

        return decodeBufferOrEvent(inputChannel, bufferOrEvent);
    }
decodeBufferOrEvent
private boolean decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
boolean releaseNettyBuffer = true; try {
if (bufferOrEvent.isBuffer()) {
// ---- Buffer ------------------------------------------------
BufferProvider bufferProvider = inputChannel.getBufferProvider(); while (true) {
Buffer buffer = bufferProvider.requestBuffer(); //从channel的bufferProvider中获取buffer if (buffer != null) {
buffer.setSize(bufferOrEvent.getSize());
bufferOrEvent.getNettyBuffer().readBytes(buffer.getNioBuffer()); //将数据写入buffer中 inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber); //调用inputChannel.onBuffer return true;
}
else if (bufferListener.waitForBuffer(bufferProvider, bufferOrEvent)) {
releaseNettyBuffer = false; return false;
}
else if (bufferProvider.isDestroyed()) {
return false;
}
}
}
}
RemoteInputChannel
public void onBuffer(Buffer buffer, int sequenceNumber) {
boolean success = false; try {
synchronized (receivedBuffers) {
if (!isReleased.get()) {
if (expectedSequenceNumber == sequenceNumber) {
receivedBuffers.add(buffer); //将buffer放入receivedBuffers
expectedSequenceNumber++; notifyAvailableBuffer();//通知有available buffer success = true;
}
}
}
}
}
notifyAvailableBuffer
/** Reports this channel to its input gate as having data available to read. */
protected void notifyAvailableBuffer() {
    this.inputGate.onAvailableBuffer(this);
}
SingleInputGate
/**
 * Records that {@code channel} has data to read and, if a listener is registered, notifies it
 * that this gate has data (for example the listener of a UnionInputGate).
 */
public void onAvailableBuffer(InputChannel channel) {
    inputChannelsWithData.add(channel);

    final EventListener<InputGate> gateListener = registeredListener;
    if (gateListener == null) {
        return;
    }
    gateListener.onEvent(this);
}
---------------------------------------------------
SingleInputGate.getNextBufferOrEvent
@Override
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { requestPartitions(); InputChannel currentChannel = null;
while (currentChannel == null) { //如果没有有数据的channel,会循环blocking
currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS); //从inputChannelsWithData poll一个有数据的channel
} final Buffer buffer = currentChannel.getNextBuffer(); //读出buffer if (buffer.isBuffer()) {
return new BufferOrEvent(buffer, currentChannel.getChannelIndex());
}
else {
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); if (event.getClass() == EndOfPartitionEvent.class) {
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex()); if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
hasReceivedAllEndOfPartitionEvents = true;
} currentChannel.notifySubpartitionConsumed(); currentChannel.releaseAllResources();
} return new BufferOrEvent(event, currentChannel.getChannelIndex());
}
}
RemoteInputChannel
/**
 * Removes and returns the next buffer from {@code receivedBuffers}, adding its size to the
 * {@code numBytesIn} metric.
 *
 * <p>Expected to be called only after this channel has reported available data to its gate.
 *
 * @return the next received buffer, never {@code null}
 * @throws IOException if no buffer is available, i.e. the channel was queried without a
 *     preceding data notification
 */
Buffer getNextBuffer() throws IOException {
    synchronized (receivedBuffers) {
        Buffer buffer = receivedBuffers.poll();

        // poll() returns null on an empty queue; report that explicitly instead of failing
        // with a NullPointerException on getSize() below.
        if (buffer == null) {
            throw new IOException("Queried input channel for data although none is available.");
        }

        numBytesIn.inc(buffer.getSize());
        return buffer;
    }
}