• kube-controller-manager源码分析(一)之 NewControllerManagerCommand
  • 1. Main函数
  • 2. NewControllerManagerCommand
    • 2.1. NewKubeControllerManagerOptions
    • 2.2. AddFlagSet
  • 3. Run
    • 3.1. CreateControllerContext
    • 3.2. NewControllerInitializers
    • 3.3. StartControllers
    • 3.4. InformerFactory.Start
      • 3.4.1. SharedInformerFactory
      • 3.4.2. sharedInformerFactory.Start
      • 3.4.3. sharedIndexInformer.Run
  • 4. initFn(ctx)
    • 4.1. startStatefulSetController
    • 4.2. startDeploymentController
  • 5. 总结

    kube-controller-manager源码分析(一)之 NewControllerManagerCommand

    以下代码分析基于 kubernetes v1.12.0 版本。

    本文主要分析https://github.com/kubernetes/kubernetes/tree/v1.12.0/cmd/kube-controller-manager 部分的代码。

    本文主要分析 kubernetes/cmd/kube-controller-manager部分,该部分主要涉及各种类型的controller的参数解析,及初始化,例如 deployment controllerstatefulset controller。并没有具体controller运行的详细逻辑,该部分位于kubernetes/pkg/controller模块,待后续文章分析。

    kube-controller-managercmd部分代码目录结构如下:

    1. kube-controller-manager
    2. ├── app
    3. ├── apps.go # 包含:startDeploymentController、startReplicaSetController、startStatefulSetController、startDaemonSetController
    4. ├── autoscaling.go # startHPAController
    5. ├── batch.go # startJobController、startCronJobController
    6. ├── bootstrap.go
    7. ├── certificates.go
    8. ├── cloudproviders.go
    9. ├── config
    10. └── config.go # config: controller manager执行的上下文
    11. ├── controllermanager.go # 包含:NewControllerManagerCommand、Run、NewControllerInitializers、StartControllers等
    12. ├── core.go # startServiceController、startNodeIpamController、startPersistentVolumeBinderController、startNamespaceController等
    13. ├── options # 包含不同controller的option参数
    14. ├── attachdetachcontroller.go
    15. ├── csrsigningcontroller.go
    16. ├── daemonsetcontroller.go # DaemonSetControllerOptions
    17. ├── deploymentcontroller.go # DeploymentControllerOptions
    18. ├── deprecatedcontroller.go
    19. ├── endpointcontroller.go
    20. ├── garbagecollectorcontroller.go
    21. ├── hpacontroller.go
    22. ├── jobcontroller.go
    23. ├── namespacecontroller.go # NamespaceControllerOptions
    24. ├── nodeipamcontroller.go
    25. ├── nodelifecyclecontroller.go
    26. ├── options.go # KubeControllerManagerOptions、NewKubeControllerManagerOptions
    27. ├── persistentvolumebindercontroller.go
    28. ├── podgccontroller.go
    29. ├── replicasetcontroller.go # ReplicaSetControllerOptions
    30. ├── replicationcontroller.go
    31. ├── resourcequotacontroller.go
    32. ├── serviceaccountcontroller.go
    33. └── ttlafterfinishedcontroller.go
    34. └── controller-manager.go # main入口函数

    1. Main函数

    kube-controller-manager的入口函数Main函数,仍然是采用统一的代码风格,使用Cobra命令行框架。

    1. func main() {
    2. rand.Seed(time.Now().UTC().UnixNano())
    3. command := app.NewControllerManagerCommand()
    4. // TODO: once we switch everything over to Cobra commands, we can go back to calling
    5. // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    6. // normalize func and add the go flag set by hand.
    7. pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    8. pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
    9. // utilflag.InitFlags()
    10. logs.InitLogs()
    11. defer logs.FlushLogs()
    12. if err := command.Execute(); err != nil {
    13. fmt.Fprintf(os.Stderr, "%v\n", err)
    14. os.Exit(1)
    15. }
    16. }

    核心代码:

    1. // 初始化命令行结构体
    2. command := app.NewControllerManagerCommand()
    3. // 执行Execute
    4. err := command.Execute()

    2. NewControllerManagerCommand

    该部分代码位于:kubernetes/cmd/kube-controller-manager/app/controllermanager.go

    1. // NewControllerManagerCommand creates a *cobra.Command object with default parameters
    2. func NewControllerManagerCommand() *cobra.Command {
    3. ...
    4. cmd := &cobra.Command{
    5. Use: "kube-controller-manager",
    6. Long: `The Kubernetes controller manager is a daemon that embeds
    7. the core control loops shipped with Kubernetes. In applications of robotics and
    8. automation, a control loop is a non-terminating loop that regulates the state of
    9. the system. In Kubernetes, a controller is a control loop that watches the shared
    10. state of the cluster through the apiserver and makes changes attempting to move the
    11. current state towards the desired state. Examples of controllers that ship with
    12. Kubernetes today are the replication controller, endpoints controller, namespace
    13. controller, and serviceaccounts controller.`,
    14. Run: func(cmd *cobra.Command, args []string) {
    15. verflag.PrintAndExitIfRequested()
    16. utilflag.PrintFlags(cmd.Flags())
    17. c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
    18. if err != nil {
    19. fmt.Fprintf(os.Stderr, "%v\n", err)
    20. os.Exit(1)
    21. }
    22. if err := Run(c.Complete(), wait.NeverStop); err != nil {
    23. fmt.Fprintf(os.Stderr, "%v\n", err)
    24. os.Exit(1)
    25. }
    26. },
    27. }
    28. ...
    29. }

    构建一个*cobra.Command对象,然后执行Run函数。

    2.1. NewKubeControllerManagerOptions

    1. s, err := options.NewKubeControllerManagerOptions()
    2. if err != nil {
    3. glog.Fatalf("unable to initialize command options: %v", err)
    4. }

    初始化controllerManager的参数,其中主要包括了各种controller的option,例如DeploymentControllerOptions:

    1. // DeploymentControllerOptions holds the DeploymentController options.
    2. type DeploymentControllerOptions struct {
    3. ConcurrentDeploymentSyncs int32
    4. DeploymentControllerSyncPeriod metav1.Duration
    5. }

    具体代码如下:

    1. // NewKubeControllerManagerOptions creates a new KubeControllerManagerOptions with a default config.
    2. func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
    3. componentConfig, err := NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort)
    4. if err != nil {
    5. return nil, err
    6. }
    7. s := KubeControllerManagerOptions{
    8. Generic: cmoptions.NewGenericControllerManagerConfigurationOptions(componentConfig.Generic),
    9. KubeCloudShared: cmoptions.NewKubeCloudSharedOptions(componentConfig.KubeCloudShared),
    10. AttachDetachController: &AttachDetachControllerOptions{
    11. ReconcilerSyncLoopPeriod: componentConfig.AttachDetachController.ReconcilerSyncLoopPeriod,
    12. },
    13. CSRSigningController: &CSRSigningControllerOptions{
    14. ClusterSigningCertFile: componentConfig.CSRSigningController.ClusterSigningCertFile,
    15. ClusterSigningKeyFile: componentConfig.CSRSigningController.ClusterSigningKeyFile,
    16. ClusterSigningDuration: componentConfig.CSRSigningController.ClusterSigningDuration,
    17. },
    18. DaemonSetController: &DaemonSetControllerOptions{
    19. ConcurrentDaemonSetSyncs: componentConfig.DaemonSetController.ConcurrentDaemonSetSyncs,
    20. },
    21. DeploymentController: &DeploymentControllerOptions{
    22. ConcurrentDeploymentSyncs: componentConfig.DeploymentController.ConcurrentDeploymentSyncs,
    23. DeploymentControllerSyncPeriod: componentConfig.DeploymentController.DeploymentControllerSyncPeriod,
    24. },
    25. DeprecatedFlags: &DeprecatedControllerOptions{
    26. RegisterRetryCount: componentConfig.DeprecatedController.RegisterRetryCount,
    27. },
    28. EndpointController: &EndpointControllerOptions{
    29. ConcurrentEndpointSyncs: componentConfig.EndpointController.ConcurrentEndpointSyncs,
    30. },
    31. GarbageCollectorController: &GarbageCollectorControllerOptions{
    32. ConcurrentGCSyncs: componentConfig.GarbageCollectorController.ConcurrentGCSyncs,
    33. EnableGarbageCollector: componentConfig.GarbageCollectorController.EnableGarbageCollector,
    34. },
    35. HPAController: &HPAControllerOptions{
    36. HorizontalPodAutoscalerSyncPeriod: componentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod,
    37. HorizontalPodAutoscalerUpscaleForbiddenWindow: componentConfig.HPAController.HorizontalPodAutoscalerUpscaleForbiddenWindow,
    38. HorizontalPodAutoscalerDownscaleForbiddenWindow: componentConfig.HPAController.HorizontalPodAutoscalerDownscaleForbiddenWindow,
    39. HorizontalPodAutoscalerDownscaleStabilizationWindow: componentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow,
    40. HorizontalPodAutoscalerCPUInitializationPeriod: componentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod,
    41. HorizontalPodAutoscalerInitialReadinessDelay: componentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay,
    42. HorizontalPodAutoscalerTolerance: componentConfig.HPAController.HorizontalPodAutoscalerTolerance,
    43. HorizontalPodAutoscalerUseRESTClients: componentConfig.HPAController.HorizontalPodAutoscalerUseRESTClients,
    44. },
    45. JobController: &JobControllerOptions{
    46. ConcurrentJobSyncs: componentConfig.JobController.ConcurrentJobSyncs,
    47. },
    48. NamespaceController: &NamespaceControllerOptions{
    49. NamespaceSyncPeriod: componentConfig.NamespaceController.NamespaceSyncPeriod,
    50. ConcurrentNamespaceSyncs: componentConfig.NamespaceController.ConcurrentNamespaceSyncs,
    51. },
    52. NodeIPAMController: &NodeIPAMControllerOptions{
    53. NodeCIDRMaskSize: componentConfig.NodeIPAMController.NodeCIDRMaskSize,
    54. },
    55. NodeLifecycleController: &NodeLifecycleControllerOptions{
    56. EnableTaintManager: componentConfig.NodeLifecycleController.EnableTaintManager,
    57. NodeMonitorGracePeriod: componentConfig.NodeLifecycleController.NodeMonitorGracePeriod,
    58. NodeStartupGracePeriod: componentConfig.NodeLifecycleController.NodeStartupGracePeriod,
    59. PodEvictionTimeout: componentConfig.NodeLifecycleController.PodEvictionTimeout,
    60. },
    61. PersistentVolumeBinderController: &PersistentVolumeBinderControllerOptions{
    62. PVClaimBinderSyncPeriod: componentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod,
    63. VolumeConfiguration: componentConfig.PersistentVolumeBinderController.VolumeConfiguration,
    64. },
    65. PodGCController: &PodGCControllerOptions{
    66. TerminatedPodGCThreshold: componentConfig.PodGCController.TerminatedPodGCThreshold,
    67. },
    68. ReplicaSetController: &ReplicaSetControllerOptions{
    69. ConcurrentRSSyncs: componentConfig.ReplicaSetController.ConcurrentRSSyncs,
    70. },
    71. ReplicationController: &ReplicationControllerOptions{
    72. ConcurrentRCSyncs: componentConfig.ReplicationController.ConcurrentRCSyncs,
    73. },
    74. ResourceQuotaController: &ResourceQuotaControllerOptions{
    75. ResourceQuotaSyncPeriod: componentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod,
    76. ConcurrentResourceQuotaSyncs: componentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs,
    77. },
    78. SAController: &SAControllerOptions{
    79. ConcurrentSATokenSyncs: componentConfig.SAController.ConcurrentSATokenSyncs,
    80. },
    81. ServiceController: &cmoptions.ServiceControllerOptions{
    82. ConcurrentServiceSyncs: componentConfig.ServiceController.ConcurrentServiceSyncs,
    83. },
    84. TTLAfterFinishedController: &TTLAfterFinishedControllerOptions{
    85. ConcurrentTTLSyncs: componentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs,
    86. },
    87. SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
    88. InsecureServing: (&apiserveroptions.DeprecatedInsecureServingOptions{
    89. BindAddress: net.ParseIP(componentConfig.Generic.Address),
    90. BindPort: int(componentConfig.Generic.Port),
    91. BindNetwork: "tcp",
    92. }).WithLoopback(),
    93. Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
    94. Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(),
    95. }
    96. s.Authentication.RemoteKubeConfigFileOptional = true
    97. s.Authorization.RemoteKubeConfigFileOptional = true
    98. s.Authorization.AlwaysAllowPaths = []string{"/healthz"}
    99. s.SecureServing.ServerCert.CertDirectory = "/var/run/kubernetes"
    100. s.SecureServing.ServerCert.PairName = "kube-controller-manager"
    101. s.SecureServing.BindPort = ports.KubeControllerManagerPort
    102. gcIgnoredResources := make([]kubectrlmgrconfig.GroupResource, 0, len(garbagecollector.DefaultIgnoredResources()))
    103. for r := range garbagecollector.DefaultIgnoredResources() {
    104. gcIgnoredResources = append(gcIgnoredResources, kubectrlmgrconfig.GroupResource{Group: r.Group, Resource: r.Resource})
    105. }
    106. s.GarbageCollectorController.GCIgnoredResources = gcIgnoredResources
    107. return &s, nil
    108. }

    2.2. AddFlagSet

    添加参数及帮助函数。

    1. fs := cmd.Flags()
    2. namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())
    3. for _, f := range namedFlagSets.FlagSets {
    4. fs.AddFlagSet(f)
    5. }
    6. usageFmt := "Usage:\n %s\n"
    7. cols, _, _ := apiserverflag.TerminalSize(cmd.OutOrStdout())
    8. cmd.SetUsageFunc(func(cmd *cobra.Command) error {
    9. fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
    10. apiserverflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
    11. return nil
    12. })
    13. cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
    14. fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
    15. apiserverflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
    16. })

    3. Run

    此部分的代码位于cmd/kube-controller-manager/app/controllermanager.go

    基于KubeControllerManagerOptions运行controllerManager,不退出。

    1. // Run runs the KubeControllerManagerOptions. This should never exit.
    2. func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
    3. ...
    4. run := func(ctx context.Context) {
    5. ...
    6. controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
    7. if err != nil {
    8. glog.Fatalf("error building controller context: %v", err)
    9. }
    10. saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
    11. if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
    12. glog.Fatalf("error starting controllers: %v", err)
    13. }
    14. controllerContext.InformerFactory.Start(controllerContext.Stop)
    15. close(controllerContext.InformersStarted)
    16. select {}
    17. }
    18. ...
    19. }

    Run函数涉及的核心代码如下:

    1. // 创建controller的context
    2. controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
    3. // 启动各种controller
    4. err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux)

    其中StartControllers中的入参NewControllerInitializers初始化了各种controller。

    3.1. CreateControllerContext

    CreateControllerContext构建了各种controller所需的资源的上下文,各种controller在启动时,入参为该context,具体参考initFn(ctx)

    1. // CreateControllerContext creates a context struct containing references to resources needed by the
    2. // controllers such as the cloud provider and clientBuilder. rootClientBuilder is only used for
    3. // the shared-informers client and token controller.
    4. func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
    5. versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
    6. sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
    7. // If apiserver is not running we should wait for some time and fail only then. This is particularly
    8. // important when we start apiserver and controller manager at the same time.
    9. if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
    10. return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
    11. }
    12. // Use a discovery client capable of being refreshed.
    13. discoveryClient := rootClientBuilder.ClientOrDie("controller-discovery")
    14. cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient.Discovery())
    15. restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
    16. go wait.Until(func() {
    17. restMapper.Reset()
    18. }, 30*time.Second, stop)
    19. availableResources, err := GetAvailableResources(rootClientBuilder)
    20. if err != nil {
    21. return ControllerContext{}, err
    22. }
    23. cloud, loopMode, err := createCloudProvider(s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
    24. s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
    25. if err != nil {
    26. return ControllerContext{}, err
    27. }
    28. ctx := ControllerContext{
    29. ClientBuilder: clientBuilder,
    30. InformerFactory: sharedInformers,
    31. ComponentConfig: s.ComponentConfig,
    32. RESTMapper: restMapper,
    33. AvailableResources: availableResources,
    34. Cloud: cloud,
    35. LoopMode: loopMode,
    36. Stop: stop,
    37. InformersStarted: make(chan struct{}),
    38. ResyncPeriod: ResyncPeriod(s),
    39. }
    40. return ctx, nil
    41. }

    核心代码为NewSharedInformerFactory

    1. // 创建SharedInformerFactory
    2. sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
    3. // 赋值给ControllerContext
    4. ctx := ControllerContext{
    5. InformerFactory: sharedInformers,
    6. }

    SharedInformerFactory提供了公共的k8s对象的informers

    1. // SharedInformerFactory provides shared informers for resources in all known
    2. // API group versions.
    3. type SharedInformerFactory interface {
    4. internalinterfaces.SharedInformerFactory
    5. ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    6. WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
    7. Admissionregistration() admissionregistration.Interface
    8. Apps() apps.Interface
    9. Autoscaling() autoscaling.Interface
    10. Batch() batch.Interface
    11. Certificates() certificates.Interface
    12. Coordination() coordination.Interface
    13. Core() core.Interface
    14. Events() events.Interface
    15. Extensions() extensions.Interface
    16. Networking() networking.Interface
    17. Policy() policy.Interface
    18. Rbac() rbac.Interface
    19. Scheduling() scheduling.Interface
    20. Settings() settings.Interface
    21. Storage() storage.Interface
    22. }

    3.2. NewControllerInitializers

    NewControllerInitializers定义了各种controller的类型和其对于的启动函数,例如deployment``、statefulsetreplicasetreplicationcontrollernamespace等。

    1. // NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
    2. // paired to their InitFunc. This allows for structured downstream composition and subdivision.
    3. func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    4. controllers := map[string]InitFunc{}
    5. controllers["endpoint"] = startEndpointController
    6. controllers["replicationcontroller"] = startReplicationController
    7. controllers["podgc"] = startPodGCController
    8. controllers["resourcequota"] = startResourceQuotaController
    9. controllers["namespace"] = startNamespaceController
    10. controllers["serviceaccount"] = startServiceAccountController
    11. controllers["garbagecollector"] = startGarbageCollectorController
    12. controllers["daemonset"] = startDaemonSetController
    13. controllers["job"] = startJobController
    14. controllers["deployment"] = startDeploymentController
    15. controllers["replicaset"] = startReplicaSetController
    16. controllers["horizontalpodautoscaling"] = startHPAController
    17. controllers["disruption"] = startDisruptionController
    18. controllers["statefulset"] = startStatefulSetController
    19. controllers["cronjob"] = startCronJobController
    20. controllers["csrsigning"] = startCSRSigningController
    21. controllers["csrapproving"] = startCSRApprovingController
    22. controllers["csrcleaner"] = startCSRCleanerController
    23. controllers["ttl"] = startTTLController
    24. controllers["bootstrapsigner"] = startBootstrapSignerController
    25. controllers["tokencleaner"] = startTokenCleanerController
    26. controllers["nodeipam"] = startNodeIpamController
    27. if loopMode == IncludeCloudLoops {
    28. controllers["service"] = startServiceController
    29. controllers["route"] = startRouteController
    30. // TODO: volume controller into the IncludeCloudLoops only set.
    31. // TODO: Separate cluster in cloud check from node lifecycle controller.
    32. }
    33. controllers["nodelifecycle"] = startNodeLifecycleController
    34. controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    35. controllers["attachdetach"] = startAttachDetachController
    36. controllers["persistentvolume-expander"] = startVolumeExpandController
    37. controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    38. controllers["pvc-protection"] = startPVCProtectionController
    39. controllers["pv-protection"] = startPVProtectionController
    40. controllers["ttl-after-finished"] = startTTLAfterFinishedController
    41. return controllers
    42. }

    3.3. StartControllers

    1. func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
    2. ...
    3. for controllerName, initFn := range controllers {
    4. if !ctx.IsControllerEnabled(controllerName) {
    5. glog.Warningf("%q is disabled", controllerName)
    6. continue
    7. }
    8. time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
    9. glog.V(1).Infof("Starting %q", controllerName)
    10. debugHandler, started, err := initFn(ctx)
    11. if err != nil {
    12. glog.Errorf("Error starting %q", controllerName)
    13. return err
    14. }
    15. if !started {
    16. glog.Warningf("Skipping %q", controllerName)
    17. continue
    18. }
    19. if debugHandler != nil && unsecuredMux != nil {
    20. basePath := "/debug/controllers/" + controllerName
    21. unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
    22. unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
    23. }
    24. glog.Infof("Started %q", controllerName)
    25. }
    26. return nil
    27. }

    核心代码:

    1. for controllerName, initFn := range controllers {
    2. debugHandler, started, err := initFn(ctx)
    3. }

    启动各种controller,controller的启动函数在NewControllerInitializers中定义了,例如:

    1. // deployment
    2. controllers["deployment"] = startDeploymentController
    3. // statefulset
    4. controllers["statefulset"] = startStatefulSetController

    3.4. InformerFactory.Start

    InformerFactory实际上是SharedInformerFactory,具体的实现逻辑在client-go中的informer的实现机制。

    1. controllerContext.InformerFactory.Start(controllerContext.Stop)
    2. close(controllerContext.InformersStarted)

    3.4.1. SharedInformerFactory

    SharedInformerFactory是一个informer工厂的接口定义。

    1. // SharedInformerFactory a small interface to allow for adding an informer without an import cycle
    2. type SharedInformerFactory interface {
    3. Start(stopCh <-chan struct{})
    4. InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
    5. }

    3.4.2. sharedInformerFactory.Start

    Start方法初始化各种类型的informer

    1. // Start initializes all requested informers.
    2. func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    3. f.lock.Lock()
    4. defer f.lock.Unlock()
    5. for informerType, informer := range f.informers {
    6. if !f.startedInformers[informerType] {
    7. go informer.Run(stopCh)
    8. f.startedInformers[informerType] = true
    9. }
    10. }
    11. }

    3.4.3. sharedIndexInformer.Run

    sharedIndexInformer.Run具体运行了sharedIndexInformer的实现逻辑,该部分待后续对informer机制做专题分析。

    1. func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    2. defer utilruntime.HandleCrash()
    3. fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
    4. cfg := &Config{
    5. Queue: fifo,
    6. ListerWatcher: s.listerWatcher,
    7. ObjectType: s.objectType,
    8. FullResyncPeriod: s.resyncCheckPeriod,
    9. RetryOnError: false,
    10. ShouldResync: s.processor.shouldResync,
    11. Process: s.HandleDeltas,
    12. }
    13. func() {
    14. s.startedLock.Lock()
    15. defer s.startedLock.Unlock()
    16. s.controller = New(cfg)
    17. s.controller.(*controller).clock = s.clock
    18. s.started = true
    19. }()
    20. // Separate stop channel because Processor should be stopped strictly after controller
    21. processorStopCh := make(chan struct{})
    22. var wg wait.Group
    23. defer wg.Wait() // Wait for Processor to stop
    24. defer close(processorStopCh) // Tell Processor to stop
    25. wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    26. wg.StartWithChannel(processorStopCh, s.processor.run)
    27. defer func() {
    28. s.startedLock.Lock()
    29. defer s.startedLock.Unlock()
    30. s.stopped = true // Don't want any new listeners
    31. }()
    32. s.controller.Run(stopCh)
    33. }

    4. initFn(ctx)

    initFn实际调用的就是各种类型的controller,代码位于kubernetes/cmd/kube-controller-manager/app/apps.go,本文以startStatefulSetControllerstartDeploymentController为例,controller中实际调用的函数逻辑位于kubernetes/pkg/controller中,待后续分析。

    4.1. startStatefulSetController

    1. func startStatefulSetController(ctx ControllerContext) (http.Handler, bool, error) {
    2. if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}] {
    3. return nil, false, nil
    4. }
    5. go statefulset.NewStatefulSetController(
    6. ctx.InformerFactory.Core().V1().Pods(),
    7. ctx.InformerFactory.Apps().V1().StatefulSets(),
    8. ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
    9. ctx.InformerFactory.Apps().V1().ControllerRevisions(),
    10. ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
    11. ).Run(1, ctx.Stop)
    12. return nil, true, nil
    13. }

    其中使用到了InformerFactory,包含了Pods、StatefulSets、PersistentVolumeClaims、ControllerRevisions的informer。

    startStatefulSetController主要调用的函数为NewStatefulSetController和对应的Run函数。

    4.2. startDeploymentController

    1. func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    2. if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
    3. return nil, false, nil
    4. }
    5. dc, err := deployment.NewDeploymentController(
    6. ctx.InformerFactory.Apps().V1().Deployments(),
    7. ctx.InformerFactory.Apps().V1().ReplicaSets(),
    8. ctx.InformerFactory.Core().V1().Pods(),
    9. ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    10. )
    11. if err != nil {
    12. return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    13. }
    14. go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    15. return nil, true, nil
    16. }

    startDeploymentController主要调用的函数为NewDeploymentController和对应的Run函数。该部分逻辑在kubernetes/pkg/controller中。

    5. 总结

    1. Kube-controller-manager的代码风格仍然是Cobra命令行框架。通过构造ControllerManagerCommand,然后执行command.Execute()函数。基本的流程就是构造option,添加Flags,执行Run函数。
    2. cmd部分的调用流程如下:Main-->NewControllerManagerCommand--> Run(c.Complete(), wait.NeverStop)-->StartControllers-->initFn(ctx)-->startDeploymentController/startStatefulSetController-->sts.NewStatefulSetController.Run/dc.NewDeploymentController.Run-->pkg/controller
    3. 其中CreateControllerContext函数用来创建各类型controller所需要使用的context,NewControllerInitializers初始化了各种类型的controller,其中就包括DeploymentControllerStatefulSetController等。

    基本流程如下:

    1. 构造controller manager option,并转化为Config对象,执行Run函数。
    2. 基于Config对象创建ControllerContext,其中包含InformerFactory。
    3. 基于ControllerContext运行各种controller,各种controller的定义在NewControllerInitializers中。
    4. 执行InformerFactory.Start。
    5. 每种controller都会构造自身的结构体并执行对应的Run函数。

    参考:

    • https://github.com/kubernetes/kubernetes/tree/v1.12.0/cmd/kube-controller-manager
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-controller-manager/controller-manager.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-controller-manager/app/controllermanager.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-controller-manager/app/apps.go