关联博客:kubernetes/k8s CSI分析-容器存储接口分析
概述
kubernetes的设计初衷是支持可插拔架构,从而利于扩展kubernetes
的功能。在此架构思想下,kubernetes
提供了3个特定功能的接口,分别是容器网络接口CNI
、容器运行时接口CRI
和容器存储接口CSI
。kubernetes
通过调用这几个接口,来完成相应的功能。
下面我们来对容器运行时接口CRI
来做一下介绍与分析。
在本文中,会对CRI
是什么、为什么要有CRI
、CRI
系统架构做一下介绍,以及k8s
对CRI
进行相关操作的流程分析,包括了pod创建、删除等操作。
CRI是什么
CRI是Container Runtime Interface
(容器运行时接口)的简写。
CRI解耦了kubelet与容器运行时,让kubelet无需重新编译就可以支持多种容器运行时。
kubelet将通过CRI
接口来跟第三方容器运行时进行通信,来操作容器与镜像。
实现了 CRI 接口的容器运行时通常称为 CRI shim, 这是一个 gRPC Server,监听在本地的 unix socket 上;而 kubelet 作为 gRPC 的客户端来调用 CRI 接口,来进行Pod 和容器、镜像的生命周期管理。另外,容器运行时需要自己负责管理容器的网络,推荐使用 CNI。
图1:CRI shim通信图
提出了CRI标准以后,意味着在新的版本里需要使用新的连接方式与docker通信,为了兼容以前的版本,k8s提供了针对docker的CRI实现,也就是kubelet包下的dockershim
包,dockershim
是一个grpc服务,监听一个端口供kubelet连接,dockershim
收到kubelet的请求后,将其转化为REST API请求,再发送给docker daemon
。
图2:dockershim通信图
为什么要有CRI
在1.5以前的版本中,k8s依赖于docker,为了支持不同的容器运行时,如rkt
、containerd
等,kubelet从1.5开始加入了CRI标准,它将 Kubelet 与容器运行时解耦,将原来完全面向 Pod 级别的内部接口拆分成面向 Sandbox
和 Container
的 gRPC 接口,并将镜像管理和容器管理分离到不同的服务,方便后续其他容器运行时与k8s对接。
Kubernetes中的容器运行时组成
按照不同的功能可以分为四个部分:
(1)kubelet 中容器运行时的管理,kubeGenericRuntimeManager
,它管理与CRI shim通信的客户端,完成容器和镜像的管理(代码位置:pkg/kubelet/kuberuntime/kuberuntime_manager.go
);
(2)容器运行时接口CRI,包括了容器运行时客户端接口与容器运行时服务端接口;
(3)CRI shim客户端,kubelet持有,用于与CRI shim服务端进行通信;
(4)CRI shim服务端,即具体的容器运行时实现,包括 kubelet 内置的 dockershim
(代码位置:pkg/kubelet/dockershim
)以及外部的容器运行时如 cri-containerd
(用于支持容器引擎containerd
)、rktlet
(用于支持容器引擎rkt
)等。
CRI架构图
在 CRI 之下,包括两种类型的容器运行时的实现:
(1)kubelet内置的 dockershim
,实现了 Docker 容器引擎的支持以及 CNI 网络插件(包括 kubenet)的支持。dockershim
代码内置于kubelet,被kubelet调用,让dockershim
起独立的server来建立CRI shim,向kubelet暴露grpc server;
(2)外部的容器运行时,用来支持 rkt
、containerd
等容器引擎的外部容器运行时。
kubelet中CRI相关的源码分析
kubelet的CRI源码分析包括如下几部分:
(1)kubelet CRI相关启动参数分析;
(2)kubelet CRI相关interface/struct分析;
(3)kubelet CRI初始化分析;
(4)kubelet调用CRI创建pod分析;
(5)kubelet调用CRI删除pod分析。
因篇幅原因,本篇博文先对前三部分做分析,下一篇博文再对CRI创建pod以及CRI删除pod做分析。
基于tag v1.17.4
https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4
1.kubelet组件CRI相关启动参数分析
kubelet组件CRI相关启动参数相关代码如下:
// pkg/kubelet/config/flags.go
// AddFlags adds flags to the container runtime, according to ContainerRuntimeOptions.
func (s *ContainerRuntimeOptions) AddFlags(fs *pflag.FlagSet) {
dockerOnlyWarning := "This docker-specific flag only works when container-runtime is set to docker."
// General settings.
fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: ‘docker‘, ‘remote‘, ‘rkt (deprecated)‘.")
fs.StringVar(&s.RuntimeCgroups, "runtime-cgroups", s.RuntimeCgroups, "Optional absolute name of cgroups to create and run the runtime in.")
fs.BoolVar(&s.RedirectContainerStreaming, "redirect-container-streaming", s.RedirectContainerStreaming, "Enables container streaming redirect. If false, kubelet will proxy container streaming data between apiserver and container runtime; if true, kubelet will return an http redirect to apiserver, and apiserver will access container runtime directly. The proxy approach is more secure, but introduces some overhead. The redirect approach is more performant, but less secure because the connection between apiserver and container runtime may not be authenticated.")
// Docker-specific settings.
fs.BoolVar(&s.ExperimentalDockershim, "experimental-dockershim", s.ExperimentalDockershim, "Enable dockershim only mode. In this mode, kubelet will only start dockershim without any other functionalities. This flag only serves test purpose, please do not use it unless you are conscious of what you are doing. [default=false]")
fs.MarkHidden("experimental-dockershim")
fs.StringVar(&s.DockershimRootDirectory, "experimental-dockershim-root-directory", s.DockershimRootDirectory, "Path to the dockershim root directory.")
fs.MarkHidden("experimental-dockershim-root-directory")
fs.StringVar(&s.PodSandboxImage, "pod-infra-container-image", s.PodSandboxImage, fmt.Sprintf("The image whose network/ipc namespaces containers in each pod will use. %s", dockerOnlyWarning))
fs.StringVar(&s.DockerEndpoint, "docker-endpoint", s.DockerEndpoint, fmt.Sprintf("Use this for the docker endpoint to communicate with. %s", dockerOnlyWarning))
fs.DurationVar(&s.ImagePullProgressDeadline.Duration, "image-pull-progress-deadline", s.ImagePullProgressDeadline.Duration, fmt.Sprintf("If no pulling progress is made before this deadline, the image pulling will be cancelled. %s", dockerOnlyWarning))
...
}
// cmd/kubelet/app/options/options.go
// AddFlags adds flags for a specific KubeletFlags to the specified FlagSet
func (f *KubeletFlags) AddFlags(mainfs *pflag.FlagSet) {
...
fs.StringVar(&f.RemoteRuntimeEndpoint, "container-runtime-endpoint", f.RemoteRuntimeEndpoint, "[Experimental] The endpoint of remote runtime service. Currently unix socket endpoint is supported on Linux, while npipe and tcp endpoints are supported on windows. Examples:‘unix:///var/run/dockershim.sock‘, ‘npipe:////./pipe/dockershim‘")
fs.StringVar(&f.RemoteImageEndpoint, "image-service-endpoint", f.RemoteImageEndpoint, "[Experimental] The endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. Currently unix socket endpoint is supported on Linux, while npipe and tcp endpoints are supported on windows. Examples:‘unix:///var/run/dockershim.sock‘, ‘npipe:////./pipe/dockershim‘")
...
}
kubelet组件启动参数的默认值在NewKubeletFlags
函数中设置。
// cmd/kubelet/app/options/options.go
// NewKubeletFlags will create a new KubeletFlags with default values
func NewKubeletFlags() *KubeletFlags {
remoteRuntimeEndpoint := ""
if runtime.GOOS == "linux" {
remoteRuntimeEndpoint = "unix:///var/run/dockershim.sock"
} else if runtime.GOOS == "windows" {
remoteRuntimeEndpoint = "npipe:////./pipe/dockershim"
}
return &KubeletFlags{
EnableServer: true,
ContainerRuntimeOptions: *NewContainerRuntimeOptions(),
CertDirectory: "/var/lib/kubelet/pki",
RootDirectory: defaultRootDir,
MasterServiceNamespace: metav1.NamespaceDefault,
MaxContainerCount: -1,
MaxPerPodContainerCount: 1,
MinimumGCAge: metav1.Duration{Duration: 0},
NonMasqueradeCIDR: "10.0.0.0/8",
RegisterSchedulable: true,
ExperimentalKernelMemcgNotification: false,
RemoteRuntimeEndpoint: remoteRuntimeEndpoint,
NodeLabels: make(map[string]string),
VolumePluginDir: "/usr/libexec/kubernetes/kubelet-plugins/volume/exec/",
RegisterNode: true,
SeccompProfileRoot: filepath.Join(defaultRootDir, "seccomp"),
// prior to the introduction of this flag, there was a hardcoded cap of 50 images
NodeStatusMaxImages: 50,
EnableCAdvisorJSONEndpoints: true,
}
}
CRI相关启动参数的默认值在NewContainerRuntimeOptions
和NewMainKubelet
函数中设置。
// cmd/kubelet/app/options/container_runtime.go
// NewContainerRuntimeOptions will create a new ContainerRuntimeOptions with
// default values.
func NewContainerRuntimeOptions() *config.ContainerRuntimeOptions {
dockerEndpoint := ""
if runtime.GOOS != "windows" {
dockerEndpoint = "unix:///var/run/docker.sock"
}
return &config.ContainerRuntimeOptions{
ContainerRuntime: kubetypes.DockerContainerRuntime,
RedirectContainerStreaming: false,
DockerEndpoint: dockerEndpoint,
DockershimRootDirectory: "/var/lib/dockershim",
PodSandboxImage: defaultPodSandboxImage,
ImagePullProgressDeadline: metav1.Duration{Duration: 1 * time.Minute},
ExperimentalDockershim: false,
//Alpha feature
CNIBinDir: "/opt/cni/bin",
CNIConfDir: "/etc/cni/net.d",
CNICacheDir: "/var/lib/cni/cache",
}
}
// pkg/kubelet/kubelet.go
func NewMainKubelet(...) {
...
if remoteRuntimeEndpoint != "" {
// remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified
if remoteImageEndpoint == "" {
remoteImageEndpoint = remoteRuntimeEndpoint
}
}
...
}
下面来简单分析几个比较重要的CRI相关启动参数:
(1)--container-runtime
:指定kubelet要使用的容器运行时,可选值docker
、remote
、rkt (deprecated)
,默认值为docker
,即使用kubelet内置的容器运行时dockershim
。当需要使用外部容器运行时,该参数配置为remote
,并设置--container-runtime-endpoint
参数值为监听的 unix socket
位置。
(2)--runtime-cgroups
:容器运行时使用的cgroups,可选值。
(3)--docker-endpoint
:docker暴露服务的socket地址,默认值为unix:///var/run/docker.sock
,该参数配置当且仅当--container-runtime
参数值为docker
时有效。
(4)--pod-infra-container-image
:pod sandbox的镜像地址,默认值为k8s.gcr.io/pause:3.1
,该参数配置当且仅当--container-runtime
参数值为docker
时有效。
(5)--image-pull-progress-deadline
:容器镜像拉取超时时间,默认值为1分钟,该参数配置当且仅当--container-runtime
参数值为docker
时有效。
(6)--experimental-dockershim
:设置为true
时,启用dockershim
模式,只启动dockershim,默认值为false
,该参数配置当且仅当--container-runtime
参数值为docker
时有效。
(7)--experimental-dockershim-root-directory
:dockershim
根目录,默认值为/var/lib/dockershim
,该参数配置当且仅当--container-runtime
参数值为docker
时有效。
(8)--container-runtime-endpoint
:容器运行时的endpoint,linux中默认值为unix:///var/run/dockershim.sock
,注意与上面的--docker-endpoint
区分开来。
(9)--image-service-endpoint
:镜像服务的endpointlinux中默认值为unix:///var/run/dockershim.sock
。
2.kubelet CRI相关interface/struct分析
CRI相关接口
(1)RuntimeService interface
:CRI shim客户端-容器运行时接口;
代码位置:staging/src/k8s.io/cri-api/pkg/apis/services.go
(2)ImageManagerService interface
:CRI shim客户端-容器镜像接口;
代码位置:staging/src/k8s.io/cri-api/pkg/apis/services.go
(3)RuntimeServiceServer interface
:CRI shim服务端-容器运行时接口;
代码位置:staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2/api.pb.go
(4)ImageServiceServer interface
:CRI shim服务端-容器镜像接口;
代码位置:staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2/api.pb.go
(5)CRIService interface
:包括了RuntimeServiceServer interface
、ImageServiceServer interface
与CRI shim服务端启动方法,所以其包括了一个CRI shim服务端需要实现的所有接口方法;
代码位置:pkg/kubelet/dockershim/docker_service.go
(6)DockerService interface
:包括了CRIService interface
。
代码位置:pkg/kubelet/dockershim/docker_service.go
说明:RuntimeService interface
与RuntimeServiceServer interface
、ImageManagerService interface
与ImageServiceServer interface
中的接口方法是相同的,它们之间的区别只是一个用于CRI shim客户端,一个用于CRI shim服务端。容器运行时接口负责管理 Pod 和容器的生命周期,容器镜像接口负责管理容器镜像的生命周期。
CRI相关结构体
(1)RemoteRuntimeService struct
:实现了CRI shim客户端-容器运行时接口RuntimeService interface
,持有与CRI shim容器运行时服务端通信的客户端;
代码位置:pkg/kubelet/remote/remote_runtime.go
(2)RemoteImageService struct
:实现了CRI shim客户端-容器镜像接口ImageManagerService interface
,持有与CRI shim容器镜像服务端通信的客户端;
代码位置:pkg/kubelet/remote/remote_image.go
(3)dockerService struct
:实现了CRI shim服务端-容器运行时接口RuntimeServiceServer interface
;
代码位置:pkg/kubelet/dockershim/docker_service.go
、pkg/kubelet/dockershim/docker_container.go
(4)dockerService struct
:实现了CRI shim服务端-容器镜像接口ImageServiceServer interface
;
代码位置:pkg/kubelet/dockershim/docker_service.go
、pkg/kubelet/dockershim/docker_image.go
(5)DockerServer struct
:代表了dockershim(kubelet内置的CRI shim)的服务端,其实现了CRIService interface
。
代码位置:pkg/kubelet/dockershim/remote/docker_server.go
CRI shim server接口图示
RuntimeServiceServer
RuntimeServiceServer 提供了的接口,按照功能可以划分为四组:
(1)PodSandbox 的管理接口:PodSandbox 是对 Kubernete Pod 的抽象,用来给容器提供一个隔离的环境,并提供网络等共享的命名空间;
(2)Container 的管理接口:在指定的 PodSandbox 中创建、启动、停止和删除容器;
(3)Streaming API 接口:包括 Exec、Attach 和 PortForward 等和容器进行数据交互的接口,这三个接口返回的是运行时 Streaming Server 的 URL,而不是直接跟容器交互;
(4)runtime状态接口:包括查询 runtime名称、版本、API 版本和状态等。
ImageServiceServer
ImageServiceServer提供了 5 个接口,用于管理容器镜像。
下面会对上面提到的接口/结构体做分析。
2.1 RuntimeService interface
RuntimeService 负责管理 Pod 和容器的生命周期,是CRI shim客户端需要实现的容器运行时接口。
RuntimeService interface包含了RuntimeVersioner
、ContainerManager
、PodSandboxManager
与ContainerStatsManager
接口,下面对对这些接口一一做介绍。
容器运行时会实现RuntimeService interface
。
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// RuntimeService interface should be implemented by a container runtime.
// The methods should be thread-safe.
type RuntimeService interface {
RuntimeVersioner
ContainerManager
PodSandboxManager
ContainerStatsManager
// UpdateRuntimeConfig updates runtime configuration if specified
UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error
// Status returns the status of the runtime.
Status() (*runtimeapi.RuntimeStatus, error)
}
RuntimeVersioner interface
RuntimeVersioner interface负责返回容器运行时的名称、版本以及 API 版本信息,只有一个接口函数 Version
。
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// RuntimeVersioner contains methods for runtime name, version and API version.
type RuntimeVersioner interface {
// Version returns the runtime name, runtime version and runtime API version
Version(apiVersion string) (*runtimeapi.VersionResponse, error)
}
ContainerManager interface
ContainerManager interface包含了对container
(业务容器)进行操作的一些方法,如CreateContainer
(创建容器)、StartContainer
(启动容器)、StopContainer
(停止容器)、RemoveContainer
(删除容器)等。
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// ContainerManager contains methods to manipulate containers managed by a
// container runtime. The methods are thread-safe.
type ContainerManager interface {
// CreateContainer creates a new container in specified PodSandbox.
CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
// StartContainer starts the container.
StartContainer(containerID string) error
// StopContainer stops a running container with a grace period (i.e., timeout).
StopContainer(containerID string, timeout int64) error
// RemoveContainer removes the container.
RemoveContainer(containerID string) error
// ListContainers lists all containers by filters.
ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error)
// ContainerStatus returns the status of the container.
ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error)
// UpdateContainerResources updates the cgroup resources for the container.
UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
Exec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
// for the container. If it returns error, new container log file MUST NOT
// be created.
ReopenContainerLog(ContainerID string) error
}
PodSandboxManager interface
PodSandboxManager interface包含了对pod sandbox
(pause container
)进行操作的一些方法,如RunPodSandbox
(创建并启动pause container
)、StopPodSandbox
(停止pause container
)、RemovePodSandbox
(删除pause container
)等。
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
// are thread-safe.
type PodSandboxManager interface {
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
// StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be force terminated.
StopPodSandbox(podSandboxID string) error
// RemovePodSandbox removes the sandbox. If there are running containers in the
// sandbox, they should be forcibly removed.
RemovePodSandbox(podSandboxID string) error
// PodSandboxStatus returns the Status of the PodSandbox.
PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error)
// ListPodSandbox returns a list of Sandbox.
ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
}
ContainerStatsManager interface
ContainerStatsManager interface包含了对容器统计数据的查询接口,如ContainerStats
、ListContainerStats
。
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// ContainerStatsManager contains methods for retrieving the container
// statistics.
type ContainerStatsManager interface {
// ContainerStats returns stats of the container. If the container does not
// exist, the call returns an error.
ContainerStats(containerID string) (*runtimeapi.ContainerStats, error)
// ListContainerStats returns stats of all running containers.
ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error)
}
2.2 ImageManagerService interface
ImageManagerService负责管理镜像的生命周期,是CRI shim客户端需要实现的镜像接口。
ImageManagerService interface包含了容器镜像的相关操作接口,如PullImage
(拉取镜像)、ListImages
(列出现存镜像列表)等。
// staging/src/k8s.io/cri-api/pkg/apis/services.go
// ImageManagerService interface should be implemented by a container image
// manager.
// The methods should be thread-safe.
type ImageManagerService interface {
// ListImages lists the existing images.
ListImages(filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error)
// ImageStatus returns the status of the image.
ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error)
// PullImage pulls an image with the authentication config.
PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
// RemoveImage removes the image.
RemoveImage(image *runtimeapi.ImageSpec) error
// ImageFsInfo returns information of the filesystem that is used to store images.
ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error)
}
2.3 CRIService interface / DockerService interface
CRIService interface中定义了CRI shim服务端必须实现的一些方法,其中包括了RuntimeServiceServer interface
(容器运行时操作相关方法)、ImageServiceServer interface
(镜像操作相关方法)以及CRI shim服务端启动方法。
// pkg/kubelet/dockershim/docker_service.go
// CRIService includes all methods necessary for a CRI server.
type CRIService interface {
runtimeapi.RuntimeServiceServer
runtimeapi.ImageServiceServer
Start() error
}
// DockerService is an interface that embeds the new RuntimeService and
// ImageService interfaces.
type DockerService interface {
CRIService
// For serving streaming calls.
http.Handler
// For supporting legacy features.
DockerLegacyService
}
2.4 RemoteRuntimeService struct
实现了CRI shim客户端-容器运行时接口RuntimeService interface
,持有与CRI shim容器运行时服务端通信的客户端runtimeClient
。
// pkg/kubelet/remote/remote_runtime.go
// RemoteRuntimeService is a gRPC implementation of internalapi.RuntimeService.
type RemoteRuntimeService struct {
timeout time.Duration
runtimeClient runtimeapi.RuntimeServiceClient
// Cache last per-container error message to reduce log spam
logReduction *logreduction.LogReduction
}
2.5 RemoteImageService struct
实现了CRI shim客户端-容器镜像接口ImageManagerService interface
,持有与CRI shim容器镜像服务端通信的客户端imageClient
。
// pkg/kubelet/remote/remote_image.go
// RemoteImageService is a gRPC implementation of internalapi.ImageManagerService.
type RemoteImageService struct {
timeout time.Duration
imageClient runtimeapi.ImageServiceClient
}
2.5 DockerServer struct
DockerServer struct代表了dockershim(kubelet内置的CRI shim)的服务端,其实现了CRIService interface
。
// pkg/kubelet/dockershim/remote/docker_server.go
// DockerServer is the grpc server of dockershim.
type DockerServer struct {
// endpoint is the endpoint to serve on.
endpoint string
// service is the docker service which implements runtime and image services.
service dockershim.CRIService
// server is the grpc server.
server *grpc.Server
}
3.kubelet CRI相关初始化
kubelet中CRI相关初始化逻辑如下:
(1)当kubelet选用dockershim作为容器运行时,则初始化并启动容器运行时服务端dockershim(初始化dockershim过程中也会初始化网络插件CNI);
(2)初始化容器运行时CRI shim客户端(用于调用CRI shim服务端:内置的容器运行时dockershim或remote容器运行时);
(3)初始化kubeGenericRuntimeManager
,用于容器运行时的管理。初始化完成后,后续kubelet
对容器以及镜像的相关操作都会通过该结构体持有的CRI shim
客户端,与CRI shim
服务端进行通信来完成。
CRI初始化的调用链
main (cmd/kubelet/kubelet.go)
-> NewKubeletCommand (cmd/kubelet/app/server.go)
-> Run (cmd/kubelet/app/server.go)
-> run (cmd/kubelet/app/server.go)
-> RunKubelet (cmd/kubelet/app/server.go)
-> CreateAndInitKubelet(cmd/kubelet/app/server.go)
-> kubelet.NewMainKubelet(pkg/kubelet/kubelet.go)
-> getRuntimeAndImageServices(pkg/kubelet/kubelet.go) && kuberuntime.NewKubeGenericRuntimeManager(pkg/kubelet/kuberuntime/kuberuntime_manager.go)
NewMainKubelet函数中CRI相关逻辑:
(1)初始化并启动内置容器运行时服务端dockershim:根据containerRuntime
的值(kubelet启动参数--container-runtime
),如果是docker
,则初始化并启动docker CRI shim
即kubelet内置容器运行时dockershim
,暴露grpc socket
,如果是remote
,则不做初始化启动操作。
(2)调用getRuntimeAndImageServices
:初始化容器运行时CRI shim客户端,包括容器运行时客户端runtimeClient
以及容器镜像客户端imageClient
。
(3)调用kuberuntime.NewKubeGenericRuntimeManager
,以及klet
赋值:初始化kubeGenericRuntimeManager struct
,用于容器运行时的管理。初始化完成后,后续kubelet
对容器以及镜像的相关操作都会通过该结构体持有的CRI shim
客户端,与CRI shim
服务端进行通信来完成。
// pkg/kubelet/kubelet.go
func NewMainKubelet(...) {
...
switch containerRuntime {
// (1)初始化并启动内置容器运行时服务端dockershim
case kubetypes.DockerContainerRuntime:
// Create and start the CRI shim running as a grpc server.
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
if err != nil {
return nil, err
}
if crOptions.RedirectContainerStreaming {
klet.criHandler = ds
}
// The unix socket for kubelet <-> dockershim communication.
klog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
remoteRuntimeEndpoint,
remoteImageEndpoint)
klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {
return nil, err
}
// Create dockerLegacyService when the logging driver is not supported.
supported, err := ds.IsCRISupportedLogDriver()
if err != nil {
return nil, err
}
if !supported {
klet.dockerLegacyService = ds
legacyLogProvider = ds
}
case kubetypes.RemoteContainerRuntime:
// No-op.
break
default:
return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
}
// (2)初始化容器运行时CRI shim客户端
runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
if err != nil {
return nil, err
}
klet.runtimeService = runtimeService
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
}
// (3)初始化```GenericRuntimeManager```,用于容器运行时的管理
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
klet.startupManager,
seccompProfileRoot,
containerRefManager,
machineInfo,
klet,
kubeDeps.OSInterface,
klet,
httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
kubeCfg.CPUCFSQuota,
kubeCfg.CPUCFSQuotaPeriod,
runtimeService,
imageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(),
legacyLogProvider,
klet.runtimeClassManager,
)
if err != nil {
return nil, err
}
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime
...
}
3.1 初始化并启动内置容器运行时服务端dockershim
这里对变量containerRuntime
值等于docker
时做分析,即kubelet启动参数--container-runtime
值为docker
,这时kubelet会使用内置的CRI shim即dockershim作为容器运行时,dockershim调用docker进行容器以及镜像的相关操作。
初始化并启动dockershim
主要逻辑如下:
(1)调用dockershim.NewDockerService
:新建并初始化dockershim
服务端,包括初始化docker client、初始化cni网络配置等操作;
(2)调用dockerremote.NewDockerServer
与server.Start
:启动dockershim
,暴露服务socket。
3.1.1 dockershim.NewDockerService
新建并初始化dockershim
服务端,主要逻辑如下:
(1)调用NewDockerClientFromConfig
:创建docker的客户端-client对象,包含了我们常用的docker run,docker images等所有操作调用;
(2)构建dockerService struct
;
(2)初始化CNI网络配置(CNI网络配置初始化在专门进行CNI分析的博文再详细讲解)。
// pkg/kubelet/dockershim/docker_service.go
// NewDockerService creates a new `DockerService` struct.
// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
func NewDockerService(config *ClientConfig, podSandboxImage string, streamingConfig *streaming.Config, pluginSettings *NetworkPluginSettings,
cgroupsName string, kubeCgroupDriver string, dockershimRootDir string, startLocalStreamingServer bool) (DockerService, error) {
// (1)创建docker的客户端
client := NewDockerClientFromConfig(config)
c := libdocker.NewInstrumentedInterface(client)
checkpointManager, err := checkpointmanager.NewCheckpointManager(filepath.Join(dockershimRootDir, sandboxCheckpointDir))
if err != nil {
return nil, err
}
// (2)构建```dockerService struct```
ds := &dockerService{
client: c,
os: kubecontainer.RealOS{},
podSandboxImage: podSandboxImage,
streamingRuntime: &streamingRuntime{
client: client,
execHandler: &NativeExecHandler{},
},
containerManager: cm.NewContainerManager(cgroupsName, client),
checkpointManager: checkpointManager,
startLocalStreamingServer: startLocalStreamingServer,
networkReady: make(map[string]bool),
containerCleanupInfos: make(map[string]*containerCleanupInfo),
}
// check docker version compatibility.
if err = ds.checkVersionCompatibility(); err != nil {
return nil, err
}
// create streaming server if configured.
if streamingConfig != nil {
var err error
ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime)
if err != nil {
return nil, err
}
}
// Determine the hairpin mode.
if err := effectiveHairpinMode(pluginSettings); err != nil {
// This is a non-recoverable error. Returning it up the callstack will just
// lead to retries of the same failure, so just fail hard.
return nil, err
}
klog.Infof("Hairpin mode set to %q", pluginSettings.HairpinMode)
// (3)初始化CNI网络配置
// dockershim currently only supports CNI plugins.
pluginSettings.PluginBinDirs = cni.SplitDirs(pluginSettings.PluginBinDirString)
cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginCacheDir, pluginSettings.PluginBinDirs)
cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDirs, pluginSettings.PluginCacheDir))
netHost := &dockerNetworkHost{
&namespaceGetter{ds},
&portMappingGetter{ds},
}
plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU)
if err != nil {
return nil, fmt.Errorf("didn‘t find compatible CNI plugin with given settings %+v: %v", pluginSettings, err)
}
ds.network = network.NewPluginManager(plug)
klog.Infof("Docker cri networking managed by %v", plug.Name())
// NOTE: cgroup driver is only detectable in docker 1.11+
cgroupDriver := defaultCgroupDriver
dockerInfo, err := ds.client.Info()
klog.Infof("Docker Info: %+v", dockerInfo)
if err != nil {
klog.Errorf("Failed to execute Info() call to the Docker client: %v", err)
klog.Warningf("Falling back to use the default driver: %q", cgroupDriver)
} else if len(dockerInfo.CgroupDriver) == 0 {
klog.Warningf("No cgroup driver is set in Docker")
klog.Warningf("Falling back to use the default driver: %q", cgroupDriver)
} else {
cgroupDriver = dockerInfo.CgroupDriver
}
if len(kubeCgroupDriver) != 0 && kubeCgroupDriver != cgroupDriver {
return nil, fmt.Errorf("misconfiguration: kubelet cgroup driver: %q is different from docker cgroup driver: %q", kubeCgroupDriver, cgroupDriver)
}
klog.Infof("Setting cgroupDriver to %s", cgroupDriver)
ds.cgroupDriver = cgroupDriver
ds.versionCache = cache.NewObjectCache(
func() (interface{}, error) {
return ds.getDockerVersion()
},
versionCacheTTL,
)
// Register prometheus metrics.
metrics.Register()
return ds, nil
}
NewDockerClientFromConfig
NewDockerClientFromConfig函数主要是建立与docker通信的客户端。其中config
结构体里,dockerEndpoint
的值来自于kubelet启动参数--container-runtime-endpoint
的配置,默认是unix:///var/run/docker.sock
。
// pkg/kubelet/dockershim/docker_service.go
// NewDockerClientFromConfig create a docker client from given configure
// return nil if nil configure is given.
func NewDockerClientFromConfig(config *ClientConfig) libdocker.Interface {
if config != nil {
// Create docker client.
client := libdocker.ConnectToDockerOrDie(
config.DockerEndpoint,
config.RuntimeRequestTimeout,
config.ImagePullProgressDeadline,
config.WithTraceDisabled,
config.EnableSleep,
)
return client
}
return nil
}
// pkg/kubelet/dockershim/libdocker/client.go
// ConnectToDockerOrDie creates docker client connecting to docker daemon.
// If the endpoint passed in is "fake://", a fake docker client
// will be returned. The program exits if error occurs. The requestTimeout
// is the timeout for docker requests. If timeout is exceeded, the request
// will be cancelled and throw out an error. If requestTimeout is 0, a default
// value will be applied.
func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout, imagePullProgressDeadline time.Duration,
withTraceDisabled bool, enableSleep bool) Interface {
if dockerEndpoint == FakeDockerEndpoint {
fakeClient := NewFakeDockerClient()
if withTraceDisabled {
fakeClient = fakeClient.WithTraceDisabled()
}
if enableSleep {
fakeClient.EnableSleep = true
}
return fakeClient
}
client, err := getDockerClient(dockerEndpoint)
if err != nil {
klog.Fatalf("Couldn‘t connect to docker: %v", err)
}
klog.Infof("Start docker client with request timeout=%v", requestTimeout)
return newKubeDockerClient(client, requestTimeout, imagePullProgressDeadline)
}
3.1.2 启动dockershim,暴露服务socket。
dockerremote.NewDockerServer()
// pkg/kubelet/dockershim/remote/docker_server.go
// NewDockerServer creates the dockershim grpc server.
func NewDockerServer(endpoint string, s dockershim.CRIService) *DockerServer {
return &DockerServer{
endpoint: endpoint,
service: s,
}
}
3.2 初始化容器运行时CRI shim客户端
getRuntimeAndImageServices函数主要逻辑:
(1)调用remote.NewRemoteRuntimeService
函数:实例化容器相关操作的CRI shim客户端-容器运行时客户端runtimeClient
,实现了上述CRI相关interface/struct分析中的RuntimeService接口(CRI shim客户端接口);
(2)调用remote.NewRemoteImageService
函数:实例化镜像相关操作的CRI shim客户端-容器镜像客户端imageClient
,实现了上述CRI相关interface/struct分析中的ImageManagerService接口(CRI shim客户端接口)。
// pkg/kubelet/kubelet.go
func getRuntimeAndImageServices(remoteRuntimeEndpoint string, remoteImageEndpoint string, runtimeRequestTimeout metav1.Duration) (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
rs, err := remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, runtimeRequestTimeout.Duration)
if err != nil {
return nil, nil, err
}
is, err := remote.NewRemoteImageService(remoteImageEndpoint, runtimeRequestTimeout.Duration)
if err != nil {
return nil, nil, err
}
return rs, is, err
}
3.2.1 remote.NewRemoteRuntimeService
remote.NewRemoteRuntimeService函数作用:实例化容器相关操作的CRI shim客户端-容器运行时客户端runtimeClient
,实现了上述CRI相关interface/struct分析
中的RuntimeService接口(CRI shim客户端接口)。
主要逻辑:根据kubelet启动参数--container-runtime-endpoint
或使用默认值unix:///var/run/dockershim.sock
,尝试连接该socket,建立client。
// pkg/kubelet/remote/remote_runtime.go
// NewRemoteRuntimeService creates a new internalapi.RuntimeService.
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
klog.V(3).Infof("Connecting to runtime service %s", endpoint)
addr, dailer, err := util.GetAddressAndDialer(endpoint)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
klog.Errorf("Connect remote runtime %s failed: %v", addr, err)
return nil, err
}
return &RemoteRuntimeService{
timeout: connectionTimeout,
runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
}, nil
}
3.2.2 remote.NewRemoteImageService
remote.NewRemoteImageService函数作用:实例化镜像相关操作的CRI shim客户端-容器镜像客户端imageClient
,实现了上述CRI相关interface/struct分析
中的ImageManagerService接口(CRI shim客户端接口)。
主要逻辑:根据kubelet启动参数--image-service-endpoint
或使用默认值unix:///var/run/dockershim.sock
,尝试连接该socket,建立client。
// pkg/kubelet/remote/remote_runtime.go
// NewRemoteImageService creates a new internalapi.ImageManagerService.
func NewRemoteImageService(endpoint string, connectionTimeout time.Duration) (internalapi.ImageManagerService, error) {
klog.V(3).Infof("Connecting to image service %s", endpoint)
addr, dailer, err := util.GetAddressAndDialer(endpoint)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
klog.Errorf("Connect remote image service %s failed: %v", addr, err)
return nil, err
}
return &RemoteImageService{
timeout: connectionTimeout,
imageClient: runtimeapi.NewImageServiceClient(conn),
}, nil
}
3.3 初始化kubeGenericRuntimeManager,用于容器运行时的管理
kuberuntime.NewKubeGenericRuntimeManager函数主要是初始化kubeGenericRuntimeManager struct
,而kubeGenericRuntimeManager struct
是对KubeGenericRuntime interface
的实现。kubeGenericRuntimeManager
是kubelet中容器运行时的管理者,管理着CRI shim
客户端,负责与CRI shim服务端
交互,完成容器和镜像的管理。
初始化完成后,后续kubelet
对容器以及镜像的相关操作都会通过该结构体持有的CRI shim
客户端,与CRI shim
服务端进行通信来完成。
// pkg/kubelet/kuberuntime/kuberuntime_manager.go
// NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager
func NewKubeGenericRuntimeManager(
recorder record.EventRecorder,
livenessManager proberesults.Manager,
startupManager proberesults.Manager,
seccompProfileRoot string,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorapi.MachineInfo,
podStateProvider podStateProvider,
osInterface kubecontainer.OSInterface,
runtimeHelper kubecontainer.RuntimeHelper,
httpClient types.HttpGetter,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
imagePullQPS float32,
imagePullBurst int,
cpuCFSQuota bool,
cpuCFSQuotaPeriod metav1.Duration,
runtimeService internalapi.RuntimeService,
imageService internalapi.ImageManagerService,
internalLifecycle cm.InternalContainerLifecycle,
legacyLogProvider LegacyLogProvider,
runtimeClassManager *runtimeclass.Manager,
) (KubeGenericRuntime, error) {
kubeRuntimeManager := &kubeGenericRuntimeManager{
recorder: recorder,
cpuCFSQuota: cpuCFSQuota,
cpuCFSQuotaPeriod: cpuCFSQuotaPeriod,
seccompProfileRoot: seccompProfileRoot,
livenessManager: livenessManager,
startupManager: startupManager,
containerRefManager: containerRefManager,
machineInfo: machineInfo,
osInterface: osInterface,
runtimeHelper: runtimeHelper,
runtimeService: newInstrumentedRuntimeService(runtimeService),
imageService: newInstrumentedImageManagerService(imageService),
keyring: credentialprovider.NewDockerKeyring(),
internalLifecycle: internalLifecycle,
legacyLogProvider: legacyLogProvider,
runtimeClassManager: runtimeClassManager,
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
}
typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)
if err != nil {
klog.Errorf("Get runtime version failed: %v", err)
return nil, err
}
// Only matching kubeRuntimeAPIVersion is supported now
// TODO: Runtime API machinery is under discussion at https://github.com/kubernetes/kubernetes/issues/28642
if typedVersion.Version != kubeRuntimeAPIVersion {
klog.Errorf("Runtime api version %s is not supported, only %s is supported now",
typedVersion.Version,
kubeRuntimeAPIVersion)
return nil, ErrVersionNotSupported
}
kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
klog.Infof("Container runtime %s initialized, version: %s, apiVersion: %s",
typedVersion.RuntimeName,
typedVersion.RuntimeVersion,
typedVersion.RuntimeApiVersion)
// If the container logs directory does not exist, create it.
// TODO: create podLogsRootDirectory at kubelet.go when kubelet is refactored to
// new runtime interface
if _, err := osInterface.Stat(podLogsRootDirectory); os.IsNotExist(err) {
if err := osInterface.MkdirAll(podLogsRootDirectory, 0755); err != nil {
klog.Errorf("Failed to create directory %q: %v", podLogsRootDirectory, err)
}
}
kubeRuntimeManager.imagePuller = images.NewImageManager(
kubecontainer.FilterEventRecorder(recorder),
kubeRuntimeManager,
imageBackOff,
serializeImagePulls,
imagePullQPS,
imagePullBurst)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
kubeRuntimeManager.versionCache = cache.NewObjectCache(
func() (interface{}, error) {
return kubeRuntimeManager.getTypedVersion()
},
versionCacheTTL,
)
return kubeRuntimeManager, nil
}
总结
该博文先对CRI做了介绍,然后对kubelet CRI相关源码进行分析,包括kubelet组件CRI相关启动参数分析、CRI相关interface/struct分析、CRI相关初始化分析3个部分,剩下的其他部分分析,将在下一篇CRI博文里做分析。
CRI介绍
CRI,全称Container Runtime Interface
,容器运行时接口。
在1.5以前的版本中,k8s依赖于docker,为了支持不同的容器运行时,如rkt
、containerd
等,kubelet从1.5开始加入了CRI标准,它将 Kubelet 与容器运行时解耦,将原来完全面向 Pod 级别的内部接口拆分成面向 Sandbox
和 Container
的 gRPC 接口,并将镜像管理和容器管理分离到不同的服务。
实现了 CRI 接口的容器运行时通常称为 CRI shim, 这是一个 gRPC Server,监听在本地的 unix socket 上;而 kubelet 作为 gRPC 的客户端来调用 CRI 接口,来进行Pod 和容器、镜像的生命周期管理。另外,容器运行时需要自己负责管理容器的网络,推荐使用 CNI。
提出了CRI标准以后,意味着在新的版本里需要使用新的连接方式与docker通信,为了兼容以前的版本,k8s提供了针对docker的CRI实现,也就是kubelet包下的dockershim
包,dockershim
是一个grpc服务,监听一个端口供kubelet连接,dockershim
收到kubelet的请求后,将其转化为REST API请求,再发送给docker daemon
。
Kubernetes中的容器运行时组成
按照不同的功能可以分为四个部分:
(1)kubelet 中容器运行时的管理,kubeGenericRuntimeManager
,它管理与CRI shim通信的客户端,完成容器和镜像的管理(代码位置:pkg/kubelet/kuberuntime/kuberuntime_manager.go
);
(2)容器运行时接口CRI,包括了容器运行时客户端接口与容器运行时服务端接口;
(3)CRI shim客户端,kubelet持有,用于与CRI shim服务端进行通信;
(4)CRI shim服务端,即具体的容器运行时实现,包括 kubelet 内置的 dockershim
(代码位置:pkg/kubelet/dockershim
)以及外部的容器运行时如 cri-containerd
(用于支持容器引擎containerd
)、rktlet
(用于支持容器引擎rkt
)等。
CRI架构图
在 CRI 之下,包括两种类型的容器运行时的实现:
(1)kubelet内置的 dockershim
,实现了 Docker 容器引擎的支持以及 CNI 网络插件(包括 kubenet)的支持。dockershim
代码内置于kubelet,被kubelet调用,让dockershim
起独立的server来建立CRI shim,向kubelet暴露grpc server;
(2)外部的容器运行时,用来支持 rkt
、containerd
等容器引擎的外部容器运行时。
CRI shim server接口图示
CRI相关初始化
kubelet中CRI相关初始化逻辑如下:
(1)当kubelet选用dockershim作为容器运行时,则初始化并启动容器运行时服务端dockershim(初始化dockershim过程中也会初始化网络插件CNI);
(2)初始化容器运行时CRI shim客户端(用于调用CRI shim服务端:内置的容器运行时dockershim或remote容器运行时);
(3)初始化kubeGenericRuntimeManager
,用于容器运行时的管理。初始化完成后,后续kubelet
对容器以及镜像的相关操作都会通过该结构体持有的CRI shim
客户端,与CRI shim
服务端进行通信来完成。