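Device plugins
A device plugin advertises extended resources (anything beyond the built-in cpu and memory) to the kubelet so that containers can discover and access them.
Implementation
Device plugins are built on the device plugin framework that Kubernetes provides. A device plugin has three main parts:
- Registration: advertise the extended resource to the kubelet.
- ListAndWatch: report the device list, and each device's health, to the kubelet.
- Allocate: assign devices before a container is created.
The registration interface that the kubelet exposes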
    type RegistrationServer interface {
        Register(context.Context, *RegisterRequest) (*Empty, error)
    }
The request a device plugin sends to register its resource
    type RegisterRequest struct {
        // Version of the API the Device Plugin was built against
        Version string
        // plugin-server sock path
        // /var/lib/kubelet/device-plugins/aliyungpushare.sock
        Endpoint string
        // Schedulable resource name
        // aliyun.com/gpu-mem
        ResourceName string
        // Options to be communicated with Device Manager
        Options *DevicePluginOptions
    }
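As a rough illustration of the registration step, the sketch below fills in a RegisterRequest for the example socket and resource name from the comments above. The struct types are local stand-ins for the generated API types, and `newRegisterRequest` is a hypothetical helper, not part of Kubernetes. It assumes the kubelet resolves `Endpoint` relative to its device plugin directory, so only the socket file name is sent. A real plugin would pass this request to `Register` over gRPC on the kubelet's socket.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// apiVersion is the version string of the v1beta1 device plugin API.
const apiVersion = "v1beta1"

// Local stand-ins for the generated API types shown above.
type DevicePluginOptions struct {
	PreStartRequired bool
}

type RegisterRequest struct {
	Version      string
	Endpoint     string
	ResourceName string
	Options      *DevicePluginOptions
}

// newRegisterRequest is a hypothetical helper that validates the resource
// name and builds the registration payload.
func newRegisterRequest(socketPath, resourceName string) (*RegisterRequest, error) {
	// Extended resource names are expected to look like vendor-domain/resource.
	parts := strings.SplitN(resourceName, "/", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return nil, fmt.Errorf("resource name %q is not of the form vendor-domain/resource", resourceName)
	}
	return &RegisterRequest{
		Version: apiVersion,
		// Assumption: only the socket file name is sent to the kubelet.
		Endpoint:     filepath.Base(socketPath),
		ResourceName: resourceName,
		Options:      &DevicePluginOptions{PreStartRequired: false},
	}, nil
}

func main() {
	req, err := newRegisterRequest(
		"/var/lib/kubelet/device-plugins/aliyungpushare.sock",
		"aliyun.com/gpu-mem",
	)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *req)
}
```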
The device plugin service must implement the following server-side interface.
    // DevicePluginServer is the server API for DevicePlugin service.
    type DevicePluginServer interface {
        GetDevicePluginOptions(context.Context, *Empty) (*DevicePluginOptions, error)
        // ListAndWatch returns a stream of Device lists.
        // Whenever a Device's state changes or a Device disappears,
        // ListAndWatch returns the new list.
        ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error
        Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)
        PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
    }
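The streaming behaviour of ListAndWatch can be sketched without gRPC: send the full device list once, then send it again every time a device's health changes. In the sketch below, `deviceStream` stands in for `DevicePlugin_ListAndWatchServer`, and `healthEvent`, `plugin`, `recorder`, and `simulate` are hypothetical names introduced only for illustration. How a real plugin detects health changes is device specific and is not modelled here.

```go
package main

import "fmt"

// Local stand-ins for the API types.
type Device struct {
	ID     string
	Health string
}

type ListAndWatchResponse struct {
	Devices []*Device
}

// deviceStream stands in for DevicePlugin_ListAndWatchServer.
type deviceStream interface {
	Send(*ListAndWatchResponse) error
}

// healthEvent is a hypothetical notification from a health checker.
type healthEvent struct {
	ID     string
	Health string
}

type plugin struct {
	devices []*Device
	health  chan healthEvent
	stop    chan struct{}
}

// snapshot copies the device list so later health changes do not alter
// responses that were already sent.
func (p *plugin) snapshot() *ListAndWatchResponse {
	out := make([]*Device, 0, len(p.devices))
	for _, d := range p.devices {
		c := *d
		out = append(out, &c)
	}
	return &ListAndWatchResponse{Devices: out}
}

// ListAndWatch sends the initial list, then a fresh list after every health
// event, until the plugin is stopped or the stream fails.
func (p *plugin) ListAndWatch(s deviceStream) error {
	if err := s.Send(p.snapshot()); err != nil {
		return err
	}
	for {
		select {
		case <-p.stop:
			return nil
		case ev := <-p.health:
			for _, d := range p.devices {
				if d.ID == ev.ID {
					d.Health = ev.Health
				}
			}
			if err := s.Send(p.snapshot()); err != nil {
				return err
			}
		}
	}
}

// recorder is a fake stream that remembers everything sent to it.
type recorder struct{ sent []*ListAndWatchResponse }

func (r *recorder) Send(resp *ListAndWatchResponse) error {
	r.sent = append(r.sent, resp)
	return nil
}

// simulate runs one stream, marks gpu-0 unhealthy, and returns what was sent.
func simulate() []*ListAndWatchResponse {
	p := &plugin{
		devices: []*Device{{ID: "gpu-0", Health: "Healthy"}, {ID: "gpu-1", Health: "Healthy"}},
		health:  make(chan healthEvent),
		stop:    make(chan struct{}),
	}
	rec := &recorder{}
	done := make(chan error, 1)
	go func() { done <- p.ListAndWatch(rec) }()
	p.health <- healthEvent{ID: "gpu-0", Health: "Unhealthy"}
	close(p.stop)
	<-done
	return rec.sent
}

func main() {
	for i, resp := range simulate() {
		for _, d := range resp.Devices {
			fmt.Printf("update %d: %s %s\n", i, d.ID, d.Health)
		}
	}
}
```

Sending the whole list on every change, rather than a delta, matches the interface comment above: the receiver always replaces its view with the latest list.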
Request and response types in detail
    type DevicePluginOptions struct {
        // Indicates if PreStartContainer call is required before each container start
        PreStartRequired bool
    }

    type ContainerAllocateResponse struct {
        Envs        map[string]string
        Mounts      []*Mount
        Devices     []*DeviceSpec
        Annotations map[string]string
    }

    type ListAndWatchResponse struct {
        Devices []*Device
    }

    // E.g:
    // struct Device {
    //    ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
    //    Health: "Healthy",
    // }
    type Device struct {
        ID     string
        Health string
    }
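To make the Allocate response more concrete, here is a minimal sketch that turns a list of device IDs into a ContainerAllocateResponse. The `Mount` and `DeviceSpec` structs are local stand-ins modelled on the API types, `buildAllocateResponse` and the `hostPaths` lookup are hypothetical, and the environment variable name `EXAMPLE_VISIBLE_DEVICES` is made up for illustration. A real plugin would use whatever its container runtime or workload expects.

```go
package main

import (
	"fmt"
	"strings"
)

// Local stand-ins modelled on the API types.
type Mount struct {
	ContainerPath string
	HostPath      string
	ReadOnly      bool
}

type DeviceSpec struct {
	ContainerPath string
	HostPath      string
	Permissions   string
}

type ContainerAllocateResponse struct {
	Envs        map[string]string
	Mounts      []*Mount
	Devices     []*DeviceSpec
	Annotations map[string]string
}

// buildAllocateResponse is a hypothetical helper. hostPaths maps a device ID
// to the device node on the host; that mapping is an assumption here.
func buildAllocateResponse(ids []string, hostPaths map[string]string) (*ContainerAllocateResponse, error) {
	resp := &ContainerAllocateResponse{Envs: map[string]string{}}
	for _, id := range ids {
		p, ok := hostPaths[id]
		if !ok {
			return nil, fmt.Errorf("unknown device %q", id)
		}
		// Expose the host device node at the same path inside the container.
		resp.Devices = append(resp.Devices, &DeviceSpec{
			ContainerPath: p,
			HostPath:      p,
			Permissions:   "rw",
		})
	}
	// Illustrative only: tell the workload which devices it was given.
	resp.Envs["EXAMPLE_VISIBLE_DEVICES"] = strings.Join(ids, ",")
	return resp, nil
}

func main() {
	paths := map[string]string{"gpu-0": "/dev/example0", "gpu-1": "/dev/example1"}
	resp, err := buildAllocateResponse([]string{"gpu-0", "gpu-1"}, paths)
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.Envs["EXAMPLE_VISIBLE_DEVICES"], len(resp.Devices))
}
```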
Call chain
The ContainerManager interface
(only the methods related to the device plugin manager are shown)
    // Manages the containers running on a machine.
    type ContainerManager interface {
        // Runs the container manager's housekeeping.
        // - Ensures that the Docker daemon is in a container.
        // - Creates the system container where all non-containerized processes run.
        Start(*v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService) error
        ...
        // GetCapacity returns the amount of compute resources tracked by container manager available on the node.
        GetCapacity() v1.ResourceList

        // GetDevicePluginResourceCapacity returns the node capacity (amount of total device plugin resources),
        // node allocatable (amount of total healthy resources reported by device plugin),
        // and inactive device plugin resources previously registered on the node.
        GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string)
        ...
        // GetResources returns RunContainerOptions with devices, mounts, and env fields populated for
        // extended resources required by container.
        GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)

        // UpdatePluginResources calls Allocate of device plugin handler for potential
        // requests for device plugin resources, and returns an error if fails.
        // Otherwise, it updates allocatableResource in nodeInfo if necessary,
        // to make sure it is at least equal to the pod's requested capacity for
        // any registered device plugin resource
        UpdatePluginResources(*schedulernodeinfo.NodeInfo, *lifecycle.PodAdmitAttributes) error
        ...
        // GetDevices returns information about the devices assigned to pods and containers
        GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
        ...
    }
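The comment on GetDevicePluginResourceCapacity distinguishes capacity (all devices a plugin reported) from allocatable (only the healthy ones). A minimal sketch of that distinction follows. Plain integer maps replace `v1.ResourceList`, `resourceCounts` is a hypothetical function, and the third return value (inactive resources) is left out for brevity.

```go
package main

import "fmt"

// Device is a local stand-in for the API type.
type Device struct {
	ID     string
	Health string
}

// resourceCounts is a hypothetical helper: for each resource name, capacity
// counts every reported device and allocatable counts only healthy ones.
func resourceCounts(byResource map[string][]Device) (capacity, allocatable map[string]int) {
	capacity = map[string]int{}
	allocatable = map[string]int{}
	for name, devs := range byResource {
		capacity[name] = len(devs)
		allocatable[name] = 0 // make sure the key exists even with no healthy device
		for _, d := range devs {
			if d.Health == "Healthy" {
				allocatable[name]++
			}
		}
	}
	return capacity, allocatable
}

func main() {
	capacity, allocatable := resourceCounts(map[string][]Device{
		"aliyun.com/gpu-mem": {
			{ID: "gpu-0", Health: "Healthy"},
			{ID: "gpu-1", Health: "Unhealthy"},
		},
	})
	fmt.Println("capacity:", capacity, "allocatable:", allocatable)
}
```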
UpdatePluginResources is the admit handler that the kubelet runs before it initializes a pod.
    func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
        return cm.deviceManager.Allocate(node, attrs)
    }
The device plugin Manager interface
    // Manager manages all the Device Plugins running on a node.
    type Manager interface {
        // Start starts device plugin registration service.
        Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error

        // Allocate configures and assigns devices to pods. The pods are provided
        // through the pod admission attributes in the attrs argument. From the
        // requested device resources, Allocate will communicate with the owning
        // device plugin to allow setup procedures to take place, and for the
        // device plugin to provide runtime settings to use the device (environment
        // variables, mount points and device files). The node object is provided
        // for the device manager to update the node capacity to reflect the
        // currently available devices.
        Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error

        // Stop stops the manager.
        Stop() error

        // GetDeviceRunContainerOptions checks whether we have cached containerDevices
        // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
        // for the found one. An empty struct is returned in case no cached state is found.
        GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error)

        // GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
        // and inactive device plugin resources previously registered on the node.
        GetCapacity() (v1.ResourceList, v1.ResourceList, []string)

        GetWatcherHandler() cache.PluginHandler

        // GetDevices returns information about the devices assigned to pods and containers
        GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices

        // ShouldResetExtendedResourceCapacity returns whether the extended resources should be reset or not,
        // depending on the checkpoint file availability. Absence of the checkpoint file strongly indicates
        // the node has been recreated.
        ShouldResetExtendedResourceCapacity() bool

        // TopologyManager HintProvider provider indicates the Device Manager implements the Topology Manager Interface
        // and is consulted to make Topology aware resource alignments
        GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint
    }
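The bookkeeping behind Allocate can be sketched as picking devices that are healthy and not yet handed out, and failing when there are too few. The `manager` type, `newManager`, and `allocate` below are hypothetical and far simpler than the kubelet's device manager: there is no pod tracking, no checkpointing, and no call to the plugin. Sorting the candidates is a choice made here to keep the result deterministic, not something the source describes.

```go
package main

import (
	"fmt"
	"sort"
)

// manager is a hypothetical, simplified view of the device manager's state.
type manager struct {
	healthy   map[string]map[string]bool // resource -> device IDs reported healthy
	allocated map[string]map[string]bool // resource -> device IDs already handed out
}

func newManager(healthy map[string][]string) *manager {
	m := &manager{
		healthy:   map[string]map[string]bool{},
		allocated: map[string]map[string]bool{},
	}
	for res, ids := range healthy {
		m.healthy[res] = map[string]bool{}
		m.allocated[res] = map[string]bool{}
		for _, id := range ids {
			m.healthy[res][id] = true
		}
	}
	return m
}

// allocate reserves `need` healthy, unallocated devices of the given resource,
// or returns an error and reserves nothing.
func (m *manager) allocate(resource string, need int) ([]string, error) {
	var free []string
	for id := range m.healthy[resource] {
		if !m.allocated[resource][id] {
			free = append(free, id)
		}
	}
	if len(free) < need {
		return nil, fmt.Errorf("requested %d %s, only %d available", need, resource, len(free))
	}
	sort.Strings(free) // deterministic choice for this sketch
	picked := free[:need]
	if m.allocated[resource] == nil {
		m.allocated[resource] = map[string]bool{}
	}
	for _, id := range picked {
		m.allocated[resource][id] = true
	}
	return picked, nil
}

func main() {
	m := newManager(map[string][]string{"aliyun.com/gpu-mem": {"gpu-2", "gpu-0", "gpu-1"}})
	fmt.Println(m.allocate("aliyun.com/gpu-mem", 2))
	fmt.Println(m.allocate("aliyun.com/gpu-mem", 2))
}
```

In the kubelet the failing case matters because Allocate runs as part of pod admission: an error here means the pod is not admitted on the node.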
The endpoint abstraction
    type endpoint interface {
        run()
        stop()
        allocate(devs []string) (*pluginapi.AllocateResponse, error)
        preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
        callback(resourceName string, devices []pluginapi.Device)
        isStopped() bool
        stopGracePeriodExpired() bool
    }
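The two status methods, isStopped and stopGracePeriodExpired, suggest that a stopped endpoint is kept for a while before it is treated as gone. One way to model that is to record the stop time and compare it with a grace period, as in the sketch below. The `endpointState` type is hypothetical, the methods take an explicit `now` so the logic can be exercised without waiting, and the five-minute value is an assumption for illustration rather than a documented constant.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// stopGracePeriod is an assumed value for this sketch.
const stopGracePeriod = 5 * time.Minute

// endpointState is a hypothetical holder for an endpoint's stop time.
type endpointState struct {
	mu       sync.Mutex
	stopTime time.Time // zero value means the endpoint is still running
}

// stopAt records when the endpoint stopped.
func (e *endpointState) stopAt(now time.Time) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.stopTime = now
}

// isStopped reports whether a stop time has been recorded.
func (e *endpointState) isStopped() bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return !e.stopTime.IsZero()
}

// stopGracePeriodExpired reports whether the endpoint stopped more than
// stopGracePeriod before now.
func (e *endpointState) stopGracePeriodExpired(now time.Time) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return !e.stopTime.IsZero() && now.Sub(e.stopTime) > stopGracePeriod
}

// expiredAfter is a small helper for the demo: it stops an endpoint at a
// fixed instant and checks the grace period after `elapsed`.
func expiredAfter(elapsed time.Duration) bool {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	e := &endpointState{}
	e.stopAt(t0)
	return e.stopGracePeriodExpired(t0.Add(elapsed))
}

func main() {
	fmt.Println("after 1 minute:", expiredAfter(time.Minute))
	fmt.Println("after 10 minutes:", expiredAfter(10*time.Minute))
}
```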
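For a container to use an extended resource, a custom device plugin that manages the resource is not enough; a scheduler extension must also be implemented. The overall flow involves the following.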