Ingress nginx Controller源码分析

主要结构图

Ingress nginx Controller源码分析

入口函数

cmd/nginx/main.go

func main() {
   klog.InitFlags(nil)

   rand.Seed(time.Now().UnixNano())

   fmt.Println(version.String())

   showVersion, conf, err := parseFlags()
   if showVersion {
      os.Exit(0)
   }

   if err != nil {
      klog.Fatal(err)
   }

   err = file.CreateRequiredDirectories()
   if err != nil {
      klog.Fatal(err)
   }

   kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
   if err != nil {
      handleFatalInitError(err)
   }

   if len(conf.DefaultService) > 0 {
      err := checkService(conf.DefaultService, kubeClient)
      if err != nil {
         klog.Fatal(err)
      }

      klog.InfoS("Valid default backend", "service", conf.DefaultService)
   }

   if len(conf.PublishService) > 0 {
      err := checkService(conf.PublishService, kubeClient)
      if err != nil {
         klog.Fatal(err)
      }
   }

   if conf.Namespace != "" {
      _, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{})
      if err != nil {
         klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err)
      }
   }

   conf.FakeCertificate = ssl.GetFakeSSLCert()
   klog.InfoS("SSL fake certificate created", "file", conf.FakeCertificate.PemFileName)

   if !k8s.NetworkingIngressAvailable(kubeClient) {
      klog.Fatalf("ingress-nginx requires Kubernetes v1.19.0 or higher")
   }

   _, err = kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), metav1.ListOptions{})
   if err != nil {
      if !errors.IsNotFound(err) {
         if errors.IsForbidden(err) {
            klog.Warningf("No permissions to list and get Ingress Classes: %v, IngressClass feature will be disabled", err)
            conf.IngressClassConfiguration.IgnoreIngressClass = true
         }
      }
   }
   conf.Client = kubeClient

   err = k8s.GetIngressPod(kubeClient)
   if err != nil {
      klog.Fatalf("Unexpected error obtaining ingress-nginx pod: %v", err)
   }

   reg := prometheus.NewRegistry()

   reg.MustRegister(prometheus.NewGoCollector())
   reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
      PidFn:        func() (int, error) { return os.Getpid(), nil },
      ReportErrors: true,
   }))

   mc := metric.NewDummyCollector()
   if conf.EnableMetrics {
      mc, err = metric.NewCollector(conf.MetricsPerHost, reg, conf.IngressClassConfiguration.Controller)
      if err != nil {
         klog.Fatalf("Error creating prometheus collector:  %v", err)
      }
   }
   // Pass the ValidationWebhook status to determine if we need to start the collector
   // for the admissionWebhook
   mc.Start(conf.ValidationWebhook)

   if conf.EnableProfiling {
      go registerProfiler()
   }

   ngx := controller.NewNGINXController(conf, mc)

   mux := http.NewServeMux()
   registerHealthz(nginx.HealthPath, ngx, mux)
   registerMetrics(reg, mux)

   go startHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux)
   go ngx.Start()

   handleSigterm(ngx, func(code int) {
      os.Exit(code)
   })
}

主要逻辑

  1. Step1 初始化配置,获取kubeClient

    kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)

  2. Step2 检查DefaultService、PublicService、Namespace等存在

  3. step3 检查IngressClasses权限

  4. step4 检查ingress controller pod存在

  5. Step5 创建NGINXController, 开启健康检查,metrics

    ngx := controller.NewNGINXController(conf, mc)

方法NewNGINXController

//internal/ingress/controller/nginx.go
// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
   eventBroadcaster := record.NewBroadcaster()
   eventBroadcaster.StartLogging(klog.Infof)
   eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
      Interface: config.Client.CoreV1().Events(config.Namespace),
   })

   h, err := dns.GetSystemNameServers()
   if err != nil {
      klog.Warningf("Error reading system nameservers: %v", err)
   }

   n := &NGINXController{
      isIPV6Enabled: ing_net.IsIPv6Enabled(),

      resolver:        h,
      cfg:             config,
      syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),

      recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
         Component: "nginx-ingress-controller",
      }),

      stopCh:   make(chan struct{}),
      updateCh: channels.NewRingChannel(1024),

      ngxErrCh: make(chan error),

      stopLock: &sync.Mutex{},

      runningConfig: new(ingress.Configuration),

      Proxy: &TCPProxy{},

      metricCollector: mc,

      command: NewNginxCommand(),
   }

   if n.cfg.ValidationWebhook != "" {
      n.validationWebhookServer = &http.Server{
         Addr:      config.ValidationWebhook,
         Handler:   adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
         TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
         // disable http/2
         // https://github.com/kubernetes/kubernetes/issues/80313
         // https://github.com/kubernetes/ingress-nginx/issues/6323#issuecomment-737239159
         TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
      }
   }

   n.store = store.New(
      config.Namespace,
      config.WatchNamespaceSelector,
      config.ConfigMapName,
      config.TCPConfigMapName,
      config.UDPConfigMapName,
      config.DefaultSSLCertificate,
      config.ResyncPeriod,
      config.Client,
      n.updateCh,
      config.DisableCatchAll,
      config.IngressClassConfiguration)

   n.syncQueue = task.NewTaskQueue(n.syncIngress)

   if config.UpdateStatus {
      n.syncStatus = status.NewStatusSyncer(status.Config{
         Client:                 config.Client,
         PublishService:         config.PublishService,
         PublishStatusAddress:   config.PublishStatusAddress,
         IngressLister:          n.store,
         UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
         UseNodeInternalIP:      config.UseNodeInternalIP,
      })
   } else {
      klog.Warning("Update of Ingress status is disabled (flag --update-status)")
   }

   onTemplateChange := func() {
      template, err := ngx_template.NewTemplate(nginx.TemplatePath)
      if err != nil {
         // this error is different from the rest because it must be clear why nginx is not working
         klog.ErrorS(err, "Error loading new template")
         return
      }

      n.t = template
      klog.InfoS("New NGINX configuration template loaded")
      n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
   }

   ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
   if err != nil {
      klog.Fatalf("Invalid NGINX configuration template: %v", err)
   }

   n.t = ngxTpl

   _, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
   if err != nil {
      klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err)
   }

   filesToWatch := []string{}
   err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
      if err != nil {
         return err
      }

      if info.IsDir() {
         return nil
      }

      filesToWatch = append(filesToWatch, path)
      return nil
   })

   if err != nil {
      klog.Fatalf("Error creating file watchers: %v", err)
   }

   for _, f := range filesToWatch {
      _, err = watch.NewFileWatcher(f, func() {
         klog.InfoS("File changed detected. Reloading NGINX", "path", f)
         n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
      })
      if err != nil {
         klog.Fatalf("Error creating file watcher for %v: %v", f, err)
      }
   }

   return n
}

主要逻辑:

  1. 实例化NGINXController

  2. 创建validationWebhookServer

  3. 创建store缓存

    n.store = store.New
    
  4. 监听模板文件以及/etc/nginx/geoip下的文件变化,如有变化,发送对应的文件变化事件

  5. 当ingress有变化时,执行syncIngress方法回调

    n.syncQueue = task.NewTaskQueue(n.syncIngress)

万能的死循环

Start方法,开启万能的死循环,处理各种Event

//internal/ingress/controller/nginx.go
for {
   select {
   case err := <-n.ngxErrCh:
      if n.isShuttingDown {
         return
      }

      // if the nginx master process dies, the workers continue to process requests
      // until the failure of the configured livenessProbe and restart of the pod.
      if process.IsRespawnIfRequired(err) {
         return
      }

   case event := <-n.updateCh.Out():
      if n.isShuttingDown {
         break
      }

      if evt, ok := event.(store.Event); ok {
         klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
         if evt.Type == store.ConfigurationEvent {
            // TODO: is this necessary? Consider removing this special case
            n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
            continue
         }

         n.syncQueue.EnqueueSkippableTask(evt.Obj)
      } else {
         klog.Warningf("Unexpected event type received %T", event)
      }
   case <-n.stopCh:
      return
   }
}

当updateCh中有数据时,放到syncQueue中

//internal/task/queue.go
// worker processes work in the queue through sync.
func (t *Queue) worker() {
   for {
      key, quit := t.queue.Get()
      if quit {
         if !isClosed(t.workerDone) {
            close(t.workerDone)
         }
         return
      }
      ts := time.Now().UnixNano()

      item := key.(Element)
      if item.Timestamp != 0 && t.lastSync > item.Timestamp {
         klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp)
         t.queue.Forget(key)
         t.queue.Done(key)
         continue
      }

      klog.V(3).InfoS("syncing", "key", item.Key)
      if err := t.sync(key); err != nil {
         klog.ErrorS(err, "requeuing", "key", item.Key)
         t.queue.AddRateLimited(Element{
            Key:       item.Key,
            Timestamp: 0,
         })
      } else {
         t.queue.Forget(key)
         t.lastSync = ts
      }

      t.queue.Done(key)
   }
}

当syncQueue中有数据,回调syncIngress方法 t.sync(key)

//internal/ingress/controller/store/store.go
ingEventHandler := cache.ResourceEventHandlerFuncs{
   AddFunc: func(obj interface{}) {
      ing, _ := toIngress(obj)

      if !watchedNamespace(ing.Namespace) {
         return
      }

      ic, err := store.GetIngressClass(ing, icConfig)
      if err != nil {
         klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
         return
      }

      klog.InfoS("Found valid IngressClass", "ingress", klog.KObj(ing), "ingressclass", ic)

      if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
         klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing))
         return
      }

      recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync")

      store.syncIngress(ing)
      store.updateSecretIngressMap(ing)
      store.syncSecrets(ing)

      updateCh.In() <- Event{
         Type: CreateEvent,
         Obj:  obj,
      }
   },
   DeleteFunc: ingDeleteHandler,
   UpdateFunc: func(old, cur interface{}) {
      oldIng, _ := toIngress(old)
      curIng, _ := toIngress(cur)

      if !watchedNamespace(oldIng.Namespace) {
         return
      }

      var errOld, errCur error
      var classCur string
      if !icConfig.IgnoreIngressClass {
         _, errOld = store.GetIngressClass(oldIng, icConfig)
         classCur, errCur = store.GetIngressClass(curIng, icConfig)
      }
      if errOld != nil && errCur == nil {
         if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
            klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng))
            return
         }

         klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "ingressclass", classCur)
         recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
      } else if errOld == nil && errCur != nil {
         klog.InfoS("removing ingress because of unknown ingressclass", "ingress", klog.KObj(curIng))
         ingDeleteHandler(old)
         return
      } else if errCur == nil && !reflect.DeepEqual(old, cur) {
         if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
            klog.InfoS("ignoring update for catch-all ingress and delete old one because of --disable-catch-all", "ingress", klog.KObj(curIng))
            ingDeleteHandler(old)
            return
         }

         recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
      } else {
         klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng))
         return
      }

      store.syncIngress(curIng)
      store.updateSecretIngressMap(curIng)
      store.syncSecrets(curIng)

      updateCh.In() <- Event{
         Type: UpdateEvent,
         Obj:  cur,
      }
   },
}

internal/ingress/controller/store/store.go

New方法构造各种资源的处理EventHandler

Ingress举例:ResourceEventHandlerFuncs

处理Ingress的Add、Delete、Update

store.informers.Ingress.AddEventHandler(ingEventHandler)
if !icConfig.IgnoreIngressClass {
   store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
}
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(serviceHandler)

Ingress、IngressClass、Endpoint、Secret、ConfigMap、Service注册到k8s的sdk Informers中

上一篇:6.824 lab1 MapReduce


下一篇:Oracle 备份与恢复学习笔记(10)