kubelet源码分析——关闭Pod

上一篇说到kublet如何启动一个pod,本篇讲述如何关闭一个Pod,引用一段来自官方文档介绍pod的生命周期的话

  1. 你使用 kubectl 工具手动删除某个特定的 Pod,而该 Pod 的体面终止限期是默认值(30 秒)。
  2. API 服务器中的 Pod 对象被更新,记录涵盖体面终止限期在内 Pod 的最终死期,超出所计算时间点则认为 Pod 已死(dead)。 如果你使用 kubectl describe 来查验你正在删除的 Pod,该 Pod 会显示为 "Terminating" (正在终止)。 在 Pod 运行所在的节点上:kubelet 一旦看到 Pod 被标记为正在终止(已经设置了体面终止限期),kubelet 即开始本地的 Pod 关闭过程。
    1. 如果 Pod 中的容器之一定义了 preStop 回调, kubelet 开始在容器内运行该回调逻辑。如果超出体面终止限期时,preStop 回调逻辑 仍在运行,kubelet 会请求给予该 Pod 的宽限期一次性增加 2 秒钟。

    说明: 如果 preStop 回调所需要的时间长于默认的体面终止限期,你必须修改 terminationGracePeriodSeconds 属性值来使其正常工作。

    1. kubelet 接下来触发容器运行时发送 TERM 信号给每个容器中的进程 1。

    说明: Pod 中的容器会在不同时刻收到 TERM 信号,接收顺序也是不确定的。 如果关闭的顺序很重要,可以考虑使用 preStop 回调逻辑来协调。

  3. 与此同时,kubelet 启动体面关闭逻辑,控制面会将 Pod 从对应的端点列表(以及端点切片列表, 如果启用了的话)中移除,过滤条件是 Pod 被对应的 服务以某 选择算符选定。 ReplicaSets和其他工作负载资源 不再将关闭进程中的 Pod 视为合法的、能够提供服务的副本。关闭动作很慢的 Pod 也无法继续处理请求数据,因为负载均衡器(例如服务代理)已经在终止宽限期开始的时候 将其从端点列表中移除。
  4. 超出终止宽限期限时,kubelet 会触发强制关闭过程。容器运行时会向 Pod 中所有容器内 仍在运行的进程发送 SIGKILL 信号。 kubelet 也会清理隐藏的 pause 容器,如果容器运行时使用了这种容器的话。
  5. kubelet 触发强制从 API 服务器上删除 Pod 对象的逻辑,并将体面终止限期设置为 0 (这意味着马上删除)。
  6. API 服务器删除 Pod 的 API 对象,从任何客户端都无法再看到该对象。

简单概括为

  1. 删除一个Pod后系统默认给30s的宽限期,并将它的状态设置成Terminating
  2. kublectl发现Pod状态为Terminating则尝试执行preStop生命周期勾子,并可多给2s的宽限期
  3. 同时控制面将Pod中svc的endpoint中去除
  4. 宽限期到则发送TERM信号
  5. Pod还不关闭再发送SIGKILL强制关闭,并清理sandbox
  6. kubelet删除Pod资源对象。

下面则从kublet源码中查看这个过程

kubelet.syncLoop

前文讲到kubelet.syncLoop这个循环包含了kublet主要的核心的操作,Pod的启动从这里开始,Pod的关闭也从这里开始,与之前Pod启动的极为相似,最终还是到达了kublet的sync方法

kubelet.syncLoop	/pkg/kubelet/kubelet.go
|--kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
	|--u, open := <-configCh
	|--handler.HandlePodUpdates(u.Pods)即Kubelet.HandlePodUpdates
		|--kl.handleMirrorPod(pod, start)
			|--kl.dispatchWork
		|--kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
			|--kl.podWorkers.UpdatePod即podWorkers.UpdatePod	/pkg/kubelet/pod_worker.go
				|--p.managePodLoop
					|--p.syncPodFn

Kubelet.dispatchWork

但是需要穿插提前说一下这个方法,当pod的container是Termial(status.State.Terminated不为空)且DeletionTimestamp不为空(资源被调用删除后这个字段会填值),就会调用statusManager.TerminatePod,这个方法的作用后续会说,按着顺序走调用podWorkers.UpdatePod方法,传入的UpdateType是SyncPodUpdate。

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
	if pod.DeletionTimestamp != nil && containersTerminal {
		kl.statusManager.TerminatePod(pod)
		return
	}

	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			if err != nil {
				metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
			}
		},
	})
}

Pod sync(Kubelet.syncPod)

还是走到kubelet.syncPod方法,在这个方法里面一开始也有一个killPod方法的的调用,但是本次进入传参updateType是SyncPodUpdate,因此会往下走,走到runnable.Admit的判断才是进入调用killPod方法

func (kl *Kubelet) syncPod(o syncPodOptions) error {

	// if we want to kill a pod, do it now!
	if updateType == kubetypes.SyncPodKill {
		kl.statusManager.SetPodStatus(pod, apiPodStatus)
		// we kill the pod with the specified grace period since this is a termination
		if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
	
		}
		return nil
	}

	// Kill pod if it should not be running
	if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
		var syncErr error
		if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
			syncErr = fmt.Errorf("error killing pod: %v", err)
			utilruntime.HandleError(syncErr)
		} else {
			if !runnable.Admit {
				// There was no error killing the pod, but the pod cannot be run.
				// Return an error to signal that the sync loop should back off.
				syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
			}
		}
		return syncErr
	}
}

killPod

Kubelet.syncPod		/pkg/kubelet/kubelet.go
|--Kubelet.killPod	/pkg/kubelet/kubelet_pods.go
	|--kl.containerRuntime.KillPod	
	|==kubeGenericRuntimeManager.KillPod	/pkg/kubelet/kuberuntime/kuberuntime_manager.go
	|	|- m.killPodWithSyncResult
	|--kl.containerManager.UpdateQOSCgroups()

经过多层的调用,来到kubeGenericRuntimeManager.killPodWithSyncResult方法,代码中关键操作有两个
1 先停止属于该pod的所有containers
2 然后再停止pod sandbox容器

func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
	killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
	
	// Stop all sandboxes belongs to same pod
	for _, podSandbox := range runningPod.Sandboxes {
		if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
		}
	}

	return
}

killContainer

kubeGenericRuntimeManager.killPodWithSyncResult		/pkg/kubelet/kuberuntime/kuberuntime_manager.go
|--m.killContainersWithSyncResult			/pkg/kubelet/kuberuntime/kuberuntime_container.go
	|--m.killContainer

killContainersWithSyncResult经过两层调用来到kubeGenericRuntimeManager.killContainer,从代码看到
1 关闭pod的宽限时间设置
2 执行pod的preStop生命周期钩子
3 宽限时间不够可以再多给2s
4 停止容器

func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, gracePeriodOverride *int64) error {

	//1 关闭pod的宽限时间设置
	gracePeriod := int64(minimumGracePeriodInSeconds)
	switch {
	case pod.DeletionGracePeriodSeconds != nil:
		gracePeriod = *pod.DeletionGracePeriodSeconds
	case pod.Spec.TerminationGracePeriodSeconds != nil:
		gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
	}

	//2 执行pod的preStop生命周期钩子
	if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
		//这里执行完会返回剩余的宽限时间
		gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
	}

	//3 宽限时间不够可以再多给2s
	// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
	if gracePeriod < minimumGracePeriodInSeconds {
		gracePeriod = minimumGracePeriodInSeconds
	}

	//4 停止容器
	err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)
}

若要往下追源码,可在下面这方法看到往dockerDeamon发送stop容器的请求

func (cli *Client) ContainerStop(ctx context.Context, containerID string, timeout *time.Duration) error {
	query := url.Values{}
	if timeout != nil {
		query.Set("t", timetypes.DurationToSecondsString(*timeout))
	}
	resp, err := cli.post(ctx, "/containers/"+containerID+"/stop", query, nil, nil)
	ensureReaderClosed(resp)
	return err
}

调用链如下

m.runtimeService.StopContainer		/pkg/kubelet/kuberuntime/kuberuntime_container.go
|==remoteRuntimeService.StopContainer	/pkg/kubelet/cri/remote/remote_runtime.go
	|--r.runtimeClient.StopContainer	
	|==dockerService.StopContainer		/pkg/kubelet/dockershim/docker_container.go
		|--ds.client.StopContainer	
		|==kubeDockerClient.StopContainer	/pkg/kubelet/dockershim/libdocker/kube_docker_client.go
			|--d.client.ContainerStop	//就是上面的Client.ContainerStop

注:当使用GOALND看代码时追到r.runtimeClient.StopContainer时会发现调到cri-api包里面的RuntimeServiceClient,这个包处于vendor中,又找不到实现,实际上这里已经是kubelet开始调CRI了,目前的例子是使用docker作为CRI,那相关代码在/pkg/kubelet/dockershim里面找,这里是涉及到container的则看docker_container.go,像上一篇跟sandbox相关的在docker_sandbox.go里面找

StopPodSandbox

killPodWithSyncResult的另外一个关键调用就是调用StopPodSandbox方法,为了停止SandBox,主要步骤有
1 调用ds.network.TearDownPod:删除pod网络;
2 调用ds.client.StopContainer:停止pod sandbox容器。
代码位于/pkg/kubelet/dockershim/docker_sandbox.go

func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
	ready, ok := ds.getNetworkReady(podSandboxID)
	if !hostNetwork && (ready || !ok) {
		err := ds.network.TearDownPod(namespace, name, cID)
	}
	if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil {
	}
}

TearDownPod是CRI的方法,用于清除容器网络,StopContainer则与上面停止业务容器时调用ds.client.StopContainer一样,实际上调用kubeDockerClient.StopContainer最终往dockerDaemon发stop容器的post请求。

后续清理Pod资源

至此Pod就停下来了,从状态Terminating转成Terminated,Pod这个资源将要etcd中删除,通过api-server查也查不到,这个调用api-server删pod资源的动作由kublet的statusManager执行

在执行kubelet的Run方法跑起kubelet的核心循环syncLoop之前,启动了各种manager,其中有一个便是statusManager,statusManager的Run方法是开了一个协程不断去循环同步Pod状态,触发方式有两种,其一是从通道里传入,单个执行同步;另一是通过定时器触发批量执行同步

代码位于/pkg/kubelet/status/status_manager.go

func (m *manager) Start() {
	go wait.Forever(func() {
		for {
			select {
			case syncRequest := <-m.podStatusChannel:
				m.syncPod(syncRequest.podUID, syncRequest.status)
			case <-syncTicker:
				for i := len(m.podStatusChannel); i > 0; i-- {
					<-m.podStatusChannel
				}
				m.syncBatch()
			}
		}
	}, 0)
}

syncPod的简略如下

func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
	if m.canBeDeleted(pod, status.status) {
		deleteOptions := metav1.DeleteOptions{
			GracePeriodSeconds: new(int64),
			// Use the pod UID as the precondition for deletion to prevent deleting a
			// newly created pod with the same name and namespace.
			Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
		}
		err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
	}
}

执行canBeDeleted方法作用如函数名一致用于判定当前pod的状况能否去执行pod资源的删除,最终会调用到Kubelet.PodResourcesAreReclaimed方法,大致是判断pod的业务容器和sandbox是否有清理干净,volume有否卸载完毕,cgroup是否有清理完毕,代码位于/pkg/kubelet/kubelet_pods.go

func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
	if !notRunning(status.ContainerStatuses) {
		// We shouldn't delete pods that still have running containers
		klog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
		return false
	}
	// pod's containers should be deleted
	runtimeStatus, err := kl.podCache.Get(pod.UID)
	if err != nil {
		klog.V(3).Infof("Pod %q is terminated, Error getting runtimeStatus from the podCache: %s", format.Pod(pod), err)
		return false
	}
	if len(runtimeStatus.ContainerStatuses) > 0 {
		var statusStr string
		for _, status := range runtimeStatus.ContainerStatuses {
			statusStr += fmt.Sprintf("%+v ", *status)
		}
		klog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %s", format.Pod(pod), statusStr)
		return false
	}
	// pod's sandboxes should be deleted
	if len(runtimeStatus.SandboxStatuses) > 0 {
		var sandboxStr string
		for _, sandbox := range runtimeStatus.SandboxStatuses {
			sandboxStr += fmt.Sprintf("%+v ", *sandbox)
		}
		klog.V(3).Infof("Pod %q is terminated, but some pod sandboxes have not been cleaned up: %s", format.Pod(pod), sandboxStr)
		return false
	}

	if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
		// We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
		klog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
		return false
	}
	if kl.kubeletConfiguration.CgroupsPerQOS {
		pcm := kl.containerManager.NewPodContainerManager()
		if pcm.Exists(pod) {
			klog.V(3).Infof("Pod %q is terminated, but pod cgroup sandbox has not been cleaned up", format.Pod(pod))
			return false
		}
	}
	return true
}

同步Pod的触发源头

然后要找到这串逻辑触发的源头,就在先前kubelet.dispatchWork方法开头的那个判断,经过之前清理了各种容器后Pod的状态已转换成Terminated,再次走到dispatchWork方法时就会进入statusManager.TerminatePod

Kubelet.dispatchWork			/pkg/kubelet/kubelet.go	
|--kl.statusManager.TerminatePod(pod)
|==manager.TerminatePod			/pkg/kubelet/status/status_manager.go
	|--m.updateStatusInternal
		|--m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}

寻找statusManager,调api删除pod资源,前者在startKubelet时开了个协程去同步,在kubelet.dispatchWork处调kl.statusManager.TerminatePod往通道里塞pod触发逻辑

小结

本篇从kubelet的主循环开始,讲述了kubelet启动pod的过程,包括状态更新,分配cgroup,创建容器目录,等待volume挂载,注入imagepull secret,创建sandbox,调用cni编织网络,启动临时容器,init容器,业务容器,执行postStart生命周期钩子。

如要回顾本系列的文章可点击
kubelet源码分析——kubelet简介与启动
kubelet源码分析——启动Pod
kubelet源码分析——关闭Pod

参考文章

kubernetes/k8s CRI 分析-kubelet删除pod分析
pod删除主要流程源码解析
Pod 的终止

上一篇:深度强化学习——ppo(待重写)


下一篇:KL散度(Divergence)