[Flink] Flink Compute Resource Management

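1. Overview

Reposted from: Flink Source Code Reading Notes (6) - Compute Resource Management

In Flink, compute resources are allocated in units of slots. This article analyzes how Flink manages its compute resources.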

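2. Basic concepts of Task Slots

The previous articles covered how a Flink cluster starts up. In a Flink cluster, each TaskManager is a separate JVM process (except in MiniCluster mode), and a single TaskManager may run several subtasks, each in its own thread. Task slots were introduced to control how many tasks a TaskManager can run.

Each task slot represents a fixed subset of the compute resources of its TaskManager. For example, in a TaskManager with 3 slots, each slot can use 1/3 of the memory, so subtasks running in different slots do not compete for memory. Flink does not currently isolate CPU; only memory is isolated.

Adjusting the number of slots controls how strongly subtasks are isolated from one another. If every TaskManager has only 1 slot, each group of subtasks runs in its own JVM process; with several slots per TaskManager, more subtasks share one JVM. Subtasks in the same JVM process can share TCP connections and heartbeat messages, exchange less data over the network, and share some data structures, which reduces the per-subtask overhead to some extent.

By default, Flink lets subtasks share a slot as long as they belong to the same Job and are not subtasks of the same operator. As a result, a single slot may run an entire pipeline of the Job. Slot sharing has two main benefits: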

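  1. To work out how many slots a Job needs, Flink only has to determine the Job's maximum parallelism; the parallelism of each individual task does not need to be considered;
  2. Resources are used more efficiently. Without slot sharing, subtasks with light resource needs would occupy as much as subtasks with heavy needs; with slot sharing, they may be placed in the same slot.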
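The first benefit comes down to simple arithmetic. The sketch below is not Flink code: SlotDemandSketch and its two methods are hypothetical names, and it assumes that every vertex of the job sits in one slot sharing group.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: compares the slot demand of a job
// with and without slot sharing, given the parallelism of each vertex.
class SlotDemandSketch {

    // With slot sharing (assumption: a single sharing group), one slot can host
    // one subtask of every vertex, so the highest parallelism decides the demand.
    static int slotsWithSharing(int[] parallelismPerVertex) {
        return Arrays.stream(parallelismPerVertex).max().orElse(0);
    }

    // Without slot sharing, every subtask needs a slot of its own.
    static int slotsWithoutSharing(int[] parallelismPerVertex) {
        return Arrays.stream(parallelismPerVertex).sum();
    }

    public static void main(String[] args) {
        int[] parallelism = {2, 4, 4}; // e.g. source, map, sink
        System.out.println("with sharing:    " + slotsWithSharing(parallelism));
        System.out.println("without sharing: " + slotsWithoutSharing(parallelism));
    }
}
```

For a job whose vertices run with parallelism 2, 4 and 4, this gives 4 slots with sharing and 10 without.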

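Flink uses SlotSharingGroup and CoLocationGroup to decide how resources are shared when tasks are scheduled. They correspond to two kinds of constraints:

  • SlotSharingGroup: subtasks of different JobVertex instances in the same SlotSharingGroup may be placed in the same slot, but this is not guaranteed;

  • CoLocationGroup: for different JobVertex instances in the same CoLocationGroup, the n-th subtasks must all be placed in the same slot; this is a hard constraint.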

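3. Slot management in TaskExecutor

3.1 TaskSlot

First, let's look at how slots are managed in the TaskManager, that is, in TaskExecutor.

SlotID uniquely identifies a slot. It has two fields: ResourceID identifies the TaskExecutor that hosts the slot, and slotNumber is the index of the slot within that TaskExecutor.

TaskSlot is the TaskExecutor's abstraction of a slot. It is in one of four states: Free, Releasing, Allocated, or Active. Its main fields are: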

class TaskSlot {
	/** Index of the task slot. */
	private final int index;

	/** State of this slot. */
	private TaskSlotState state;

	/** Resource characteristics for this slot. */
	private final ResourceProfile resourceProfile;

	/** Tasks running in this slot. */
	// a slot may run several Tasks
	private final Map<ExecutionAttemptID, Task> tasks;

	/** Job id to which the slot has been allocated; null if not allocated. */
	private JobID jobId;

	/** Allocation id of this slot; null if not allocated. */
	private AllocationID allocationId;
}

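TaskSlot provides methods that change its state. For example, allocate(JobID newJobId, AllocationID newAllocationId) marks the slot as Allocated, and markFree() marks it as Free, which only succeeds once all Tasks have been removed. Before switching state, the slot checks its current state. Tasks can be added to a slot with add(Task task); all of them must belong to the same Job.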
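These state transitions can be modelled in a few lines. What follows is a simplified sketch rather than the real TaskSlot class: strings stand in for JobID, AllocationID and Task, and the rule that tasks can only be added while the slot is Active is an assumption of the sketch.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of the TaskSlot state machine; not the Flink class.
class TaskSlotStateSketch {
    enum State { FREE, ALLOCATED, ACTIVE, RELEASING }

    private State state = State.FREE;
    private String jobId;        // stand-in for JobID, null while the slot is free
    private String allocationId; // stand-in for AllocationID, null while the slot is free
    private final Set<String> tasks = new HashSet<>(); // stand-in for the task map

    State getState() { return state; }

    // FREE -> ALLOCATED; in this sketch any other state rejects the request.
    boolean allocate(String newJobId, String newAllocationId) {
        if (state != State.FREE) {
            return false;
        }
        jobId = newJobId;
        allocationId = newAllocationId;
        state = State.ALLOCATED;
        return true;
    }

    // ALLOCATED -> ACTIVE (calling it again on an ACTIVE slot is harmless).
    boolean markActive() {
        if (state == State.ALLOCATED || state == State.ACTIVE) {
            state = State.ACTIVE;
            return true;
        }
        return false;
    }

    // Stop accepting tasks; the slot becomes free once its tasks are gone.
    void markReleasing() { state = State.RELEASING; }

    // Assumption: tasks are accepted only while ACTIVE and only for the owning job.
    boolean add(String taskId, String taskJobId) {
        if (state != State.ACTIVE || !taskJobId.equals(jobId)) {
            return false;
        }
        return tasks.add(taskId);
    }

    boolean remove(String taskId) { return tasks.remove(taskId); }

    // Succeeds only when no task is left in the slot.
    boolean markFree() {
        if (!tasks.isEmpty()) {
            return false;
        }
        state = State.FREE;
        jobId = null;
        allocationId = null;
        return true;
    }

    public static void main(String[] args) {
        TaskSlotStateSketch slot = new TaskSlotStateSketch();
        slot.allocate("job-1", "alloc-1");
        slot.markActive();
        slot.add("task-1", "job-1");
        System.out.println("free while a task is running? " + slot.markFree());
        slot.remove("task-1");
        System.out.println("free after removing the task? " + slot.markFree());
    }
}
```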

3.2 TaskSlotTable

TaskExecutor manages all of its slots mainly through TaskSlotTable:

class TaskSlotTable implements TimeoutListener<AllocationID> {
	/** Timer service used to time out allocated slots. */
	private final TimerService<AllocationID> timerService;

	/** The list of all task slots. */
	private final List<TaskSlot> taskSlots;

	/** Mapping from allocation id to task slot. */
	private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;

	/** Mapping from execution attempt id to task and task slot. */
	private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;

	/** Mapping from job id to allocated slots for a job. */
	private final Map<JobID, Set<AllocationID>> slotsPerJob;

	/** Interface for slot actions, such as freeing them or timing them out. */
	private SlotActions slotActions;
}

allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) assigns the slot at the given index to the request identified by the AllocationID; it calls TaskSlot.allocate(JobID newJobId, AllocationID newAllocationId) internally. Note that the last parameter of allocateSlot is a timeout. TaskSlotTable has a member TimerService timerService through which timers can be registered; if a timer is not cancelled before its timeout expires, the timeout method of SlotActions is called. So if the timer associated with an allocated slot is not cancelled in time, the slot is released again and marked as Free.

class TaskSlotTable {
	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
		checkInit();
		TaskSlot taskSlot = taskSlots.get(index);
		boolean result = taskSlot.allocate(jobId, allocationId);
		if (result) {
			// update the allocation id to task slot map
			allocationIDTaskSlotMap.put(allocationId, taskSlot);
			// register a timeout for this slot since it's in state allocated
			timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
			// add this slot to the set of job slots
			Set<AllocationID> slots = slotsPerJob.get(jobId);
			if (slots == null) {
				slots = new HashSet<>(4);
				slotsPerJob.put(jobId, slots);
			}
			slots.add(allocationId);
		}
		return result;
	}
}

When a slot is marked as Active, the timer registered at allocation time is cancelled:

class TaskSlotTable {
	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
		checkInit();
		TaskSlot taskSlot = getTaskSlot(allocationId);
		if (taskSlot != null) {
			if (taskSlot.markActive()) {
				// unregister a potential timeout
				LOG.info("Activate slot {}.", allocationId);
				timerService.unregisterTimeout(allocationId);
				return true;
			} else {
				return false;
			}
		} else {
			throw new SlotNotFoundException(allocationId);
		}
	}
}
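The interplay between registering a timeout in allocateSlot and cancelling it in markSlotActive can be reproduced without a real timer thread. The following is a deterministic sketch under stated assumptions: AllocationTimeoutSketch, its manual clock and advanceTime are hypothetical and only mimic the idea behind TimerService.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the timer handling around slot allocation.
// Time is advanced by hand so that the behaviour is easy to follow and to test.
class AllocationTimeoutSketch {
    private final Map<String, Long> deadlines = new LinkedHashMap<>();
    private long now = 0L;

    // Called when a slot is allocated: the allocation must become active in time.
    void registerTimeout(String allocationId, long timeoutMillis) {
        deadlines.put(allocationId, now + timeoutMillis);
    }

    // Called when the slot is marked active; returns false if no timer was pending.
    boolean unregisterTimeout(String allocationId) {
        return deadlines.remove(allocationId) != null;
    }

    // Moves the clock forward and returns the allocations whose timers fired.
    // The caller would free the slots of these allocations again.
    List<String> advanceTime(long millis) {
        now += millis;
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= now) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        AllocationTimeoutSketch timers = new AllocationTimeoutSketch();
        timers.registerTimeout("alloc-1", 100);
        timers.registerTimeout("alloc-2", 100);
        timers.unregisterTimeout("alloc-1"); // alloc-1 was accepted and marked active
        System.out.println("timed out: " + timers.advanceTime(100));
    }
}
```

Only the allocation that was never marked active times out; the other one is unaffected because its timer was removed first.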

createSlotReport returns a SlotReport object, which contains the state of every slot in the current TaskExecutor and how each one is allocated.

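4. TaskExecutor

TaskExecutor has to report the state of all of its slots to ResourceManager, so that ResourceManager knows how every slot is allocated. This happens in two situations:

  • When TaskExecutor first connects to ResourceManager, it sends a SlotReport
  • TaskExecutor and ResourceManager exchange heartbeats periodically, and each heartbeat carries a SlotReport

The relevant code is: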

class TaskExecutor {
	private void establishResourceManagerConnection(
			ResourceManagerGateway resourceManagerGateway,
			ResourceID resourceManagerResourceId,
			InstanceID taskExecutorRegistrationId,
			ClusterInformation clusterInformation) {
		// first connection: report the slot information to the RM
		final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
			getResourceID(),
			taskExecutorRegistrationId,
			taskSlotTable.createSlotReport(getResourceID()),
			taskManagerConfiguration.getTimeout());

		//.........
	}

	private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, SlotReport> {
		// heartbeat payload
		@Override
		public CompletableFuture<SlotReport> retrievePayload(ResourceID resourceID) {
			return callAsync(
					() -> taskSlotTable.createSlotReport(getResourceID()),
					taskManagerConfiguration.getTimeout());
		}
	}
}

ResourceManager asks TaskExecutor to allocate a slot through TaskExecutor.requestSlot. Because ResourceManager knows the current state of every slot, the request names a specific SlotID:

class TaskExecutor {
	@Override
	public CompletableFuture<Acknowledge> requestSlot(
		final SlotID slotId,
		final JobID jobId,
		final AllocationID allocationId,
		final String targetAddress,
		final ResourceManagerId resourceManagerId,
		final Time timeout) {
		try {
			// check that the requesting RM is the one this TaskExecutor is registered with
			if (!isConnectedToResourceManager(resourceManagerId)) {
				final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
				log.debug(message);
				throw new TaskManagerException(message);
			}

			if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
				// the slot is Free, so allocate it
				if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
					log.info("Allocated slot for {}.", allocationId);
				} else {
					log.info("Could not allocate slot for {}.", allocationId);
					throw new SlotAllocationException("Could not allocate slot.");
				}
			} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
				// the slot is already allocated to a different request, so throw an exception
				final String message = "The slot " + slotId + " has already been allocated for a different job.";
				log.info(message);
				final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
				throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
			}

			// offer the allocated slot to the JobManager the request was made for
			if (jobManagerTable.contains(jobId)) {
				// a connection to that JobManager already exists, so offer the slot right away
				offerSlotsToJobManager(jobId);
			} else {
				// otherwise connect to the JobManager first; offerSlotsToJobManager(jobId) is called once the connection is established
				try {
					jobLeaderService.addJob(jobId, targetAddress);
				} catch (Exception e) {
					// free the allocated slot
					try {
						taskSlotTable.freeSlot(allocationId);
					} catch (SlotNotFoundException slotNotFoundException) {
						// slot no longer existent, this should actually never happen, because we've
						// just allocated the slot. So let's fail hard in this case!
						onFatalError(slotNotFoundException);
					}

					// release local state under the allocation id.
					localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
					// sanity check
					if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
						onFatalError(new Exception("Could not free slot " + slotId));
					}
					throw new SlotAllocationException("Could not add job to job leader service.", e);
				}
			}
		} catch (TaskManagerException taskManagerException) {
			return FutureUtils.completedExceptionally(taskManagerException);
		}

		return CompletableFuture.completedFuture(Acknowledge.get());
	}
}

Once a slot has been allocated, TaskExecutor has to offer it to the JobManager, which is what offerSlotsToJobManager(jobId) does:

class TaskExecutor {
	private void offerSlotsToJobManager(final JobID jobId) {
		final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
		if (jobManagerConnection == null) {
			log.debug("There is no job manager connection to the leader of job {}.", jobId);
		} else {
			if (taskSlotTable.hasAllocatedSlots(jobId)) {
				log.info("Offer reserved slots to the leader of job {}.", jobId);
				final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
				// get the slots allocated to this Job; only slots in the Allocated state are returned
				final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
				final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
				final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
				while (reservedSlotsIterator.hasNext()) {
					SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
					reservedSlots.add(offer);
				}
				// offer the slots to the JobMaster via RPC
				CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
					getResourceID(),
					reservedSlots,
					taskManagerConfiguration.getTimeout());

				acceptedSlotsFuture.whenCompleteAsync(
					(Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
						if (throwable != null) {
							// on timeout, retry
							if (throwable instanceof TimeoutException) {
								log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
								// We ran into a timeout. Try again.
								offerSlotsToJobManager(jobId);
							} else {
								log.warn("Slot offering to JobManager failed. Freeing the slots " +
									"and returning them to the ResourceManager.", throwable);
								// on any other failure, free all offered slots
								for (SlotOffer reservedSlot: reservedSlots) {
									freeSlotInternal(reservedSlot.getAllocationId(), throwable);
								}
							}
						} else {
							// the call succeeded
							// check if the response is still valid
							if (isJobManagerConnectionValid(jobId, jobMasterId)) {
								// mark the slots accepted by the JobMaster as Active
								for (SlotOffer acceptedSlot : acceptedSlots) {
									try {
										if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
											// the slot is either free or releasing at the moment
											final String message = "Could not mark slot " + jobId + " active.";
											log.debug(message);
											jobMasterGateway.failSlot(
												getResourceID(),
												acceptedSlot.getAllocationId(),
												new FlinkException(message));
										}
									} catch (SlotNotFoundException e) {
										final String message = "Could not mark slot " + jobId + " active.";
										jobMasterGateway.failSlot(
											getResourceID(),
											acceptedSlot.getAllocationId(),
											new FlinkException(message));
									}

									reservedSlots.remove(acceptedSlot);
								}

								final Exception e = new Exception("The slot was rejected by the JobManager.");
								// free the remaining slots that were not accepted
								for (SlotOffer rejectedSlot : reservedSlots) {
									freeSlotInternal(rejectedSlot.getAllocationId(), e);
								}
							} else {
								// discard the response since there is a new leader for the job
								log.debug("Discard offer slot response since there is a new leader " +
									"for the job {}.", jobId);
							}
						}
					},
					getMainThreadExecutor());
			} else {
				log.debug("There are no unassigned slots for the job {}.", jobId);
			}
		}
	}
}
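Stripped of RPC and error handling, the success branch of the callback comes down to a set partition: accepted offers become active, everything else that was offered is freed. A minimal sketch of that bookkeeping follows; SlotOfferSketch and Outcome are hypothetical names, and allocation ids are plain strings.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical reduction of the offer/accept bookkeeping to set operations.
class SlotOfferSketch {

    static final class Outcome {
        final Set<String> activated = new LinkedHashSet<>(); // accepted by the JobMaster
        final Set<String> freed = new LinkedHashSet<>();     // offered but not accepted
    }

    static Outcome handleResponse(Collection<String> offered, Collection<String> accepted) {
        Outcome outcome = new Outcome();
        Set<String> remaining = new LinkedHashSet<>(offered);
        for (String allocationId : accepted) {
            // only slots that were really offered can be activated
            if (remaining.remove(allocationId)) {
                outcome.activated.add(allocationId);
            }
        }
        // whatever is left was rejected and goes back to the free pool
        outcome.freed.addAll(remaining);
        return outcome;
    }

    public static void main(String[] args) {
        Outcome outcome = handleResponse(
            Arrays.asList("alloc-a", "alloc-b", "alloc-c"),
            Arrays.asList("alloc-a", "alloc-c"));
        System.out.println("activated: " + outcome.activated);
        System.out.println("freed:     " + outcome.freed);
    }
}
```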

freeSlot(AllocationID, Throwable, Time) asks TaskExecutor to release the slot associated with an AllocationID:

class TaskExecutor {
	@Override
	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
		freeSlotInternal(allocationId, cause);
		return CompletableFuture.completedFuture(Acknowledge.get());
	}

	private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
		checkNotNull(allocationId);
		try {
			final JobID jobId = taskSlotTable.getOwningJob(allocationId);
			// try to free the slot bound to allocationId
			final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);

			if (slotIndex != -1) {
				// the slot was freed successfully
				if (isConnectedToResourceManager()) {
					// the slot was freed; tell the ResourceManager that it is available again
					ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
					resourceManagerGateway.notifySlotAvailable(
						establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
						new SlotID(getResourceID(), slotIndex),
						allocationId);
				}
				if (jobId != null) {
					// if the Job bound to this allocation has no allocated slots left, the connection to its JobMaster can be closed
					// check whether we still have allocated slots for the same job
					if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty()) {
						// we can remove the job from the job leader service
						try {
							jobLeaderService.removeJob(jobId);
						} catch (Exception e) {
							log.info("Could not remove job {} from JobLeaderService.", jobId, e);
						}

						closeJobManagerConnection(
							jobId,
							new FlinkException("TaskExecutor " + getAddress() +
								" has no more allocated slots for job " + jobId + '.'));
					}
				}
			}
		} catch (SlotNotFoundException e) {
			log.debug("Could not free slot for allocation id {}.", allocationId, e);
		}
		localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
	}
}

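5. Slot management in ResourceManager

The previous section looked at slot management inside a single TaskExecutor. ResourceManager, by contrast, has to manage the slots of all TaskExecutors. Every JobManager requests resources through ResourceManager, which “forwards” each request to a TaskExecutor based on the current resource usage of the cluster.

5.1 SlotManager

ResourceManager manages slots with the help of SlotManager. SlotManager tracks the state and allocation of every slot of every registered TaskExecutor, and it also keeps all slot requests that are still waiting. Whenever a new slot is registered or an allocated slot is freed, SlotManager tries to fulfil the waiting slot requests. If the available slots are not enough, SlotManager notifies ResourceManager through ResourceActions#allocateResource(ResourceProfile), and ResourceManager may try to start a new TaskExecutor (for example on Yarn).

In addition, a TaskExecutor that stays idle for a long time, or a pending slot request that stays unfulfilled for a long time, triggers timeout handling.

5.2 Slot registration

First, here are some of the more important members of SlotManager, mainly the state of slots and of slot requests: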

class SlotManager {
	/** Map for all registered slots. */
	private final HashMap<SlotID, TaskManagerSlot> slots;

	/** Index of all currently free slots. */
	private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;

	/** All currently registered task managers. */
	private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;

	/** Map of fulfilled and active allocations for request deduplication purposes. */
	private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;

	/** Map of pending/unfulfilled slot allocation requests. */
	private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;

	// when resources run short, new resources are requested through ResourceActions#allocateResource(ResourceProfile)
	// (this may start a new TaskManager, or do nothing); the requested resources are wrapped as PendingTaskManagerSlot
	private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;

	/** ResourceManager's id. */
	private ResourceManagerId resourceManagerId;

	/** Callbacks for resource (de-)allocations. */
	private ResourceActions resourceActions;
}

When a new TaskManager registers, registerTaskManager is called:

class SlotManager {
	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
		checkInit();
		LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
		// we identify task managers by their instance id
		if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
			reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
		} else {
			// first register the TaskManager
			ArrayList<SlotID> reportedSlots = new ArrayList<>();
			for (SlotStatus slotStatus : initialSlotReport) {
				reportedSlots.add(slotStatus.getSlotID());
			}
			TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
				taskExecutorConnection,
				reportedSlots);
			taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);

			// next register the new slots, one by one
			for (SlotStatus slotStatus : initialSlotReport) {
				registerSlot(
					slotStatus.getSlotID(),
					slotStatus.getAllocationID(),
					slotStatus.getJobID(),
					slotStatus.getResourceProfile(),
					taskExecutorConnection);
			}
		}
	}

	// register a single slot
	private void registerSlot(
			SlotID slotId,
			AllocationID allocationId,
			JobID jobId,
			ResourceProfile resourceProfile,
			TaskExecutorConnection taskManagerConnection) {
		if (slots.containsKey(slotId)) {
			// remove the old slot first
			removeSlot(slotId);
		}

		// create a TaskManagerSlot object and add it to slots
		final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);

		final PendingTaskManagerSlot pendingTaskManagerSlot;
		if (allocationId == null) {
			// the slot is not allocated yet: look for a PendingTaskManagerSlot whose resources match this slot
			pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
		} else {
			// the slot is already allocated
			pendingTaskManagerSlot = null;
		}
		if (pendingTaskManagerSlot == null) {
			// two possibilities: 1) the slot is already allocated, 2) there is no matching PendingTaskManagerSlot
			updateSlot(slotId, allocationId, jobId);
		} else {
			// the newly registered slot satisfies a PendingTaskManagerSlot
			pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
			final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();
			// the PendingTaskManagerSlot may have a PendingSlotRequest assigned to it
			if (assignedPendingSlotRequest == null) {
				// no PendingSlotRequest is assigned, so handle the slot as Free
				handleFreeSlot(slot);
			} else {
				// a PendingSlotRequest is assigned; it can now be fulfilled, so allocate the slot
				assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
				allocateSlot(slot, assignedPendingSlotRequest);
			}
		}
	}

	private void handleFreeSlot(TaskManagerSlot freeSlot) {
		Preconditions.checkState(freeSlot.getState() == TaskManagerSlot.State.FREE);
		// first look for a PendingSlotRequest that this slot can fulfil
		PendingSlotRequest pendingSlotRequest = findMatchingRequest(freeSlot.getResourceProfile());
		if (null != pendingSlotRequest) {
			// a matching PendingSlotRequest exists, so allocate the slot
			allocateSlot(freeSlot, pendingSlotRequest);
		} else {
			freeSlots.put(freeSlot.getSlotId(), freeSlot);
		}
	}
}
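The decision made in handleFreeSlot is small enough to model on its own. In the sketch below, which is a simplification with hypothetical names, ResourceProfile matching is ignored (every slot fits every request) and the oldest pending request is served first; both are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: a free slot either serves a waiting request or joins the free pool.
class FreeSlotHandlingSketch {
    final Deque<String> pendingRequests = new ArrayDeque<>();   // waiting allocation ids
    final Set<String> freeSlots = new LinkedHashSet<>();        // slot ids without a request
    final Map<String, String> assignments = new LinkedHashMap<>(); // slot id -> allocation id

    void addPendingRequest(String allocationId) {
        pendingRequests.add(allocationId);
    }

    void handleFreeSlot(String slotId) {
        // assumption: any slot matches any request, oldest request first
        String request = pendingRequests.poll();
        if (request != null) {
            assignments.put(slotId, request);
        } else {
            freeSlots.add(slotId);
        }
    }

    public static void main(String[] args) {
        FreeSlotHandlingSketch manager = new FreeSlotHandlingSketch();
        manager.addPendingRequest("alloc-1");
        manager.handleFreeSlot("tm-1_0"); // serves alloc-1
        manager.handleFreeSlot("tm-1_1"); // nothing is waiting, stays free
        System.out.println("assignments: " + manager.assignments);
        System.out.println("free slots:  " + manager.freeSlots);
    }
}
```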

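Besides this, TaskExecutor also reports its slot status to ResourceManager periodically through heartbeats. The reportSlotStatus method updates the slot states.

5.3 Requesting a slot

ResourceManager requests a slot through SlotManager's registerSlotRequest(SlotRequest slotRequest) method. A SlotRequest carries the requesting JobID, the AllocationID, and a ResourceProfile describing the requested resources. SlotManager wraps the slot request in a PendingSlotRequest, meaning a slot request that has not been fulfilled yet.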

class SlotManager {
	public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
		checkInit();
		if (checkDuplicateRequest(slotRequest.getAllocationId())) {
			LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
			return false;
		} else {
			// wrap the request in a PendingSlotRequest
			PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
			pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
			try {
				// run the slot allocation logic
				internalRequestSlot(pendingSlotRequest);
			} catch (ResourceManagerException e) {
				// requesting the slot failed --> remove pending slot request
				pendingSlotRequests.remove(slotRequest.getAllocationId());
				throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
			}
			return true;
		}
	}

	private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
		final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
		// first try to pick a suitable slot among the registered slots in the FREE state
		TaskManagerSlot taskManagerSlot = findMatchingSlot(resourceProfile);

		if (taskManagerSlot != null) {
			// found a suitable slot, allocate it
			allocateSlot(taskManagerSlot, pendingSlotRequest);
		} else {
			// otherwise pick from the PendingTaskManagerSlots
			Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);

			// if there is no suitable PendingTaskManagerSlot either
			if (!pendingTaskManagerSlotOptional.isPresent()) {
				// ask ResourceManager for resources through the ResourceActions#allocateResource(ResourceProfile) callback
				pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
			}

			// assign the PendingTaskManagerSlot to the PendingSlotRequest
			pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot));
		}
	}
}
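internalRequestSlot therefore tries three sources in order: a free registered slot, an unassigned pending slot of a worker that is still starting, and finally a brand-new worker. The counter-based sketch below captures only that order. All names are hypothetical, resource profiles are ignored, and it assumes that a new worker announces slotsPerWorker slots, one of which is immediately assigned to the triggering request.

```java
// Hypothetical counter-based model of the three-step fallback in internalRequestSlot.
class SlotRequestFallbackSketch {
    enum Source { FREE_SLOT, PENDING_SLOT, NEW_RESOURCE, NONE }

    private int freeSlots;
    private int unassignedPendingSlots = 0;
    private final int slotsPerWorker;
    private final boolean canStartWorkers; // false models a fixed-size cluster

    SlotRequestFallbackSketch(int freeSlots, int slotsPerWorker, boolean canStartWorkers) {
        this.freeSlots = freeSlots;
        this.slotsPerWorker = slotsPerWorker;
        this.canStartWorkers = canStartWorkers;
    }

    Source request() {
        if (freeSlots > 0) {                 // 1) a registered slot is free
            freeSlots--;
            return Source.FREE_SLOT;
        }
        if (unassignedPendingSlots > 0) {    // 2) a starting worker has a slot to spare
            unassignedPendingSlots--;
            return Source.PENDING_SLOT;
        }
        if (canStartWorkers) {               // 3) ask for a new worker
            // one announced slot goes to this request, the rest stay unassigned
            unassignedPendingSlots += slotsPerWorker - 1;
            return Source.NEW_RESOURCE;
        }
        return Source.NONE;                  // the request simply keeps waiting
    }

    public static void main(String[] args) {
        SlotRequestFallbackSketch manager = new SlotRequestFallbackSketch(1, 2, true);
        for (int i = 0; i < 4; i++) {
            System.out.println("request " + i + " -> " + manager.request());
        }
    }
}
```

With one free slot and two slots per worker, four consecutive requests are served from a free slot, a new worker, that worker's second pending slot, and another new worker.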

So how does allocateSlot actually work? As mentioned in the previous section, a slot is requested from TaskExecutor through an RPC call, and that RPC call is made in SlotManager#allocateSlot:

class SlotManager {
	private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
		Preconditions.checkState(taskManagerSlot.getState() == TaskManagerSlot.State.FREE);
		TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
		TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();

		final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
		final AllocationID allocationId = pendingSlotRequest.getAllocationId();
		final SlotID slotId = taskManagerSlot.getSlotId();
		final InstanceID instanceID = taskManagerSlot.getInstanceId();

		// the state of taskManagerSlot becomes PENDING
		taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
		pendingSlotRequest.setRequestFuture(completableFuture);

		// if a PendingTaskManagerSlot is assigned to this pendingSlotRequest, detach it first
		returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);

		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
		if (taskManagerRegistration == null) {
			throw new IllegalStateException("Could not find a registered task manager for instance id " +
				instanceID + '.');
		}
		taskManagerRegistration.markUsed();

		// RPC call to the TaskExecutor to request the slot
		CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
			slotId,
			pendingSlotRequest.getJobId(),
			allocationId,
			pendingSlotRequest.getTargetAddress(),
			resourceManagerId,
			taskManagerRequestTimeout);

		// the RPC request has completed
		requestFuture.whenComplete(
			(Acknowledge acknowledge, Throwable throwable) -> {
				if (acknowledge != null) {
					completableFuture.complete(acknowledge);
				} else {
					completableFuture.completeExceptionally(throwable);
				}
			});

		// callback for the completion of the PendingSlotRequest;
		// it completes either because the RPC above completed or because the PendingSlotRequest was cancelled
		completableFuture.whenCompleteAsync(
			(Acknowledge acknowledge, Throwable throwable) -> {
				try {
					if (acknowledge != null) {
						// the request succeeded: remove the pendingSlotRequest and update the slot state PENDING -> ALLOCATED
						updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
					} else {
						if (throwable instanceof SlotOccupiedException) {
							// the slot is already occupied, update its state
							SlotOccupiedException exception = (SlotOccupiedException) throwable;
							updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
						} else {
							// the request failed: remove the pendingSlotRequest from the TaskManagerSlot
							removeSlotRequestFromSlot(slotId, allocationId);
						}

						if (!(throwable instanceof CancellationException)) {
							// the slot request failed and will be retried
							handleFailedSlotRequest(slotId, allocationId, throwable);
						} else {
							// cancelled on purpose
							LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
						}
					}
				} catch (Exception e) {
					LOG.error("Error while completing the slot allocation.", e);
				}
			},
			mainThreadExecutor);
	}
}
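The reason for two futures is that the request must be cancellable independently of the RPC. This pattern can be tried in isolation with plain java.util.concurrent.CompletableFuture. In this sketch RequestFutureSketch, track and the event strings are hypothetical; only the wiring of the two futures follows the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// Hypothetical demonstration of the "RPC future feeds a cancellable request future" pattern.
class RequestFutureSketch {
    final List<String> events = new ArrayList<>();

    CompletableFuture<String> track(CompletableFuture<String> rpcFuture) {
        CompletableFuture<String> requestFuture = new CompletableFuture<>();

        // forward the RPC result; this is a no-op if requestFuture is already completed
        rpcFuture.whenComplete((ack, failure) -> {
            if (ack != null) {
                requestFuture.complete(ack);
            } else {
                requestFuture.completeExceptionally(failure);
            }
        });

        // react to whichever event completed the request first
        requestFuture.whenComplete((ack, failure) -> {
            if (ack != null) {
                events.add("allocated");
            } else if (failure instanceof CancellationException) {
                events.add("cancelled");
            } else {
                events.add("failed");
            }
        });
        return requestFuture;
    }

    public static void main(String[] args) {
        RequestFutureSketch sketch = new RequestFutureSketch();

        CompletableFuture<String> rpc = new CompletableFuture<>();
        CompletableFuture<String> request = sketch.track(rpc);
        request.cancel(false); // the request is withdrawn before the RPC answers
        rpc.complete("ack");   // the late answer no longer changes the outcome

        System.out.println(sketch.events);
    }
}
```

Cancelling the request future completes it with a CancellationException, so the late RPC acknowledgement is ignored and the callback runs exactly once.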

5.4 Cancelling a slot request

A slot request can be cancelled through unregisterSlotRequest:

class SlotManager {
	public boolean unregisterSlotRequest(AllocationID allocationId) {
		checkInit();
		// remove it from pendingSlotRequests
		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
		if (null != pendingSlotRequest) {
			LOG.debug("Cancel slot request {}.", allocationId);
			// cancel the request
			cancelPendingSlotRequest(pendingSlotRequest);
			return true;
		} else {
			LOG.debug("No pending slot request with allocation id {} found. Ignoring unregistration request.", allocationId);
			return false;
		}
	}

	private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
		CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture();
		returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
		if (null != request) {
			request.cancel(false);
		}
	}
}

5.5 Timeouts

When SlotManager starts, it schedules two timeout checks: one for slot requests that have timed out, and one for TaskManagers that have been idle for too long:

class SlotManager {
	public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
		LOG.info("Starting the SlotManager.");

		this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
		mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
		resourceActions = Preconditions.checkNotNull(newResourceActions);

		started = true;

		// check whether a TaskExecutor has been idle for too long
		taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
			() -> mainThreadExecutor.execute(
				() -> checkTaskManagerTimeouts()),
			0L,
			taskManagerTimeout.toMilliseconds(),
			TimeUnit.MILLISECONDS);

		// check whether a slot request has timed out
		slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
			() -> mainThreadExecutor.execute(
				() -> checkSlotRequestTimeouts()),
			0L,
			slotRequestTimeout.toMilliseconds(),
			TimeUnit.MILLISECONDS);
	}
}

Once a TaskExecutor has been idle for too long, its resources are released through the ResourceActions#releaseResource() callback:

class SlotManager {
	private void releaseTaskExecutor(InstanceID timedOutTaskManagerId) {
		final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
		LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
		resourceActions.releaseResource(timedOutTaskManagerId, cause);
	}
}
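The code of the periodic idle check itself is not shown above. One way to picture it is a timestamp per TaskManager that is reset whenever one of its slots is used. This sketch rests on assumptions: IdleTaskManagerSketch, the IN_USE marker and the explicit now parameter are hypothetical and chosen to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of idle tracking for registered TaskManagers.
class IdleTaskManagerSketch {
    private static final long IN_USE = Long.MAX_VALUE; // marker: at least one slot is in use

    private final Map<String, Long> idleSince = new LinkedHashMap<>();
    private final long idleTimeoutMillis;

    IdleTaskManagerSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // A freshly registered TaskManager counts as idle from the moment it registers.
    void register(String instanceId, long now) { idleSince.put(instanceId, now); }

    // Called when a slot of the TaskManager is handed out.
    void markUsed(String instanceId) { idleSince.put(instanceId, IN_USE); }

    // Called when the last used slot of the TaskManager becomes free again.
    void markIdle(String instanceId, long now) { idleSince.put(instanceId, now); }

    // Returns the TaskManagers that should be released at time 'now'.
    List<String> findTimedOut(long now) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> entry : idleSince.entrySet()) {
            long since = entry.getValue();
            if (since != IN_USE && now - since >= idleTimeoutMillis) {
                timedOut.add(entry.getKey());
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        IdleTaskManagerSketch tracker = new IdleTaskManagerSketch(30);
        tracker.register("tm-1", 0);
        tracker.register("tm-2", 0);
        tracker.markUsed("tm-1");
        System.out.println("release at t=30: " + tracker.findTimedOut(30));
    }
}
```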

If a slot request times out, the PendingSlotRequest is cancelled and ResourceManager is notified through ResourceActions#notifyAllocationFailure():

class SlotManager {
	private void checkSlotRequestTimeouts() {
		if (!pendingSlotRequests.isEmpty()) {
			long currentTime = System.currentTimeMillis();
			Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();
			while (slotRequestIterator.hasNext()) {
				PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();
				if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
					slotRequestIterator.remove();

					if (slotRequest.isAssigned()) {
						cancelPendingSlotRequest(slotRequest);
					}

					resourceActions.notifyAllocationFailure(
						slotRequest.getJobId(),
						slotRequest.getAllocationId(),
						new TimeoutException("The allocation could not be fulfilled in time."));
				}
			}
		}
	}
}

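6. ResourceManager

As we have seen, ResourceManager manages all the slots registered by TaskExecutors through SlotManager. ResourceManager itself, however, has to offer RPC methods that expose the slot management operations to JobMaster and TaskExecutor.

6.1 RPC interface

First, here are the slot-related RPC methods that ResourceManager provides: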

interface ResourceManagerGateway {
	CompletableFuture<Acknowledge> requestSlot(
		JobMasterId jobMasterId,
		SlotRequest slotRequest,
		@RpcTimeout Time timeout);

	void cancelSlotRequest(AllocationID allocationID);

	CompletableFuture<Acknowledge> sendSlotReport(
		ResourceID taskManagerResourceId,
		InstanceID taskManagerRegistrationId,
		SlotReport slotReport,
		@RpcTimeout Time timeout);

	void notifySlotAvailable(
		InstanceID instanceId,
		SlotID slotID,
		AllocationID oldAllocationId);
}

Of these four methods, requestSlot and cancelSlotRequest are mainly called by JobMaster, while sendSlotReport and notifySlotAvailable are mainly called by TaskExecutor. After receiving one of these RPC calls, ResourceManager delegates the actual work to SlotManager.

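6.2 Dynamic resource management

An important feature introduced by FLIP-6 is that ResourceManager can manage TaskExecutor compute resources dynamically, which allows better integration with frameworks such as Yarn, Mesos, and Kubernetes. Let's look at how this is implemented.

As mentioned in the SlotManager section, if the registered slots cannot satisfy a slot request, SlotManager notifies ResourceManager through the ResourceActions#allocateResource callback; and when SlotManager detects that a TaskExecutor has been idle for a long time, it notifies ResourceManager through the ResourceActions#releaseResource callback. With these two callbacks, ResourceManager can request and release resources dynamically: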

class ResourceManager {
	private class ResourceActionsImpl implements ResourceActions {
		@Override
		public void releaseResource(InstanceID instanceId, Exception cause) {
			validateRunsInMainThread();
			// release the resource
			ResourceManager.this.releaseResource(instanceId, cause);
		}

		@Override
		public Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) {
			validateRunsInMainThread();
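			// request new resources; the behaviour depends on the concrete ResourceManager implementation. The returned collection is effectively a promise of slots that will become available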
			return startNewWorker(resourceProfile);
		}

		@Override
		public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
			validateRunsInMainThread();
			JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
			if (jobManagerRegistration != null) {
				jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
			}
		}
	}

	protected void releaseResource(InstanceID instanceId, Exception cause) {
		WorkerType worker = null;

		// TODO: Improve performance by having an index on the instanceId
		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry : taskExecutors.entrySet()) {
			if (entry.getValue().getInstanceID().equals(instanceId)) {
				worker = entry.getValue().getWorker();
				break;
			}
		}

		if (worker != null) {
			// stop the worker; the behaviour depends on the concrete ResourceManager implementation
			if (stopWorker(worker)) {
				closeTaskManagerConnection(worker.getResourceID(), cause);
			} else {
				log.debug("Worker {} could not be stopped.", worker.getResourceID());
			}
		} else {
			// unregister in order to clean up potential left over state
			slotManager.unregisterTaskManager(instanceId);
		}
	}

	public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);
	public abstract boolean stopWorker(WorkerType worker);
}

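The two abstract methods startNewWorker and stopWorker are the key to requesting and releasing resources dynamically. In Standalone mode the set of TaskExecutors is fixed, so starting and releasing them dynamically is not supported. For Flink on Yarn, the implementations of these two methods in YarnResourceManager start new containers and release containers that were already requested.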

class YarnResourceManager {
	@Override
	public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
		Preconditions.checkArgument(
			ResourceProfile.UNKNOWN.equals(resourceProfile),
			"The YarnResourceManager does not support custom ResourceProfiles yet. It assumes that all containers have the same resources.");
		// Request a container
		requestYarnContainer();

		return slotsPerWorker;
	}

	@Override
	public boolean stopWorker(final YarnWorkerNode workerNode) {
		final Container container = workerNode.getContainer();
		log.info("Stopping container {}.", container.getId());
		try {
			nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
		} catch (final Exception e) {
			log.warn("Error while calling YARN Node Manager to stop container", e);
		}
		// Release the container
		resourceManagerClient.releaseAssignedContainer(container.getId());
		workerNodeMap.remove(workerNode.getResourceID());
		return true;
	}
}
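
The contrast between a fixed set of workers and dynamically provisioned ones can be illustrated with a minimal sketch. Everything in it is hypothetical and simplified: the class names WorkerProvisionerSketch, FixedWorkersSketch and ContainerWorkersSketch are made up, plain strings stand in for ResourceProfile and containers, and the standalone-like behavior (promise nothing, refuse to stop) is an assumption rather than a quote of Flink's code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the two abstract hooks of ResourceManager.
abstract class WorkerProvisionerSketch {
    // Returns the slot profiles the new worker is expected to provide.
    abstract Collection<String> startNewWorker(String resourceProfile);

    // Returns true if the worker was stopped.
    abstract boolean stopWorker(String workerId);
}

// Assumed standalone-like behavior: workers are fixed, so nothing is started or stopped.
final class FixedWorkersSketch extends WorkerProvisionerSketch {
    @Override
    Collection<String> startNewWorker(String resourceProfile) {
        return Collections.emptyList();
    }

    @Override
    boolean stopWorker(String workerId) {
        return false;
    }
}

// Assumed Yarn-like behavior: each call requests one container with a fixed number of slots.
final class ContainerWorkersSketch extends WorkerProvisionerSketch {
    private final int slotsPerWorker;
    private final List<String> containers = new ArrayList<>();
    private int nextContainerNumber;

    ContainerWorkersSketch(int slotsPerWorker) {
        this.slotsPerWorker = slotsPerWorker;
    }

    @Override
    Collection<String> startNewWorker(String resourceProfile) {
        containers.add("container-" + nextContainerNumber++);
        // Promise one slot profile per slot of the requested container.
        return Collections.nCopies(slotsPerWorker, resourceProfile);
    }

    @Override
    boolean stopWorker(String workerId) {
        return containers.remove(workerId);
    }

    int containerCount() {
        return containers.size();
    }
}
```

With two slots per worker, a single startNewWorker call on the container-based variant promises two slot profiles, while the fixed variant always promises none.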

7. Slot management in the JobManager

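Compared with the TaskExecutor and the ResourceManager, resource management in the JobManager is somewhat more complex, mainly because Flink allows several subtasks to run in the same slot under SlotSharingGroup and CoLocationGroup constraints. Within the JobMaster, the SlotPool communicates with the ResourceManager and the TaskExecutors and manages the slots allocated to the current JobMaster. Scheduling and resource allocation for the individual subtasks of the job rely mainly on the Scheduler and the SlotSharingManager.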

7.1 PhysicalSlot vs. LogicalSlot vs. MultiTaskSlot

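First, the concepts PhysicalSlot and LogicalSlot need to be told apart. A PhysicalSlot represents an actual slot on a TaskExecutor, whereas a LogicalSlot represents a slot in the logical sense: a task can be deployed to a LogicalSlot, but a LogicalSlot does not correspond one-to-one to a concrete physical slot. Because of mechanisms such as slot sharing, several LogicalSlots may be mapped to the same PhysicalSlot.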

The only implementation of the PhysicalSlot interface is AllocatedSlot:

public interface PhysicalSlot extends SlotContext {
	boolean tryAssignPayload(Payload payload);

	/**
	 * Payload which can be assigned to an {@link AllocatedSlot}.
	 */
	interface Payload {
		/**
		 * Releases the payload
		 *
		 * @param cause of the payload release
		 */
		void release(Throwable cause);
	}
}

class AllocatedSlot implements PhysicalSlot {
	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
	private final AllocationID allocationId;

	/** The location information of the TaskManager to which this slot belongs */
	private final TaskManagerLocation taskManagerLocation;

	/** The resource profile of the slot provides */
	private final ResourceProfile resourceProfile;

	/** RPC gateway to call the TaskManager that holds this slot */
	private final TaskManagerGateway taskManagerGateway;

	/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
	private final int physicalSlotNumber;

	private final AtomicReference<Payload> payloadReference;
}

The LogicalSlot interface and its implementation SingleLogicalSlot:

public interface LogicalSlot {
	TaskManagerLocation getTaskManagerLocation();
	TaskManagerGateway getTaskManagerGateway();
	int getPhysicalSlotNumber();

	AllocationID getAllocationId();
	SlotRequestId getSlotRequestId();

	Locality getLocality();

	CompletableFuture<?> releaseSlot(@Nullable Throwable cause);

	@Nullable
	SlotSharingGroupId getSlotSharingGroupId();

	boolean tryAssignPayload(Payload payload);
	Payload getPayload();

	/**
	 * Payload for a logical slot.
	 */
	interface Payload {
		void fail(Throwable cause);
		CompletableFuture<?> getTerminalStateFuture();
	}
}

public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
	private final SlotRequestId slotRequestId;

	private final SlotContext slotContext;

	// null if the logical slot does not belong to a slot sharing group, otherwise non-null
	@Nullable
	private final SlotSharingGroupId slotSharingGroupId;

	// locality of this slot wrt the requested preferred locations
	private final Locality locality;

	// owner of this slot to which it is returned upon release
	private final SlotOwner slotOwner;

	private final CompletableFuture<Void> releaseFuture;

	private volatile State state;

	// LogicalSlot.Payload of this slot
	private volatile Payload payload;
}

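Note that SingleLogicalSlot implements the PhysicalSlot.Payload interface, which means a SingleLogicalSlot can be assigned to a PhysicalSlot as its payload. Similarly, LogicalSlot defines the payload it can carry: the LogicalSlot.Payload interface is implemented by Execution, that is, a task that needs to be scheduled and executed.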

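The difference between AllocationID and SlotRequestId also deserves attention. An AllocationID identifies the allocation of a physical slot and is always associated with an AllocatedSlot, whereas a SlotRequestId identifies a request for a LogicalSlot made during task scheduling and is associated with a LogicalSlot.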

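As we have seen, sharing slot resources requires mapping several LogicalSlots to the same PhysicalSlot. How is this mapping implemented? This is where another implementation of the PhysicalSlot.Payload interface comes in: SlotSharingManager.MultiTaskSlot, an inner class of SlotSharingManager.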

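MultiTaskSlot and SingleTaskSlot share the parent class TaskSlot (the inner class SlotSharingManager.TaskSlot, not the TaskSlot of the TaskExecutor described earlier). Slot sharing and the mandatory CoLocationGroup constraint are implemented by building a tree of TaskSlot nodes. A MultiTaskSlot corresponds to an inner node of the tree and can have several children (each either a MultiTaskSlot or a SingleTaskSlot), while a SingleTaskSlot corresponds to a leaf.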

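The root of the tree is a MultiTaskSlot. The root is assigned a SlotContext, which represents the physical slot allocated to it in a TaskExecutor; all tasks in the tree run in that same slot. A MultiTaskSlot can have several leaves, as long as the AbstractIDs that identify these child TaskSlots differ (the ID may be a JobVertexID or the ID of a CoLocationGroup).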

Let us first look at the two IDs wrapped by TaskSlot: a SlotRequestId and an AbstractID:

class TaskSlot {
		// every TaskSlot has an associated slot request id
		private final SlotRequestId slotRequestId;

		// all task slots except for the root slots have a group id assigned
		// the groupId identifies the TaskSlot; it is either a JobVertexID or the ID of a CoLocationGroup
		@Nullable
		private final AbstractID groupId;

		public boolean contains(AbstractID groupId) {
			return Objects.equals(this.groupId, groupId);
		}

		public abstract void release(Throwable cause);
}

MultiTaskSlot extends TaskSlot and can have several children. A MultiTaskSlot can be the root or an inner node. It also implements the PhysicalSlot.Payload interface, so it can be assigned to a PhysicalSlot (when it is the root):

public final class MultiTaskSlot extends TaskSlot implements PhysicalSlot.Payload {
		private final Map<AbstractID, TaskSlot> children;

		// the root node has its parent set to null
		@Nullable
		private final MultiTaskSlot parent;

		// underlying allocated slot
		private final CompletableFuture<? extends SlotContext> slotContextFuture;

		// slot request id of the allocated slot
		@Nullable
		private final SlotRequestId allocatedSlotRequestId;

}

A SingleTaskSlot can only be a leaf. It owns a LogicalSlot, which can later be assigned to a concrete task:

public final class SingleTaskSlot extends TaskSlot {
		private final MultiTaskSlot parent;

		// future containing a LogicalSlot which is completed once the underlying SlotContext future is completed
		private final CompletableFuture<SingleLogicalSlot> singleLogicalSlotFuture;

		private SingleTaskSlot(
				SlotRequestId slotRequestId,
				AbstractID groupId,
				MultiTaskSlot parent,
				Locality locality) {
			super(slotRequestId, groupId);

			this.parent = Preconditions.checkNotNull(parent);

			Preconditions.checkNotNull(locality);
			singleLogicalSlotFuture = parent.getSlotContextFuture()
				.thenApply(
					(SlotContext slotContext) -> {
						// Create the SingleLogicalSlot once the parent node has been assigned a PhysicalSlot
						LOG.trace("Fulfill single task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
						return new SingleLogicalSlot(
							slotRequestId,
							slotContext,
							slotSharingGroupId,
							locality,
							slotOwner);
					});
		}
}

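More concretely, for an ordinary SlotSharingGroup constraint the tree looks like this: a MultiTaskSlot as the root with several SingleTaskSlots as leaves. The leaves represent different tasks and are distinguished by their JobVertexIDs. For the mandatory CoLocationGroup constraint, a MultiTaskSlot node is created one level below the root MultiTaskSlot (identified by the CoLocationGroup ID), and the subtasks under the same CoLocationGroup constraint become leaves of this second-level MultiTaskSlot.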
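
The shape of such a tree can be sketched with a few lines of Java. This is only an illustration under simplifying assumptions: the classes TreeNodeSketch, LeafNodeSketch and InnerNodeSketch are hypothetical, strings stand in for AbstractID, and futures, slot contexts and release handling are left out entirely.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified node types; strings stand in for AbstractID.
abstract class TreeNodeSketch {
    final String groupId; // null for the root

    TreeNodeSketch(String groupId) {
        this.groupId = groupId;
    }

    boolean contains(String id) {
        return id != null && id.equals(groupId);
    }
}

// Leaf: stands for one task (identified by a JobVertexID-like string).
final class LeafNodeSketch extends TreeNodeSketch {
    LeafNodeSketch(String groupId) {
        super(groupId);
    }
}

// Root or inner node: may hold leaves and further inner nodes.
final class InnerNodeSketch extends TreeNodeSketch {
    final Map<String, TreeNodeSketch> children = new LinkedHashMap<>();

    InnerNodeSketch(String groupId) {
        super(groupId);
    }

    // A group id is contained if this node or any descendant carries it.
    @Override
    boolean contains(String id) {
        if (super.contains(id)) {
            return true;
        }
        for (TreeNodeSketch child : children.values()) {
            if (child.contains(id)) {
                return true;
            }
        }
        return false;
    }

    LeafNodeSketch addLeaf(String id) {
        checkAbsent(id);
        LeafNodeSketch leaf = new LeafNodeSketch(id);
        children.put(id, leaf);
        return leaf;
    }

    InnerNodeSketch addInner(String id) {
        checkAbsent(id);
        InnerNodeSketch inner = new InnerNodeSketch(id);
        children.put(id, inner);
        return inner;
    }

    private void checkAbsent(String id) {
        if (contains(id)) {
            throw new IllegalStateException("Group id already present: " + id);
        }
    }
}
```

A root with leaves "vertex-A" and "vertex-B" models plain slot sharing; adding an inner node "co-location-1" with leaves "vertex-C" and "vertex-D" models the second level used for a co-location group. Adding "vertex-A" to the same root a second time is rejected, which mirrors the rule that two subtasks of the same operator do not share a slot.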

7.2 SlotPool

The JobManager uses the SlotPool to request slots from the ResourceManager and to manage all slots allocated to this JobManager. Every slot mentioned in this section is a physical slot.

The only implementation of the SlotPool interface is SlotPoolImpl. Let us first look at its key member variables:

class SlotPoolImpl implements SlotPool {
	/** The book-keeping of all allocated slots. */
	// Slots allocated to the current JobManager that are bound to a slot request
	private final AllocatedSlots allocatedSlots;

	/** The book-keeping of all available slots. */
	// All available slots (allocated to this JobManager but not yet carrying a payload)
	private final AvailableSlots availableSlots;

	/** All pending requests waiting for slots. */
	// Pending slot requests that have already been sent to the ResourceManager
	private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;

	/** The requests that are waiting for the resource manager to be connected. */
	// Pending slot requests that have not been sent yet because there is no connection to the ResourceManager
	private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
}

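Every slot allocated to the SlotPool is uniquely identified by its AllocationID. getAvailableSlotsInformation returns the currently available slots (those without a payload); allocateAvailableSlot then assigns the AllocatedSlot associated with a given AllocationID to the request identified by a given SlotRequestId: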

class SlotPoolImpl implements SlotPool {
	// List the currently available slots
	@Override
	public Collection<SlotInfo> getAvailableSlotsInformation() {
		return availableSlots.listSlotInfo();
	}

	// Assign the slot associated with allocationID to the request identified by slotRequestId
	@Override
	public Optional<PhysicalSlot> allocateAvailableSlot(
		@Nonnull SlotRequestId slotRequestId,
		@Nonnull AllocationID allocationID) {

		componentMainThreadExecutor.assertRunningInMainThread();

		// Remove it from availableSlots
		AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
		if (allocatedSlot != null) {
			// Add it to the mapping of allocated slots
			allocatedSlots.add(slotRequestId, allocatedSlot);
			return Optional.of(allocatedSlot);
		} else {
			return Optional.empty();
		}
	}
}

If no slot is currently available, the SlotPool can be asked to request one from the ResourceManager:

class SlotPoolImpl implements SlotPool {

	// Request a new slot from the RM
	@Override
	public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
		@Nonnull SlotRequestId slotRequestId,
		@Nonnull ResourceProfile resourceProfile,
		Time timeout) {

		return requestNewAllocatedSlotInternal(slotRequestId, resourceProfile, timeout)
			.thenApply((Function.identity()));
	}

	private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(
		@Nonnull SlotRequestId slotRequestId,
		@Nonnull ResourceProfile resourceProfile,
		@Nonnull Time timeout) {

		componentMainThreadExecutor.assertRunningInMainThread();

		// Build a PendingRequest
		final PendingRequest pendingRequest = new PendingRequest(
			slotRequestId,
			resourceProfile);

		// register request timeout
		FutureUtils
			.orTimeout(
				pendingRequest.getAllocatedSlotFuture(),
				timeout.toMilliseconds(),
				TimeUnit.MILLISECONDS,
				componentMainThreadExecutor)
			.whenComplete(
				(AllocatedSlot ignored, Throwable throwable) -> {
					if (throwable instanceof TimeoutException) {
						// Handle the timeout
						timeoutPendingSlotRequest(slotRequestId);
					}
				});

		if (resourceManagerGateway == null) {
			// Not connected to the RM yet, so wait until the connection is established
			stashRequestWaitingForResourceManager(pendingRequest);
		} else {
			// Already connected to the RM, so request a slot from it
			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
		}
		return pendingRequest.getAllocatedSlotFuture();
	}

	// Without a connection to the RM the request is stashed in waitingForResourceManager;
	// it is sent to the RM as soon as the connection is established
	private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
		log.info("Cannot serve slot request, no ResourceManager connected. " +
				"Adding as pending request [{}]",  pendingRequest.getSlotRequestId());
		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
	}

	// Already connected to the RM: request a slot from it
	private void requestSlotFromResourceManager(
			final ResourceManagerGateway resourceManagerGateway,
			final PendingRequest pendingRequest) {
		// Generate an AllocationID; the slot allocated later is identified by it
		final AllocationID allocationId = new AllocationID();

		// Add to the pending requests
		pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);

		pendingRequest.getAllocatedSlotFuture().whenComplete(
			(AllocatedSlot allocatedSlot, Throwable throwable) -> {
				if (throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) {
					// cancel the slot request if there is a failure or if the pending request has
					// been completed with another allocated slot
					resourceManagerGateway.cancelSlotRequest(allocationId);
				}
			});

		// Request the slot from the RM via RPC; the handling on the RM side was described earlier
		CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
			jobMasterId,
			new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
			rpcTimeout);

		FutureUtils.whenCompleteAsyncIfNotDone(
			rmResponse,
			componentMainThreadExecutor,
			(Acknowledge ignored, Throwable failure) -> {
				// on failure, fail the request future
				if (failure != null) {
					slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
				}
			});
	}
}

Once the ResourceManager has finished processing the slot allocation, the allocated slot is offered to the JobManager, and eventually SlotPool.offerSlots() is called:

class SlotPoolImpl {
	// Offer slots to the SlotPool and return the set of accepted slots.
	// Slots that are not accepted can be allocated to other jobs by the RM.
	@Override
	public Collection<SlotOffer> offerSlots(
			TaskManagerLocation taskManagerLocation,
			TaskManagerGateway taskManagerGateway,
			Collection<SlotOffer> offers) {
		ArrayList<SlotOffer> result = new ArrayList<>(offers.size());
		// The SlotPool decides for each slot whether to accept it
		for (SlotOffer offer : offers) {
			if (offerSlot(
				taskManagerLocation,
				taskManagerGateway,
				offer)) {

				result.add(offer);
			}
		}
		return result;
	}

	boolean offerSlot(
			final TaskManagerLocation taskManagerLocation,
			final TaskManagerGateway taskManagerGateway,
			final SlotOffer slotOffer) {

		componentMainThreadExecutor.assertRunningInMainThread();

		// check if this TaskManager is valid
		final ResourceID resourceID = taskManagerLocation.getResourceID();
		final AllocationID allocationID = slotOffer.getAllocationId();

		if (!registeredTaskManagers.contains(resourceID)) {
			log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
					slotOffer.getAllocationId(), taskManagerLocation);
			return false;
		}

		// check whether we are already using this slot,
		// i.e. whether the AllocationID of this slot is already known to the SlotPool
		AllocatedSlot existingSlot;
		if ((existingSlot = allocatedSlots.get(allocationID)) != null ||
			(existingSlot = availableSlots.get(allocationID)) != null) {

			// we need to figure out if this is a repeated offer for the exact same slot,
			// or another offer that comes from a different TaskManager after the ResourceManager
			// re-tried the request

			// we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of
			// the actual slots on the TaskManagers
			// Note: The slotOffer should have the SlotID
			final SlotID existingSlotId = existingSlot.getSlotId();
			final SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());

			if (existingSlotId.equals(newSlotId)) {
				// The SlotPool already accepted this slot earlier, i.e. the TaskExecutor sent a duplicate offer
				log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);

				// return true here so that the sender will get a positive acknowledgement to the retry
				// and mark the offering as a success
				return true;
			} else {
				// Another AllocatedSlot is already associated with this AllocationID, so this slot cannot be accepted
				// the allocation has been fulfilled by another slot, reject the offer so the task executor
				// will offer the slot to the resource manager
				return false;
			}
		}

		// The AllocationID of this slot has not been seen before

		// Create an AllocatedSlot object that represents the newly allocated slot
		final AllocatedSlot allocatedSlot = new AllocatedSlot(
			allocationID,
			taskManagerLocation,
			slotOffer.getSlotIndex(),
			slotOffer.getResourceProfile(),
			taskManagerGateway);

		// check whether we have request waiting for this slot
		// Check whether a request is associated with this AllocationID
		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
		if (pendingRequest != null) {
			// we were waiting for this!
			// A pending request is waiting for this slot
			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);

			// Try to complete the waiting request
			if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
				// we could not complete the pending slot future --> try to fulfill another pending request
				// Completing it failed
				allocatedSlots.remove(pendingRequest.getSlotRequestId());
				// Try to fulfill another waiting request
				tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
			} else {
				log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);
			}
		}
		else {
			// No request is waiting for this slot; the request may already have been fulfilled
			// we were actually not waiting for this:
			//   - could be that this request had been fulfilled
			//   - we are receiving the slots from TaskManagers after becoming leaders
			// Try to fulfill another waiting request
			tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
		}

		// we accepted the request in any case. slot will be released after it idled for
		// too long and timed out
		return true;
	}
}
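
The accept or reject decision in offerSlot can be condensed into a small sketch. It is hypothetical and heavily simplified: OfferDecisionSketch is an invented class, strings replace ResourceID, AllocationID and SlotID, and the bookkeeping of pending requests and available slots is omitted so that only the three outcomes remain (unknown TaskManager, repeated offer, conflicting offer).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the accept/reject decision; not Flink's SlotPoolImpl.
final class OfferDecisionSketch {
    private final Set<String> registeredTaskManagers = new HashSet<>();
    // allocationId -> slot id ("resourceId#slotIndex") of the slot accepted for it
    private final Map<String, String> slotIdByAllocation = new HashMap<>();

    void registerTaskManager(String resourceId) {
        registeredTaskManagers.add(resourceId);
    }

    boolean offerSlot(String resourceId, int slotIndex, String allocationId) {
        // Offers from TaskManagers that are not registered are outdated.
        if (!registeredTaskManagers.contains(resourceId)) {
            return false;
        }
        String newSlotId = resourceId + "#" + slotIndex;
        String existingSlotId = slotIdByAllocation.get(allocationId);
        if (existingSlotId != null) {
            // Same slot again: acknowledge the retry. Different slot: reject.
            return existingSlotId.equals(newSlotId);
        }
        // First offer for this allocation: accept it.
        slotIdByAllocation.put(allocationId, newSlotId);
        return true;
    }
}
```

Acknowledging a repeated offer for the same slot makes the call idempotent: a TaskExecutor that retries after a lost response still receives a positive answer.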

Whenever a new AllocatedSlot becomes available, SlotPoolImpl tries to use it to fulfill, ahead of time, another request that is still waiting for a response:

class SlotPoolImpl implements SlotPool {
	private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
		Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");

		// Look for a pending request whose resource requirements match this AllocatedSlot
		final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);

		if (pendingRequest != null) {
			// A matching request exists, so assign the AllocatedSlot to it
			log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
				pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());

			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
		} else {
			// Otherwise the AllocatedSlot becomes available
			log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
		}
	}

	private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
		final ResourceProfile slotResources = slot.getResourceProfile();

		// try the requests sent to the resource manager first
		for (PendingRequest request : pendingRequests.values()) {
			if (slotResources.isMatching(request.getResourceProfile())) {
				pendingRequests.removeKeyA(request.getSlotRequestId());
				return request;
			}
		}

		// try the requests waiting for a resource manager connection next
		for (PendingRequest request : waitingForResourceManager.values()) {
			if (slotResources.isMatching(request.getResourceProfile())) {
				waitingForResourceManager.remove(request.getSlotRequestId());
				return request;
			}
		}
		// no request pending, or no request matches
		return null;
	}
}
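
The interplay between pending requests and free slots can be tried out with a toy model. The sketch below is an assumption-laden simplification: PendingRequestSketch is an invented class, resource profiles are ignored (every slot matches every request), and unlike the code above it skips requests whose future has already completed, for example because it was cancelled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model; strings stand in for SlotRequestId and AllocationID.
final class PendingRequestSketch {
    // slotRequestId -> future that is completed with an allocation id
    private final Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
    private final Deque<String> available = new ArrayDeque<>();

    CompletableFuture<String> request(String slotRequestId) {
        String free = available.poll();
        if (free != null) {
            // A free slot exists: serve the request immediately.
            return CompletableFuture.completedFuture(free);
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(slotRequestId, future);
        return future;
    }

    // Called when a slot is offered or returned: fulfill a waiting request or keep the slot.
    void slotBecameFree(String allocationId) {
        Iterator<CompletableFuture<String>> it = pending.values().iterator();
        while (it.hasNext()) {
            CompletableFuture<String> future = it.next();
            it.remove();
            if (future.complete(allocationId)) {
                return;
            }
        }
        available.add(allocationId);
    }

    int availableCount() {
        return available.size();
    }
}
```

A request issued before any slot is free stays pending and is completed by the next free slot; a slot that arrives while nobody is waiting is kept as available and handed out to the next request.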

When the SlotPool starts, it schedules a periodic task that checks for idle slots; a slot that has been idle for too long is returned to the TaskManager:

class SlotPoolImpl {
	private void checkIdleSlot() {
		// The timestamp in SlotAndTimestamp is relative
		final long currentRelativeTimeMillis = clock.relativeTimeMillis();

		final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());

		for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) {
			if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
				expiredSlots.add(slotAndTimestamp.slot);
			}
		}

		final FlinkException cause = new FlinkException("Releasing idle slot.");

		for (AllocatedSlot expiredSlot : expiredSlots) {
			final AllocationID allocationID = expiredSlot.getAllocationId();
			if (availableSlots.tryRemove(allocationID) != null) {

				// Return the idle slot to the TaskManager
				log.info("Releasing idle slot [{}].", allocationID);
				final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
					allocationID,
					cause,
					rpcTimeout);

				FutureUtils.whenCompleteAsyncIfNotDone(
					freeSlotFuture,
					componentMainThreadExecutor,
					(Acknowledge ignored, Throwable throwable) -> {
						if (throwable != null) {
							if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
								log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
										"Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
									throwable);
								tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
							} else {
								log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
									"longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
							}
						}
					});
			}
		}

		scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
	}
}
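
The expiry rule itself is simple and can be shown in isolation. The following sketch assumes a hypothetical class IdleSlotSketch, uses strings for AllocationID, and takes the current time as a parameter instead of reading a clock; the RPC to the TaskManager and the rescheduling of the check are not modelled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the idle check; timestamps are relative milliseconds.
final class IdleSlotSketch {
    // allocationId -> time at which the slot became available
    private final Map<String, Long> availableSince = new LinkedHashMap<>();
    private final long idleTimeoutMillis;

    IdleSlotSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    void markAvailable(String allocationId, long nowMillis) {
        availableSince.put(allocationId, nowMillis);
    }

    // Removes and returns every slot that has been idle for longer than the timeout.
    List<String> releaseIdle(long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : availableSince.entrySet()) {
            if (nowMillis - entry.getValue() > idleTimeoutMillis) {
                expired.add(entry.getKey());
            }
        }
        for (String allocationId : expired) {
            availableSince.remove(allocationId);
        }
        return expired;
    }
}
```

As in checkIdleSlot, the comparison is strict: a slot whose idle time equals the timeout exactly is kept until the next check.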

8.Scheduler and SlotSharingManager

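As we have seen, the SlotPool is mainly responsible for managing the PhysicalSlots allocated to the current JobMaster. The scheduling and management of the compute resources needed by each individual Task, however, is organized in terms of LogicalSlots: different Tasks are assigned different LogicalSlots, but the underlying PhysicalSlot may be the same. The main logic is encapsulated in SlotSharingManager and Scheduler.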

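As mentioned earlier, building a tree of TaskSlot nodes makes it possible to share resources within a SlotSharingGroup and to enforce the CoLocationGroup constraint; this is mainly the job of the SlotSharingManager. Every SlotSharingGroup has its own SlotSharingManager.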

The main member variables of SlotSharingManager are shown below. Apart from the associated SlotSharingGroupId, the most important ones are the three Maps used to manage TaskSlots:

class SlotSharingManager {
	private final SlotSharingGroupId slotSharingGroupId;

	/** Actions to release allocated slots after a complete multi task slot hierarchy has been released. */
	private final AllocatedSlotActions allocatedSlotActions;

	/** Owner of the slots to which to return them when they are released from the outside. */
	private final SlotOwner slotOwner;

	// All TaskSlots, including root, inner and leaf nodes
	private final Map<SlotRequestId, TaskSlot> allTaskSlots;

	// Root MultiTaskSlots whose underlying physical slot has not been allocated yet
	/** Root nodes which have not been completed because the allocated slot is still pending. */
	private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;

	// Root MultiTaskSlots whose underlying physical slot has been allocated, organized as a two-level map
	// so that they can be looked up by the location of the TaskManager that hosts the physical slot
	/** Root nodes which have been completed (the underlying allocated slot has been assigned). */
	private final Map<TaskManagerLocation, Map<AllocationID, MultiTaskSlot>> resolvedRootSlots;
}

When a new TaskSlot tree has to be built, createRootSlot is called to create the root:

class SlotSharingManager {
	MultiTaskSlot createRootSlot(
			SlotRequestId slotRequestId,
			CompletableFuture<? extends SlotContext> slotContextFuture,
			SlotRequestId allocatedSlotRequestId) {
		final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
			slotRequestId,
			slotContextFuture,
			allocatedSlotRequestId);

		LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);

		allTaskSlots.put(slotRequestId, rootMultiTaskSlot);

		// Add it to unresolvedRootSlots first
		unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);

		// add the root node to the set of resolved root nodes once the SlotContext future has
		// been completed and we know the slot's TaskManagerLocation
		slotContextFuture.whenComplete(
			(SlotContext slotContext, Throwable throwable) -> {
				if (slotContext != null) {
					// Once the physical slot has been allocated, move the root from unresolvedRootSlots to resolvedRootSlots
					final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);

					if (resolvedRootNode != null) {
						final AllocationID allocationId = slotContext.getAllocationId();
						LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, allocationId);

						final Map<AllocationID, MultiTaskSlot> innerMap = resolvedRootSlots.computeIfAbsent(
							slotContext.getTaskManagerLocation(),
							taskManagerLocation -> new HashMap<>(4));

						MultiTaskSlot previousValue = innerMap.put(allocationId, resolvedRootNode);
						Preconditions.checkState(previousValue == null);
					}
				} else {
					rootMultiTaskSlot.release(throwable);
				}
			});
		return rootMultiTaskSlot;
	}
}
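
The move from the unresolved map to the resolved two-level map can be reproduced with a plain CompletableFuture. The sketch is hypothetical throughout: SlotContextSketch and RootRegistrySketch are invented names, a string label stands in for the root MultiTaskSlot, and a failed future simply drops the root here, whereas the code above releases it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for SlotContext: where the physical slot lives and its allocation id.
final class SlotContextSketch {
    final String taskManagerLocation;
    final String allocationId;

    SlotContextSketch(String taskManagerLocation, String allocationId) {
        this.taskManagerLocation = taskManagerLocation;
        this.allocationId = allocationId;
    }
}

final class RootRegistrySketch {
    // slotRequestId -> root label, for roots whose physical slot is still pending
    final Map<String, String> unresolvedRoots = new HashMap<>();
    // taskManagerLocation -> (allocationId -> root label)
    final Map<String, Map<String, String>> resolvedRoots = new HashMap<>();

    void createRoot(String slotRequestId, CompletableFuture<SlotContextSketch> slotContextFuture) {
        unresolvedRoots.put(slotRequestId, "root:" + slotRequestId);
        slotContextFuture.whenComplete((context, failure) -> {
            // In both outcomes the root stops being unresolved.
            String root = unresolvedRoots.remove(slotRequestId);
            if (context != null && root != null) {
                resolvedRoots
                    .computeIfAbsent(context.taskManagerLocation, key -> new HashMap<>())
                    .put(context.allocationId, root);
            }
        });
    }
}
```

Keying the resolved roots by TaskManager location first is what allows a later lookup by location, for example when a scheduler prefers a particular host.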

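In addition, different Tasks in Flink can share resources as long as they are in the same SlotSharingGroup, but there is an implicit condition: the two Tasks must be subtasks of different operators. For example, if a map operator has a parallelism of three, the subtasks map[1] and map[2] cannot be placed in the same PhysicalSlot. Both listResolvedRootSlotInfo and getUnresolvedRootSlot contain the check !multiTaskSlot.contains(groupId), which ensures that a TaskSlot tree never contains different subtasks of the same operator.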

class SlotSharingManager {
	@Nonnull
	public Collection<SlotInfo> listResolvedRootSlotInfo(@Nullable AbstractID groupId) {
		// List the root MultiTaskSlots that already have a physical slot and do not contain the given groupId
		return resolvedRootSlots
			.values()
			.stream()
			.flatMap((Map<AllocationID, MultiTaskSlot> map) -> map.values().stream())
			.filter((MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId))
			.map((MultiTaskSlot multiTaskSlot) -> (SlotInfo) multiTaskSlot.getSlotContextFuture().join())
			.collect(Collectors.toList());
	}

	@Nullable
	public MultiTaskSlot getResolvedRootSlot(@Nonnull SlotInfo slotInfo) {
		// Look up the MultiTaskSlot by its SlotInfo (TaskManagerLocation and AllocationID)
		Map<AllocationID, MultiTaskSlot> forLocationEntry = resolvedRootSlots.get(slotInfo.getTaskManagerLocation());
		return forLocationEntry != null ? forLocationEntry.get(slotInfo.getAllocationId()) : null;
	}

	/**
	 * Gets an unresolved slot which does not yet contain the given groupId. An unresolved
	 * slot is a slot whose underlying allocated slot has not been allocated yet.
	 *
	 * @param groupId which the returned slot must not contain
	 * @return the unresolved slot or null if there was no root slot with free capacities
	 */
	@Nullable
	MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
		// Find a root MultiTaskSlot that does not contain the given groupId
		for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
			if (!multiTaskSlot.contains(groupId)) {
				return multiTaskSlot;
			}
		}
		return null;
	}
}

Requests for LogicalSlot resources during task scheduling are managed through the Scheduler interface. Scheduler extends the SlotProvider interface, and its only implementation is SchedulerImpl.

public interface SlotProvider {
	// Request a slot; the return value is a future of a LogicalSlot
	CompletableFuture<LogicalSlot> allocateSlot(
		SlotRequestId slotRequestId,
		ScheduledUnit scheduledUnit,
		SlotProfile slotProfile,
		boolean allowQueuedScheduling,
		Time allocationTimeout);

	void cancelSlotRequest(
		SlotRequestId slotRequestId,
		@Nullable SlotSharingGroupId slotSharingGroupId,
		Throwable cause);
}

public interface Scheduler extends SlotProvider, SlotOwner {
	void start(@Nonnull ComponentMainThreadExecutor mainThreadExecutor);
	boolean requiresPreviousExecutionGraphAllocations();
}

The main member variables of SchedulerImpl are:

class SchedulerImpl implements Scheduler {
	private final SlotSelectionStrategy slotSelectionStrategy;

	private final SlotPool slotPool;

	private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;
}

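Clearly, SchedulerImpl relies on the SlotPool to request PhysicalSlots and on the SlotSharingManager to implement slot sharing. The SlotSelectionStrategy interface is mainly used to pick, from a set of slots, the one that best matches the preferences of the request.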

class SchedulerImpl {
	public CompletableFuture<LogicalSlot> allocateSlot(
		SlotRequestId slotRequestId,
		ScheduledUnit scheduledUnit,
		SlotProfile slotProfile,
		boolean allowQueuedScheduling,
		Time allocationTimeout) {
		log.debug("Received slot request [{}] for task: {}", slotRequestId, scheduledUnit.getTaskToExecute());

		componentMainThreadExecutor.assertRunningInMainThread();

		final CompletableFuture<LogicalSlot> allocationResultFuture = new CompletableFuture<>();

		// If no SlotSharingGroupId is specified, the task does not allow slot sharing and needs a slot of its own
		CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
			allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout) :
			allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);

		allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
			if (failure != null) {
				cancelSlotRequest(
					slotRequestId,
					scheduledUnit.getSlotSharingGroupId(),
					failure);
				allocationResultFuture.completeExceptionally(failure);
			} else {
				allocationResultFuture.complete(slot);
			}
		});
		return allocationResultFuture;
	}

	@Override
	public void cancelSlotRequest(
		SlotRequestId slotRequestId,
		@Nullable SlotSharingGroupId slotSharingGroupId,
		Throwable cause) {

		componentMainThreadExecutor.assertRunningInMainThread();

		if (slotSharingGroupId != null) {
			releaseSharedSlot(slotRequestId, slotSharingGroupId, cause);
		} else {
			slotPool.releaseSlot(slotRequestId, cause);
		}
	}

	@Override
	public void returnLogicalSlot(LogicalSlot logicalSlot) {
		SlotRequestId slotRequestId = logicalSlot.getSlotRequestId();
		SlotSharingGroupId slotSharingGroupId = logicalSlot.getSlotSharingGroupId();
		FlinkException cause = new FlinkException("Slot is being returned to the SlotPool.");
		cancelSlotRequest(slotRequestId, slotSharingGroupId, cause);
	}
}

The logic of these externally exposed methods is fairly clear. Next, let us look at the internal implementation.

If slot sharing is not allowed, a PhysicalSlot is taken directly from the SlotPool and a LogicalSlot is created on top of it:

class SchedulerImpl {
	private CompletableFuture<LogicalSlot> allocateSingleSlot(
		SlotRequestId slotRequestId,
		SlotProfile slotProfile,
		boolean allowQueuedScheduling,
		Time allocationTimeout) {

		// First try to take one of the available AllocatedSlots in the SlotPool
		Optional<SlotAndLocality> slotAndLocality = tryAllocateFromAvailable(slotRequestId, slotProfile);

		if (slotAndLocality.isPresent()) {
			// One is available: create a SingleLogicalSlot and assign it as the payload of the AllocatedSlot
			// already successful from available
			try {
				return CompletableFuture.completedFuture(
					completeAllocationByAssigningPayload(slotRequestId, slotAndLocality.get()));
			} catch (FlinkException e) {
				return FutureUtils.completedExceptionally(e);
			}
		} else if (allowQueuedScheduling) {
			// None is available right now; if queued scheduling is allowed, ask the SlotPool to request a new slot from the RM
			// we allocate by requesting a new slot
			return slotPool
				.requestNewAllocatedSlot(slotRequestId, slotProfile.getResourceProfile(), allocationTimeout)
				.thenApply((PhysicalSlot allocatedSlot) -> {
					try {
						return completeAllocationByAssigningPayload(slotRequestId, new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
					} catch (FlinkException e) {
						throw new CompletionException(e);
					}
				});
		} else {
			// failed to allocate
			return FutureUtils.completedExceptionally(
				new NoResourceAvailableException("Could not allocate a simple slot for " + slotRequestId + '.'));
		}
	}

	private Optional<SlotAndLocality> tryAllocateFromAvailable(
		@Nonnull SlotRequestId slotRequestId,
		@Nonnull SlotProfile slotProfile) {

		Collection<SlotInfo> slotInfoList = slotPool.getAvailableSlotsInformation();

		Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot =
			slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile);

		return selectedAvailableSlot.flatMap(slotInfoAndLocality -> {
			Optional<PhysicalSlot> optionalAllocatedSlot = slotPool.allocateAvailableSlot(
				slotRequestId,
				slotInfoAndLocality.getSlotInfo().getAllocationId());

			return optionalAllocatedSlot.map(
				allocatedSlot -> new SlotAndLocality(allocatedSlot, slotInfoAndLocality.getLocality()));
		});
	}
}
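
The three-way decision in allocateSingleSlot (use an available slot, queue a new request, or fail) can be written down compactly. The sketch below is a hypothetical reduction: SingleSlotFlowSketch is an invented class, strings stand in for physical and logical slots, and the request to the ResourceManager is represented by a Supplier passed in by the caller.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical reduction of the decision flow; strings stand in for slots.
final class SingleSlotFlowSketch {
    static CompletableFuture<String> allocate(
            Optional<String> fromAvailable,
            boolean allowQueuedScheduling,
            Supplier<CompletableFuture<String>> requestNewSlot) {

        if (fromAvailable.isPresent()) {
            // An available physical slot exists: wrap it right away.
            return CompletableFuture.completedFuture(wrap(fromAvailable.get()));
        }
        if (allowQueuedScheduling) {
            // Otherwise request a new physical slot and wrap it once it arrives.
            return requestNewSlot.get().thenApply(SingleSlotFlowSketch::wrap);
        }
        // Neither available nor allowed to wait: fail immediately.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("No slot available"));
        return failed;
    }

    private static String wrap(String physicalSlot) {
        return "logical(" + physicalSlot + ")";
    }
}
```

Only the queued branch returns a future that may still be incomplete; the other two branches return a result or a failure immediately.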

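If slot sharing is required, the mandatory CoLocationGroup constraint has to be taken into account as well. The core idea is to build the TaskSlot tree and then create a leaf in it; the leaf wraps the LogicalSlot that is needed. See the code and comments below for the detailed flow: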

class SchedulerImpl {
	private CompletableFuture<LogicalSlot> allocateSharedSlot(
		SlotRequestId slotRequestId,
		ScheduledUnit scheduledUnit,
		SlotProfile slotProfile,
		boolean allowQueuedScheduling,
		Time allocationTimeout) {
		// Each SlotSharingGroup has its own SlotSharingManager
		// allocate slot with slot sharing
		final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
			scheduledUnit.getSlotSharingGroupId(),
			id -> new SlotSharingManager(
				id,
				slotPool,
				this));

		// Allocate the MultiTaskSlot
		final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
		try {
			if (scheduledUnit.getCoLocationConstraint() != null) {
				// A co-location constraint exists
				multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
					scheduledUnit.getCoLocationConstraint(),
					multiTaskSlotManager,
					slotProfile,
					allowQueuedScheduling,
					allocationTimeout);
			} else {
				multiTaskSlotLocality = allocateMultiTaskSlot(
					scheduledUnit.getJobVertexId(),
					multiTaskSlotManager,
					slotProfile,
					allowQueuedScheduling,
					allocationTimeout);
			}
		} catch (NoResourceAvailableException noResourceException) {
			return FutureUtils.completedExceptionally(noResourceException);
		}

		// sanity check
		Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(scheduledUnit.getJobVertexId()));

		// Create a SingleTaskSlot leaf under the MultiTaskSlot and obtain the SingleLogicalSlot that can be assigned to the task
		final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
			slotRequestId,
			scheduledUnit.getJobVertexId(),
			multiTaskSlotLocality.getLocality());
		return leaf.getLogicalSlotFuture();
	}

	private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(
		CoLocationConstraint coLocationConstraint,
		SlotSharingManager multiTaskSlotManager,
		SlotProfile slotProfile,
		boolean allowQueuedScheduling,
		Time allocationTimeout) throws NoResourceAvailableException {
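		// the coLocationConstraint is bound to the SlotRequestId of the (non-root) MultiTaskSlot assigned to it;
		// this binding only exists once a MultiTaskSlot has been allocated,
		// after which the MultiTaskSlot can be looked up directly by that SlotRequestId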
		final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();

		if (coLocationSlotRequestId != null) {
			// we have a slot assigned --> try to retrieve it
			final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);

			if (taskSlot != null) {
				Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
				return SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot) taskSlot), Locality.LOCAL);
			} else {
				// the slot may have been cancelled in the mean time
				coLocationConstraint.setSlotRequestId(null);
			}
		}

		if (coLocationConstraint.isAssigned()) {
			// refine the preferred locations of the slot profile
			slotProfile = new SlotProfile(
				slotProfile.getResourceProfile(),
				Collections.singleton(coLocationConstraint.getLocation()),
				slotProfile.getPreferredAllocations());
		}

		// allocate a MultiTaskSlot for this coLocationConstraint: first find a suitable root MultiTaskSlot
		// get a new multi task slot
		SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot(
			coLocationConstraint.getGroupId(),
			multiTaskSlotManager,
			slotProfile,
			allowQueuedScheduling,
			allocationTimeout);

		// check whether we fulfill the co-location constraint
		if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
			multiTaskSlotLocality.getMultiTaskSlot().release(
				new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));

			throw new NoResourceAvailableException("Could not allocate a local multi task slot for the " +
				"co location constraint " + coLocationConstraint + '.');
		}

		// create a second-level MultiTaskSlot under the root MultiTaskSlot and assign it to this coLocationConstraint
		final SlotRequestId slotRequestId = new SlotRequestId();
		final SlotSharingManager.MultiTaskSlot coLocationSlot =
			multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
				slotRequestId,
				coLocationConstraint.getGroupId());

		// bind the slotRequestId to the coLocationConstraint so the MultiTaskSlot can later be located directly by it
		// mark the requested slot as co-located slot for other co-located tasks
		coLocationConstraint.setSlotRequestId(slotRequestId);

		// lock the co-location constraint once we have obtained the allocated slot
		coLocationSlot.getSlotContextFuture().whenComplete(
			(SlotContext slotContext, Throwable throwable) -> {
				if (throwable == null) {
					// check whether we are still assigned to the co-location constraint
					if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
						// lock the location for this coLocationConstraint
						coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
					} else {
						log.debug("Failed to lock colocation constraint {} because assigned slot " +
								"request {} differs from fulfilled slot request {}.",
							coLocationConstraint.getGroupId(),
							coLocationConstraint.getSlotRequestId(),
							slotRequestId);
					}
				} else {
					log.debug("Failed to lock colocation constraint {} because the slot " +
							"allocation for slot request {} failed.",
						coLocationConstraint.getGroupId(),
						coLocationConstraint.getSlotRequestId(),
						throwable);
				}
			});

		return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
	}

	private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
		AbstractID groupId,
		SlotSharingManager slotSharingManager,
		SlotProfile slotProfile,
		boolean allowQueuedScheduling,
		Time allocationTimeout) throws NoResourceAvailableException {

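		// find the eligible root MultiTaskSlots that already have an AllocatedSlot (resolved);
		// eligible means the root MultiTaskSlot does not yet contain the current groupId, so that different
		// tasks with the same groupId (the same JobVertex) are never placed in the same slot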
		Collection<SlotInfo> resolvedRootSlotsInfo = slotSharingManager.listResolvedRootSlotInfo(groupId);

		// let the slotSelectionStrategy pick the best match
		SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
			slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);

		// wrap the MultiTaskSlot together with its Locality
		final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = bestResolvedRootSlotWithLocality != null ?
			new SlotSharingManager.MultiTaskSlotLocality(
				slotSharingManager.getResolvedRootSlot(bestResolvedRootSlotWithLocality.getSlotInfo()),
				bestResolvedRootSlotWithLocality.getLocality()) :
			null;

		// if the AllocatedSlot behind the MultiTaskSlot is on the same TaskManager as the preferred location, use this MultiTaskSlot
		if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
			return multiTaskSlotLocality;
		}

		// two cases remain here:
		// 1) multiTaskSlotLocality == null: no suitable root MultiTaskSlot was found
		// 2) multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() != Locality.LOCAL: the locality preference is not met

		// try to pick one of the unused slots in the SlotPool
		final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
		final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();

		Optional<SlotAndLocality> optionalPoolSlotAndLocality = tryAllocateFromAvailable(allocatedSlotRequestId, slotProfile);

		if (optionalPoolSlotAndLocality.isPresent()) {
			// an unused slot was found in the SlotPool
			SlotAndLocality poolSlotAndLocality = optionalPoolSlotAndLocality.get();
			// use it if the unused AllocatedSlot meets the locality preference, or if no usable MultiTaskSlot was found in the previous step
			if (poolSlotAndLocality.getLocality() == Locality.LOCAL || bestResolvedRootSlotWithLocality == null) {

				// create a root MultiTaskSlot on top of the newly allocated AllocatedSlot
				final PhysicalSlot allocatedSlot = poolSlotAndLocality.getSlot();
				final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
					multiTaskSlotRequestId,
					CompletableFuture.completedFuture(poolSlotAndLocality.getSlot()),
					allocatedSlotRequestId);

				// set the new root MultiTaskSlot as the payload of the AllocatedSlot
				if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
					return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, poolSlotAndLocality.getLocality());
				} else {
					multiTaskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
						allocatedSlot.getAllocationId() + '.'));
				}
			}
		}

		if (multiTaskSlotLocality != null) {
			// neither candidate meets the locality preference, or the SlotPool has no unused slot left
			// prefer slot sharing group slots over unused slots
			if (optionalPoolSlotAndLocality.isPresent()) {
				slotPool.releaseSlot(
					allocatedSlotRequestId,
					new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
			}
			return multiTaskSlotLocality;
		}

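		// reaching here means 1) the slotSharingManager has no suitable resolved root MultiTaskSlot and 2) the slotPool has no usable slot
		if (allowQueuedScheduling) {
			// first check whether the slotSharingManager still holds a root MultiTaskSlot whose slot allocation has not completed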
			// there is no slot immediately available --> check first for uncompleted slots at the slot sharing group
			SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);

			if (multiTaskSlot == null) {
				// if not, the slotPool has to request a new slot from the RM
				// it seems as if we have to request a new slot from the resource manager, this is always the last resort!!!
				final CompletableFuture<PhysicalSlot> slotAllocationFuture = slotPool.requestNewAllocatedSlot(
					allocatedSlotRequestId,
					slotProfile.getResourceProfile(),
					allocationTimeout);

				// after the request the flow is the same: create a root MultiTaskSlot and make it the payload of the newly allocated AllocatedSlot
				multiTaskSlot = slotSharingManager.createRootSlot(
					multiTaskSlotRequestId,
					slotAllocationFuture,
					allocatedSlotRequestId);

				slotAllocationFuture.whenComplete(
					(PhysicalSlot allocatedSlot, Throwable throwable) -> {
						final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);

						if (taskSlot != null) {
							// still valid
							if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) {
								taskSlot.release(throwable);
							} else {
								if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) {
									taskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
										allocatedSlot.getAllocationId() + '.'));
								}
							}
						} else {
							slotPool.releaseSlot(
								allocatedSlotRequestId,
								new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.'));
						}
					});
			}

			return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
		}

		throw new NoResourceAvailableException("Could not allocate a shared slot for " + groupId + '.');
	}
}
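
The order in which `allocateMultiTaskSlot` weighs its options can be condensed into a small decision function. The sketch below is an illustration only: `SharedSlotDecisionSketch`, `Choice` and `decide` are hypothetical names, locality is reduced to two values, and the case where assigning the payload to a pool slot fails is ignored.

```java
import java.util.Optional;

// Hypothetical condensation of the decision order in allocateMultiTaskSlot (not Flink code).
class SharedSlotDecisionSketch {

    /** Simplified locality; Flink's Locality enum has more values. */
    enum Locality { LOCAL, NON_LOCAL }

    enum Choice {
        REUSE_RESOLVED_ROOT,
        NEW_ROOT_FROM_POOL,
        WAIT_FOR_UNRESOLVED_ROOT,
        REQUEST_FROM_RESOURCE_MANAGER,
        FAIL
    }

    /**
     * @param resolvedRoot locality of the best resolved root MultiTaskSlot, if one exists
     * @param poolSlot locality of the best unused slot in the SlotPool, if one exists
     * @param allowQueuedScheduling whether the caller accepts a slot that is not yet fulfilled
     * @param hasUnresolvedRoot whether a root with a pending slot request can still be joined
     */
    static Choice decide(
            Optional<Locality> resolvedRoot,
            Optional<Locality> poolSlot,
            boolean allowQueuedScheduling,
            boolean hasUnresolvedRoot) {

        // 1) A resolved root on the preferred TaskManager wins outright.
        if (resolvedRoot.isPresent() && resolvedRoot.get() == Locality.LOCAL) {
            return Choice.REUSE_RESOLVED_ROOT;
        }
        // 2) An unused pool slot is taken if it is local, or if there is no resolved root at all.
        if (poolSlot.isPresent()
                && (poolSlot.get() == Locality.LOCAL || !resolvedRoot.isPresent())) {
            return Choice.NEW_ROOT_FROM_POOL;
        }
        // 3) Otherwise an existing (non-local) root is preferred over an unused slot.
        if (resolvedRoot.isPresent()) {
            return Choice.REUSE_RESOLVED_ROOT;
        }
        // 4) Nothing is available right now: queue if allowed, else fail.
        if (allowQueuedScheduling) {
            return hasUnresolvedRoot
                ? Choice.WAIT_FOR_UNRESOLVED_ROOT
                : Choice.REQUEST_FROM_RESOURCE_MANAGER;
        }
        return Choice.FAIL;
    }

    public static void main(String[] args) {
        System.out.println(decide(
            Optional.of(Locality.NON_LOCAL), Optional.of(Locality.LOCAL), false, false));
        System.out.println(decide(Optional.empty(), Optional.empty(), true, false));
    }
}
```

Reading the branches top to bottom mirrors the listing above: local sharing first, then a fresh slot from the pool, then non-local sharing, and only as a last resort a new request to the ResourceManager.
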

Summary

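In a distributed computing engine, managing compute resources is a core function. In this article we looked at Flink's resource management model, which uses the task slot as its basic unit, and broke down in some detail how slots are managed inside each component, from TaskExecutor and ResourceManager to JobMaster, including the ResourceManager's dynamic resource management and the sharing constraints imposed by SlotSharingGroup and CoLocationGroup. Dynamic resource management makes it easier for Flink to integrate with resource management frameworks such as Yarn and Kubernetes, while slot sharing greatly reduces the network overhead between the subtasks of a job.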

Finally, two figures are attached to aid understanding:

[Figure 1: image not available]
[Figure 2: image not available]
