device plugin

Device plugin

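A device plugin reports extended resources (anything other than the built-in cpu and memory) to the kubelet so that containers can discover and access them.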

Implementation

Device plugins are built on the device plugin framework that Kubernetes provides. A device plugin has three main parts:

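  1. Registration: advertises the extended resource to the kubelet.
  2. ListAndWatch: reports the device list and each device's health to the kubelet.
  3. Allocate: assigns devices before a container is created.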
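
The three parts can be sketched as a small in-process simulation. This is only an illustration and not the real gRPC API: the `fakeKubelet` type, its methods, and the resource name `example.com/widget` are hypothetical stand-ins that do not appear in the kubelet source.

```go
package main

import "fmt"

// device is a simplified stand-in for the ID/Health pair a plugin reports.
type device struct {
	ID     string
	Health string
}

// fakeKubelet is a hypothetical in-memory stand-in for the kubelet side.
type fakeKubelet struct {
	resources map[string][]device // resource name -> last reported devices
}

// register records that a resource exists (part 1: registration).
func (k *fakeKubelet) register(resource string) {
	k.resources[resource] = nil
}

// update stores the latest device list for a resource (part 2: ListAndWatch).
func (k *fakeKubelet) update(resource string, devs []device) error {
	if _, ok := k.resources[resource]; !ok {
		return fmt.Errorf("resource %q is not registered", resource)
	}
	k.resources[resource] = devs
	return nil
}

// allocate picks the first n healthy devices (part 3: Allocate).
func (k *fakeKubelet) allocate(resource string, n int) ([]string, error) {
	var ids []string
	for _, d := range k.resources[resource] {
		if d.Health == "Healthy" && len(ids) < n {
			ids = append(ids, d.ID)
		}
	}
	if len(ids) < n {
		return nil, fmt.Errorf("want %d healthy %s devices, have %d", n, resource, len(ids))
	}
	return ids, nil
}

func main() {
	k := &fakeKubelet{resources: map[string][]device{}}
	k.register("example.com/widget")
	_ = k.update("example.com/widget", []device{{"w0", "Healthy"}, {"w1", "Unhealthy"}})
	ids, err := k.allocate("example.com/widget", 1)
	fmt.Println(ids, err)
}
```

Only healthy devices are handed out here, so asking for two devices in this example would fail because `w1` is unhealthy.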

The registration interface abstracted inside the kubelet

type RegistrationServer interface {
	Register(context.Context, *RegisterRequest) (*Empty, error)
}

Request parameters a device plugin service sends to register its resource

type RegisterRequest struct {
	// Version of the API the Device Plugin was built against
	Version      string 
	// Name of the plugin server's unix socket; the kubelet resolves it under
	// its device plugin directory, e.g. /var/lib/kubelet/device-plugins/aliyungpushare.sock
	Endpoint     string 
	// Schedulable resource name
	// aliyun.com/gpu-mem
	ResourceName string
	// Options to be communicated with Device Manager
	Options      *DevicePluginOptions
}
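
A request like this could be assembled as in the sketch below. It assumes the v1beta1 API, where `Endpoint` carries the socket file name and the kubelet joins it with its device plugin directory. The `registerRequest` type is a local mirror of the fields above, and `newRegisterRequest` with its two sanity checks is a hypothetical helper, not part of the framework.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// These values match the v1beta1 device plugin API constants.
const (
	apiVersion       = "v1beta1"
	devicePluginPath = "/var/lib/kubelet/device-plugins/"
)

// registerRequest is a local mirror of the RegisterRequest fields (Options omitted).
type registerRequest struct {
	Version      string
	Endpoint     string
	ResourceName string
}

// newRegisterRequest is a hypothetical helper that builds a request from the
// plugin's full socket path. Only the socket's base name is sent.
func newRegisterRequest(socketPath, resource string) (*registerRequest, error) {
	if !strings.HasPrefix(socketPath, devicePluginPath) {
		return nil, fmt.Errorf("socket %q is outside %s", socketPath, devicePluginPath)
	}
	// Simplified check: extended resources use a vendor-domain/name form.
	if !strings.Contains(resource, "/") {
		return nil, fmt.Errorf("resource name %q has no vendor domain", resource)
	}
	return &registerRequest{
		Version:      apiVersion,
		Endpoint:     path.Base(socketPath),
		ResourceName: resource,
	}, nil
}

func main() {
	req, err := newRegisterRequest(devicePluginPath+"aliyungpushare.sock", "aliyun.com/gpu-mem")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *req)
}
```

In a real plugin the resulting request would be passed to `Register` over a gRPC connection to the kubelet's own socket in the same directory.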

A device plugin service must implement the following server-side interface.

// DevicePluginServer is the server API for DevicePlugin service.
type DevicePluginServer interface {

	GetDevicePluginOptions(context.Context, *Empty) (*DevicePluginOptions, error)

	// ListAndWatch returns a stream of Device lists.
	// Whenever a Device's state changes or a Device disappears,
	// ListAndWatch returns the new list.
	ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error

	Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)

	PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
}
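
The streaming behaviour of ListAndWatch can be sketched without gRPC. In the sketch below, `stream` is a hypothetical stand-in for `DevicePlugin_ListAndWatchServer` that models only `Send`; the `plugin` and `recorder` types and the `health` and `stop` channels are also assumptions made for illustration.

```go
package main

import "fmt"

type device struct{ ID, Health string }

type listAndWatchResponse struct{ Devices []*device }

// stream is a hypothetical stand-in for the gRPC server stream; only Send is modelled.
type stream interface {
	Send(*listAndWatchResponse) error
}

// plugin holds the current devices plus channels for health changes and shutdown.
type plugin struct {
	devs   []*device
	health chan *device // carries a device ID together with its new Health value
	stop   chan struct{}
}

// listAndWatch sends the full list once, then again after every health change.
func (p *plugin) listAndWatch(s stream) error {
	if err := s.Send(&listAndWatchResponse{Devices: p.devs}); err != nil {
		return err
	}
	for {
		select {
		case <-p.stop:
			return nil
		case d := <-p.health:
			for _, cur := range p.devs {
				if cur.ID == d.ID {
					cur.Health = d.Health
				}
			}
			if err := s.Send(&listAndWatchResponse{Devices: p.devs}); err != nil {
				return err
			}
		}
	}
}

// recorder is a stream that stores a snapshot of every response it receives.
type recorder struct{ got [][]device }

func (r *recorder) Send(resp *listAndWatchResponse) error {
	snap := make([]device, 0, len(resp.Devices))
	for _, d := range resp.Devices {
		snap = append(snap, *d)
	}
	r.got = append(r.got, snap)
	return nil
}

func main() {
	rec := &recorder{}
	p := &plugin{
		devs:   []*device{{"gpu0", "Healthy"}},
		health: make(chan *device),
		stop:   make(chan struct{}),
	}
	done := make(chan error, 1)
	go func() { done <- p.listAndWatch(rec) }()
	p.health <- &device{ID: "gpu0", Health: "Unhealthy"} // blocks until the loop receives it
	close(p.stop)
	fmt.Println(<-done, rec.got)
}
```

The whole list is resent on every change rather than a delta, which matches the comment on `ListAndWatch` above.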

Request and response types in detail

type DevicePluginOptions struct {
	// Indicates if PreStartContainer call is required before each container start
	PreStartRequired     bool
}

type ContainerAllocateResponse struct {
	Envs           map[string]string 
	Mounts         []*Mount 
	Devices        []*DeviceSpec
	Annotations    map[string]string 
}

type ListAndWatchResponse struct {
	Devices []*Device 
}

// E.g:
// struct Device {
//    ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
//    Health: "Healthy",
// }
type Device struct {
	ID string 
	Health string 
}
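
To show how these types fit together, here is a minimal sketch of building a per-container allocate response from a list of device IDs. The local types mirror a subset of the fields above; the `hostPaths` table, the `/dev/example*` paths and the `EXAMPLE_VISIBLE_DEVICES` variable are hypothetical and chosen only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// deviceSpec mirrors the DeviceSpec message: a host device node exposed in the container.
type deviceSpec struct {
	ContainerPath string
	HostPath      string
	Permissions   string
}

// containerAllocateResponse mirrors a subset of ContainerAllocateResponse.
type containerAllocateResponse struct {
	Envs    map[string]string
	Devices []*deviceSpec
}

// hostPaths is a hypothetical mapping from device ID to its host device node.
var hostPaths = map[string]string{
	"dev-a": "/dev/example0",
	"dev-b": "/dev/example1",
}

// allocate turns the requested device IDs into runtime settings for one container.
func allocate(ids []string) (*containerAllocateResponse, error) {
	resp := &containerAllocateResponse{Envs: map[string]string{}}
	for _, id := range ids {
		p, ok := hostPaths[id]
		if !ok {
			return nil, fmt.Errorf("unknown device %q", id)
		}
		resp.Devices = append(resp.Devices, &deviceSpec{
			ContainerPath: p,
			HostPath:      p,
			Permissions:   "rw",
		})
	}
	// Hypothetical variable telling the workload which devices it may use.
	resp.Envs["EXAMPLE_VISIBLE_DEVICES"] = strings.Join(ids, ",")
	return resp, nil
}

func main() {
	resp, err := allocate([]string{"dev-a", "dev-b"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(resp.Envs, len(resp.Devices))
}
```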

  

Call chain

The ContainerManager interface

(only the methods related to the device plugin manager are shown)
// Manages the containers running on a machine.
type ContainerManager interface {
	// Runs the container manager's housekeeping.
	// - Ensures that the Docker daemon is in a container.
	// - Creates the system container where all non-containerized processes run.
	Start(*v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService) error

	...

	// GetCapacity returns the amount of compute resources tracked by container manager available on the node.
	GetCapacity() v1.ResourceList

	// GetDevicePluginResourceCapacity returns the node capacity (amount of total device plugin resources),
	// node allocatable (amount of total healthy resources reported by device plugin),
	// and inactive device plugin resources previously registered on the node.
	GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string)

	...

	// GetResources returns RunContainerOptions with devices, mounts, and env fields populated for
	// extended resources required by container.
	GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)

	// UpdatePluginResources calls Allocate of device plugin handler for potential
	// requests for device plugin resources, and returns an error if fails.
	// Otherwise, it updates allocatableResource in nodeInfo if necessary,
	// to make sure it is at least equal to the pod's requested capacity for
	// any registered device plugin resource
	UpdatePluginResources(*schedulernodeinfo.NodeInfo, *lifecycle.PodAdmitAttributes) error

	...

	// GetDevices returns information about the devices assigned to pods and containers
	GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices

	...
}
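
The three values described in the comment on GetDevicePluginResourceCapacity can be sketched as follows: capacity counts every reported device, allocatable counts only healthy ones, and inactive lists resources whose plugin is gone. This is a simplified model; `resourceState`, its `stopped` flag and `pluginResourceCapacity` are hypothetical names, and plain integer maps stand in for `v1.ResourceList`.

```go
package main

import (
	"fmt"
	"sort"
)

type device struct{ ID, Health string }

// resourceState is a hypothetical record of what one plugin last reported.
type resourceState struct {
	devices []device
	stopped bool // assumed flag: the plugin's endpoint is no longer running
}

// pluginResourceCapacity returns per-resource capacity, per-resource allocatable,
// and the sorted names of inactive resources.
func pluginResourceCapacity(state map[string]resourceState) (capacity, allocatable map[string]int, inactive []string) {
	capacity, allocatable = map[string]int{}, map[string]int{}
	for name, rs := range state {
		if rs.stopped {
			inactive = append(inactive, name)
			continue
		}
		capacity[name] = len(rs.devices)
		allocatable[name] = 0
		for _, d := range rs.devices {
			if d.Health == "Healthy" {
				allocatable[name]++
			}
		}
	}
	sort.Strings(inactive)
	return capacity, allocatable, inactive
}

func main() {
	state := map[string]resourceState{
		"example.com/widget": {devices: []device{{"w0", "Healthy"}, {"w1", "Unhealthy"}}},
		"example.com/gone":   {stopped: true},
	}
	c, a, in := pluginResourceCapacity(state)
	fmt.Println(c, a, in)
}
```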

UpdatePluginResources is the admit handler that the kubelet runs before it initializes a pod.

func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	return cm.deviceManager.Allocate(node, attrs)
}
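
What happens behind that call can be sketched roughly as below: at admission time the device manager reserves healthy, unused devices for the pod's requests, and an error means the pod is rejected. This is a simplified model under stated assumptions, not the kubelet's implementation; the `manager` type, its fields and the request map are hypothetical.

```go
package main

import "fmt"

// manager is a hypothetical, simplified view of the device manager's bookkeeping.
type manager struct {
	healthy   map[string][]string        // resource -> healthy device IDs
	allocated map[string]map[string]bool // resource -> device IDs already in use
}

// allocate reserves devices for every requested resource, or fails without
// changing any state so that a rejected pod leaves nothing behind.
func (m *manager) allocate(requests map[string]int) (map[string][]string, error) {
	picked := map[string][]string{}
	// First pass: check that every request can be satisfied.
	for res, n := range requests {
		var free []string
		for _, id := range m.healthy[res] {
			if !m.allocated[res][id] {
				free = append(free, id)
			}
		}
		if len(free) < n {
			return nil, fmt.Errorf("requested %d of %s, only %d available", n, res, len(free))
		}
		picked[res] = free[:n]
	}
	// Second pass: commit the reservations.
	for res, ids := range picked {
		if m.allocated[res] == nil {
			m.allocated[res] = map[string]bool{}
		}
		for _, id := range ids {
			m.allocated[res][id] = true
		}
	}
	return picked, nil
}

func main() {
	m := &manager{
		healthy:   map[string][]string{"example.com/widget": {"w0", "w1"}},
		allocated: map[string]map[string]bool{},
	}
	fmt.Println(m.allocate(map[string]int{"example.com/widget": 1}))
	fmt.Println(m.allocate(map[string]int{"example.com/widget": 2}))
}
```

The second call fails because only one device is still free, which corresponds to the admit handler returning an error for the pod.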

The device plugin Manager interface

// Manager manages all the Device Plugins running on a node.
type Manager interface {
	// Start starts device plugin registration service.
	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error

	// Allocate configures and assigns devices to pods. The pods are provided
	// through the pod admission attributes in the attrs argument. From the
	// requested device resources, Allocate will communicate with the owning
	// device plugin to allow setup procedures to take place, and for the
	// device plugin to provide runtime settings to use the device (environment
	// variables, mount points and device files). The node object is provided
	// for the device manager to update the node capacity to reflect the
	// currently available devices.
	Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error

	// Stop stops the manager.
	Stop() error

	// GetDeviceRunContainerOptions checks whether we have cached containerDevices
	// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
	// for the found one. An empty struct is returned in case no cached state is found.
	GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error)

	// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
	// and inactive device plugin resources previously registered on the node.
	GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
	GetWatcherHandler() cache.PluginHandler

	// GetDevices returns information about the devices assigned to pods and containers
	GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices

	// ShouldResetExtendedResourceCapacity returns whether the extended resources should be reset or not,
	// depending on the checkpoint file availability. Absence of the checkpoint file strongly indicates
	// the node has been recreated.
	ShouldResetExtendedResourceCapacity() bool

	// TopologyManager HintProvider provider indicates the Device Manager implements the Topology Manager Interface
	// and is consulted to make Topology aware resource alignments
	GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint
}

The endpoint abstraction

type endpoint interface {
	run()
	stop()
	allocate(devs []string) (*pluginapi.AllocateResponse, error)
	preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
	callback(resourceName string, devices []pluginapi.Device)
	isStopped() bool
	stopGracePeriodExpired() bool
}

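For containers to use an extended resource, a custom device plugin that manages the resource is only part of the work; a scheduler extender may also be needed, since the default scheduler only counts extended resources per node. The overall flow is shown below.

[Figure: device plugin]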

 
