cmd\kube-controller-manager\app\core.go
func startReplicaSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	go replicaset.NewReplicaSetController(
		controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs)) --->pkg\controller\replicaset\replica_set.go
	return nil, true, nil
}
func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	go replicationcontroller.NewReplicationManager(
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
		controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
		replicationcontroller.BurstReplicas,
	).Run(ctx, int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs)) --->pkg\controller\replication\replication_controller.go
	return nil, true, nil
}
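Outside of kube-controller-manager the same startup pattern can be reproduced with a plain client-go shared informer factory. The sketch below is a hedged illustration, not production code: it assumes the constructor signature shown above (the `k8s.io/kubernetes/pkg/controller/replicaset` package is only importable from inside the kubernetes module) and hard-codes 5 workers in place of `ConcurrentRSSyncs`.

```go
package main

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"k8s.io/kubernetes/pkg/controller/replicaset"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	factory := informers.NewSharedInformerFactory(client, 0)

	rsc := replicaset.NewReplicaSetController(
		factory.Apps().V1().ReplicaSets(),
		factory.Core().V1().Pods(),
		client,
		replicaset.BurstReplicas,
	)

	ctx := context.Background()
	factory.Start(ctx.Done()) // start the informers that feed the controller's listers
	rsc.Run(ctx, 5)           // 5 workers, analogous to ConcurrentRSSyncs
}
```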
pkg\controller\replication\replication_controller.go
func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
	eventBroadcaster := record.NewBroadcaster()
	return &ReplicationManager{
		*replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
			v1.SchemeGroupVersion.WithKind("ReplicationController"),
			"replication_controller",
			"replicationmanager",
			podControlAdapter{controller.RealPodControl{
				KubeClient: kubeClient,
				Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
			}},
		),
	}
} --->pkg\controller\replicaset\replica_set.go
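The adapter arguments (informerAdapter, clientsetAdapter, podControlAdapter) exist so the ReplicationController can reuse the ReplicaSet base controller unchanged: everything the base controller touches is translated into ReplicaSet types. The sketch below is a toy illustration of that translation (the real adapters live in pkg\controller\replication\conversion.go); it only converts the spec fields the base controller cares about and skips status and error handling.

```go
package replicationsketch

import (
	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// convertRCtoRS is a simplified version of the conversion the adapters perform:
// an RC's map-based selector becomes an RS LabelSelector, everything else maps 1:1.
func convertRCtoRS(rc *v1.ReplicationController) *apps.ReplicaSet {
	return &apps.ReplicaSet{
		ObjectMeta: rc.ObjectMeta,
		Spec: apps.ReplicaSetSpec{
			Replicas:        rc.Spec.Replicas,
			MinReadySeconds: rc.Spec.MinReadySeconds,
			Selector:        &metav1.LabelSelector{MatchLabels: rc.Spec.Selector},
			Template:        *rc.Spec.Template, // assumes the template is set
		},
	}
}
```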
pkg\controller\replicaset\replica_set.go
**Run**
// Spin up a pool of worker goroutines; each keeps pulling items off the queue
for i := 0; i < workers; i++ {
	go wait.UntilWithContext(ctx, rsc.worker, time.Second)
}
func (rsc *ReplicaSetController) worker(ctx context.Context) {
	for rsc.processNextWorkItem(ctx) {
	}
}
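wait.UntilWithContext re-runs the worker roughly once per second until the context is cancelled, so the pool above keeps draining the queue for the controller's entire lifetime. A minimal, self-contained sketch of the same pattern (the worker count and messages are made up):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	for i := 0; i < 2; i++ { // two workers, mirroring the `workers` loop above
		i := i
		go wait.UntilWithContext(ctx, func(ctx context.Context) {
			fmt.Println("worker", i, "draining the queue")
		}, time.Second)
	}
	<-ctx.Done() // workers stop once the context is cancelled
}
```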
func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
	// Get from the workqueue guarantees the same key is never handled by two workers at once
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)

	err := rsc.syncHandler(ctx, key.(string)) --->syncReplicaSet
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}
	// On failure the key is re-queued, but subject to rate limiting (exponential backoff)
	rsc.queue.AddRateLimited(key)
	return true
}
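The queue here is a client-go rate-limiting workqueue, and Get/Done/Forget/AddRateLimited carry exactly the retry semantics described in the comments. A hedged, standalone sketch of those semantics (the key and sync function are invented for illustration):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	queue.Add("default/my-rs")

	key, quit := queue.Get() // blocks until an item is available; quit==true after ShutDown
	if quit {
		return
	}
	defer queue.Done(key) // mark processing finished so the key may be queued again

	if err := sync(key.(string)); err != nil {
		queue.AddRateLimited(key) // failed: re-queue with backoff
		return
	}
	queue.Forget(key) // succeeded: clear the failure history so future retries start fresh
}

func sync(key string) error {
	fmt.Println("syncing", key)
	return nil
}
```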
**syncReplicaSet**
// Fetch the ReplicaSet and compile its label selector
namespace, name, err := cache.SplitMetaNamespaceKey(key)
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
// List all pods in the namespace and keep only the active ones
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
filteredPods := controller.FilterActivePods(allPods)
// Claim (adopt/release) pods so that only those matching the selector remain
filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
// Compute and apply the scale adjustment
if rsNeedsSync && rs.DeletionTimestamp == nil {
	manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
}
// The object comes from the shared informer cache, so it must be DeepCopy'd before mutation
rs = rs.DeepCopy()
// Write back the new ReplicaSet status
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
// If all replicas are ready but some are not yet available, requeue the key so
// availability is re-checked once MinReadySeconds has elapsed
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
	updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
	updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
	rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
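Two details above are easy to miss: the ReplicaSet's declarative LabelSelector has to be compiled into a labels.Selector before pods can be matched against it, and the final AddAfter gives availability a chance to catch up instead of busy-looping. A small, self-contained sketch of just the selector step (the labels are invented):

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	ls := &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}}
	selector, err := metav1.LabelSelectorAsSelector(ls)
	if err != nil {
		panic(err)
	}
	fmt.Println(selector.Matches(labels.Set{"app": "web", "pod-template-hash": "abc"})) // true
	fmt.Println(selector.Matches(labels.Set{"app": "db"}))                              // false
}
```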
func (rsc *ReplicaSetController) claimPods(ctx context.Context, rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
	...
	cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
	return cm.ClaimPods(ctx, filteredPods) --->pkg\controller\controller_ref_manager.go
}
**manageReplicas**
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
if diff < 0 {
	// diff < 0: we are short of pods and need to create more
	diff *= -1
	if diff > rsc.burstReplicas {
		// Never create more than burstReplicas (default 500) pods in a single sync
		diff = rsc.burstReplicas
	}
	// Create the pods in slow-start batches, each creation in its own goroutine
	successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
		err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
		return err
	})
} else if diff > 0 {
	// diff > 0: we have too many pods and need to delete some
	if diff > rsc.burstReplicas {
		diff = rsc.burstReplicas
	}
	// Pick which pods should be deleted
	podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
	errCh := make(chan error, diff)
	var wg sync.WaitGroup
	wg.Add(diff)
	for _, pod := range podsToDelete {
		go func(targetPod *v1.Pod) {
			defer wg.Done()
			if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
				podKey := controller.PodKey(targetPod)
				if !apierrors.IsNotFound(err) {
					klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
					errCh <- err
				}
			}
		}(pod)
	}
	wg.Wait()
	select {
	case err := <-errCh:
		// Return as soon as any deletion reported an error (only the first error is surfaced)
		if err != nil {
			return err
		}
	default:
	}
}
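slowStartBatch is what keeps a badly broken ReplicaSet (for example, one whose pods are always rejected) from hammering the apiserver with hundreds of doomed creates: work is issued in concurrent batches whose size doubles only while every call in the previous batch succeeded. The sketch below is a simplified re-implementation of that idea for illustration, not the exact in-tree function:

```go
package batchsketch

import "sync"

// slowStartBatchSketch runs fn count times, in concurrent batches that start at
// initialBatchSize and double after every fully successful batch. It returns how
// many calls succeeded and the first error observed, stopping at the failing batch.
func slowStartBatchSketch(count, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := minInt(remaining, initialBatchSize); batchSize > 0; batchSize = minInt(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		successes += batchSize - len(errCh)
		if len(errCh) > 0 {
			return successes, <-errCh // stop doubling on the first failing batch
		}
		remaining -= batchSize
	}
	return successes, nil
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}
```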
func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
	if diff < len(filteredPods) {
		podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
		sort.Sort(podsWithRanks)
		reportSortingDeletionAgeRatioMetric(filteredPods, diff)
	}
	return filteredPods[:diff]
}
// Each pod is ranked by how many active related pods run on the same node (one point per pod)
func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
	podsOnNode := make(map[string]int)
	for _, pod := range relatedPods {
		if controller.IsPodActive(pod) {
			podsOnNode[pod.Spec.NodeName]++
		}
	}
	ranks := make([]int, len(podsToRank))
	for i, pod := range podsToRank {
		ranks[i] = podsOnNode[pod.Spec.NodeName]
	}
	return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks, Now: metav1.Now()}
}
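In other words, when scaling down the controller prefers victims on nodes that already host many of the ReplicaSet's other active pods, which tends to leave the remaining replicas spread across nodes. A toy illustration of the rank computation (node names and counts are invented):

```go
package main

import "fmt"

func main() {
	relatedPodNodes := []string{"node-a", "node-a", "node-a", "node-b"} // nodes of active related pods
	podsOnNode := map[string]int{}
	for _, n := range relatedPodNodes {
		podsOnNode[n]++
	}
	for _, candidateNode := range []string{"node-a", "node-b"} {
		fmt.Printf("candidate on %s has rank %d\n", candidateNode, podsOnNode[candidateNode])
	}
	// The candidate on node-a (rank 3) sorts ahead of the one on node-b (rank 1),
	// so it is deleted first.
}
```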
pkg\controller\controller_ref_manager.go
**ClaimPods**
// Match the selector (and filters) against each pod and return the pods this controller claims
match := func(obj metav1.Object) bool {
	pod := obj.(*v1.Pod)
	// Check selector first so filters only run on potentially matching Pods.
	if !m.Selector.Matches(labels.Set(pod.Labels)) {
		return false
	}
	for _, filter := range filters {
		if !filter(pod) {
			return false
		}
	}
	return true
}
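ClaimPods then feeds each match result into the shared ClaimObject logic, which decides per pod whether to adopt, release, or ignore it based on the pod's controller ownerReference. The sketch below condenses that decision table; claimDecision and its parameters are illustrative names, not the real kube functions (the real code also checks deletion timestamps before adopting or releasing):

```go
package claimsketch

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// claimDecision summarizes what ClaimObject does with one pod: "matches" is the
// result of the match() closure above, controllerRef is metav1.GetControllerOf(pod),
// and rsUID is the UID of the ReplicaSet doing the claiming.
func claimDecision(matches bool, controllerRef *metav1.OwnerReference, rsUID types.UID) string {
	if controllerRef != nil {
		if controllerRef.UID != rsUID {
			return "owned by another controller: ignore"
		}
		if matches {
			return "already ours and still matching: keep"
		}
		return "ours but labels no longer match: release (patch the ownerReference away)"
	}
	if matches {
		return "orphan with matching labels: adopt (patch our ownerReference on)"
	}
	return "unrelated pod: ignore"
}
```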