一、概述
kubelet源码入口:cmd/kubelet/kubelet.go main()
cmd/kubelet/app 包中的Run函数:
查看先参数,kubelet.KubeletDeps
type KubeletDeps struct {
Builder KubeletBuilder
ContainerRuntimeOptions []kubecontainer.Option
Options []Option
Auth server.AuthInterface -------- interface, 重点关注【下面还有,以这个为例】
CAdvisorInterface cadvisor.Interface
Cloud cloudprovider.Interface
ContainerManager cm.ContainerManager
DockerClient libdocker.Interface
EventClient v1core.EventsGetter
KubeClient clientset.Interface
ExternalKubeClient clientgoclientset.Interface
Mounter mount.Interface
NetworkPlugins []network.NetworkPlugin
OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface
PodConfig *config.PodConfig
Recorder record.EventRecorder
Writer kubeio.Writer
VolumePlugins []volume.VolumePlugin
TLSOptions *server.TLSOptions
}
以下为server.AuthInterface 接口定义:
// AuthInterface contains all methods required by the auth filters
type AuthInterface interface {
authenticator.Request
authorizer.RequestAttributesGetter
authorizer.Authorizer
}
继续查看:
type Request interface {
AuthenticateRequest(req *http.Request) (user.Info, bool, error)
} type RequestAttributesGetter interface {
GetRequestAttributes(user.Info, *http.Request) Attributes
} type Authorizer interface {
Authorize(a Attributes) (authorized bool, reason string, err error)
}
KubeletDeps 结构体中其他的接口也类似;
二、流程分析
主要是参数的初始化判断,然后通过kubeDeps.ContainerManager进行管理;
该函数返回的是一个ContainerManager接口,如下:
// Manages the containers running on a machine.
type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start(*v1.Node, ActivePodsFunc) error // Returns resources allocated to system cgroups in the machine.
// These cgroups include the system and Kubernetes services.
SystemCgroupsLimit() v1.ResourceList // Returns a NodeConfig that is being used by the container manager.
GetNodeConfig() NodeConfig // Returns internal Status.
Status() Status // NewPodContainerManager is a factory method which returns a podContainerManager object
// Returns a noop implementation if qos cgroup hierarchy is not enabled
NewPodContainerManager() PodContainerManager // GetMountedSubsystems returns the mounted cgroup subsystems on the node
GetMountedSubsystems() *CgroupSubsystems // GetQOSContainersInfo returns the names of top level QoS containers
GetQOSContainersInfo() QOSContainersInfo // GetNodeAllocatable returns the amount of compute resources that have to be reserved from scheduling.
GetNodeAllocatableReservation() v1.ResourceList // GetCapacity returns the amount of compute resources tracked by container manager available on the node.
GetCapacity() v1.ResourceList // UpdateQOSCgroups performs housekeeping updates to ensure that the top
// level QoS containers have their desired state in a thread-safe way
UpdateQOSCgroups() error
}
而结构体containerManagerImpl 实现了ContainerManager接口:
继续往下:
进入函数,查看注释:
主要就是做一些基本验证:
启动kublet服务:
func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) {
// start the kubelet
go wait.Until(func() { k.Run(podCfg.Updates()) }, , wait.NeverStop) # go routine, 这里主要是对接kube-api
// start the kubelet server
if kubeCfg.EnableServer {
go wait.Until(func() {
k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
}, , wait.NeverStop) // *****
} if kubeCfg.ReadOnlyPort > {
go wait.Until(func() {
k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}, , wait.NeverStop)
}
}
启动kubelet HTTP server:
// ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, enableContentionProfiling, kl.containerRuntime, kl.criHandler)
}
continue:
// ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletServer(
host HostInterface,
resourceAnalyzer stats.ResourceAnalyzer,
address net.IP,
port uint,
tlsOptions *TLSOptions,
auth AuthInterface,
enableDebuggingHandlers,
enableContentionProfiling bool,
runtime kubecontainer.Runtime,
criHandler http.Handler) { glog.Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, enableContentionProfiling, runtime, criHandler) s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), )),
Handler: &handler,
MaxHeaderBytes: << ,
}
if tlsOptions != nil {
s.TLSConfig = tlsOptions.Config
// Passing empty strings as the cert and key files means no
// cert/keys are specified and GetCertificate in the TLSConfig
// should be called instead.
glog.Fatal(s.ListenAndServeTLS(tlsOptions.CertFile, tlsOptions.KeyFile)) } else {
glog.Fatal(s.ListenAndServe())
}
}