• kubelet源码分析(一)之 NewKubeletCommand
  • 1. Main 函数
  • 2. NewKubeletCommand
    • 2.1. 参数解析
      • 2.1.1. 初始化参数和配置
      • 2.1.2. 打印帮助信息和版本信息
      • 2.1.3. kubelet config
      • 2.1.4. dynamic kubelet config
    • 2.2. 初始化kubeletServer和kubeletDeps
      • 2.2.1. kubeletServer
      • 2.2.2. kubeletDeps
      • 2.2.3. docker shim
    • 2.3. AddFlags
    • 2.4. 运行kubelet
  • 3. Run
    • 3.1. 构造kubeDeps
      • 3.1.1. clientConfig
      • 3.1.2. kubeClient
      • 3.1.3. dynamicKubeClient
      • 3.1.4. eventClient
      • 3.1.5. heartbeatClient
      • 3.1.6. csiClient
      • 3.1.7. CAdvisorInterface
      • 3.1.8. ContainerManager
      • 3.1.9. oomAdjuster
    • 3.2. Health check
    • 3.3. RunKubelet
  • 4. RunKubelet
    • 4.1. CreateAndInitKubelet
      • 4.1.1. NewMainKubelet
      • 4.1.2. PodConfig
    • 4.2. startKubelet
      • 4.2.1. k.Run
  • 5. 总结

    kubelet源码分析(一)之 NewKubeletCommand

    以下代码分析基于 kubernetes v1.12.0 版本。

    本文主要分析 https://github.com/kubernetes/kubernetes/tree/v1.12.0/cmd/kubelet 部分的代码。

    本文主要分析 kubernetes/cmd/kubelet部分,该部分主要涉及kubelet的参数解析,及初始化和构造相关的依赖组件(主要在kubeDeps结构体中),并没有kubelet运行的详细逻辑,该部分位于kubernetes/pkg/kubelet模块,待后续文章分析。

    kubeletcmd代码目录结构如下:

    1. kubelet
    2. ├── app
    3. ├── auth.go
    4. ├── init_others.go
    5. ├── init_windows.go
    6. ├── options # 包括kubelet使用到的option
    7. ├── container_runtime.go
    8. ├── globalflags.go
    9. ├── globalflags_linux.go
    10. ├── globalflags_other.go
    11. ├── options.go # 包括KubeletFlags、AddFlags、AddKubeletConfigFlags等
    12. ├── osflags_others.go
    13. └── osflags_windows.go
    14. ├── plugins.go
    15. ├── server.go # 包括NewKubeletCommand、Run、RunKubelet、CreateAndInitKubelet、startKubelet等
    16. ├── server_linux.go
    17. └── server_unsupported.go
    18. └── kubelet.go # kubelet的main入口函数

    1. Main 函数

    kubelet的入口函数Main 函数,具体代码参考:https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kubelet/kubelet.go。

    1. func main() {
    2. rand.Seed(time.Now().UTC().UnixNano())
    3. command := app.NewKubeletCommand(server.SetupSignalHandler())
    4. logs.InitLogs()
    5. defer logs.FlushLogs()
    6. if err := command.Execute(); err != nil {
    7. fmt.Fprintf(os.Stderr, "%v\n", err)
    8. os.Exit(1)
    9. }
    10. }

    kubelet代码主要采用了Cobra命令行框架,核心代码如下:

    1. // 初始化命令行
    2. command := app.NewKubeletCommand(server.SetupSignalHandler())
    3. // 执行Execute
    4. err := command.Execute()

    2. NewKubeletCommand

    NewKubeletCommand基于参数创建了一个*cobra.Command对象。其中核心部分代码为参数解析部分和Run函数。

    1. // NewKubeletCommand creates a *cobra.Command object with default parameters
    2. func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {
    3. ...
    4. cmd := &cobra.Command{
    5. Use: componentKubelet,
    6. Long: `...`,
    7. // The Kubelet has special flag parsing requirements to enforce flag precedence rules,
    8. // so we do all our parsing manually in Run, below.
    9. // DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
    10. // `args` arg to Run, without Cobra's interference.
    11. DisableFlagParsing: true,
    12. Run: func(cmd *cobra.Command, args []string) {
    13. ...
    14. // run the kubelet
    15. glog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
    16. if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
    17. glog.Fatal(err)
    18. }
    19. },
    20. }
    21. ...
    22. return cmd
    23. }

    2.1. 参数解析

    kubelet开启了DisableFlagParsing参数,没有使用Cobra框架中的默认参数解析,而是自定义参数解析。

    2.1.1. 初始化参数和配置

    初始化参数解析,初始化cleanFlagSetkubeletFlagskubeletConfig

    1. cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
    2. cleanFlagSet.SetNormalizeFunc(flag.WordSepNormalizeFunc)
    3. kubeletFlags := options.NewKubeletFlags()
    4. kubeletConfig, err := options.NewKubeletConfiguration()

    2.1.2. 打印帮助信息和版本信息

    如果输入非法参数则打印使用帮助信息。

    1. // initial flag parse, since we disable cobra's flag parsing
    2. if err := cleanFlagSet.Parse(args); err != nil {
    3. cmd.Usage()
    4. glog.Fatal(err)
    5. }
    6. // check if there are non-flag arguments in the command line
    7. cmds := cleanFlagSet.Args()
    8. if len(cmds) > 0 {
    9. cmd.Usage()
    10. glog.Fatalf("unknown command: %s", cmds[0])
    11. }

    遇到helpversion参数则打印相关内容并退出。

    1. // short-circuit on help
    2. help, err := cleanFlagSet.GetBool("help")
    3. if err != nil {
    4. glog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
    5. }
    6. if help {
    7. cmd.Help()
    8. return
    9. }
    10. // short-circuit on verflag
    11. verflag.PrintAndExitIfRequested()
    12. utilflag.PrintFlags(cleanFlagSet)

    2.1.3. kubelet config

    加载并校验kubelet config。其中包括校验初始化的kubeletFlags,并从kubeletFlagsKubeletConfigFile参数获取kubelet config的内容。

    1. // set feature gates from initial flags-based config
    2. if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
    3. glog.Fatal(err)
    4. }
    5. // validate the initial KubeletFlags
    6. if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
    7. glog.Fatal(err)
    8. }
    9. if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
    10. glog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
    11. }
    12. // load kubelet config file, if provided
    13. if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
    14. kubeletConfig, err = loadConfigFile(configFile)
    15. if err != nil {
    16. glog.Fatal(err)
    17. }
    18. // We must enforce flag precedence by re-parsing the command line into the new object.
    19. // This is necessary to preserve backwards-compatibility across binary upgrades.
    20. // See issue #56171 for more details.
    21. if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
    22. glog.Fatal(err)
    23. }
    24. // update feature gates based on new config
    25. if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
    26. glog.Fatal(err)
    27. }
    28. }
    29. // We always validate the local configuration (command line + config file).
    30. // This is the default "last-known-good" config for dynamic config, and must always remain valid.
    31. if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
    32. glog.Fatal(err)
    33. }

    2.1.4. dynamic kubelet config

    如果开启使用动态kubelet的配置,则由动态配置文件替换kubelet配置文件。

    1. // use dynamic kubelet config, if enabled
    2. var kubeletConfigController *dynamickubeletconfig.Controller
    3. if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
    4. var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
    5. dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
    6. func(kc *kubeletconfiginternal.KubeletConfiguration) error {
    7. // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
    8. // so that we get a complete validation at the same point where we can decide to reject dynamic config.
    9. // This fixes the flag-precedence component of issue #63305.
    10. // See issue #56171 for general details on flag precedence.
    11. return kubeletConfigFlagPrecedence(kc, args)
    12. })
    13. if err != nil {
    14. glog.Fatal(err)
    15. }
    16. // If we should just use our existing, local config, the controller will return a nil config
    17. if dynamicKubeletConfig != nil {
    18. kubeletConfig = dynamicKubeletConfig
    19. // Note: flag precedence was already enforced in the controller, prior to validation,
    20. // by our above transform function. Now we simply update feature gates from the new config.
    21. if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
    22. glog.Fatal(err)
    23. }
    24. }
    25. }

    总结:以上通过对各种特定参数的解析,最终生成kubeletFlagskubeletConfig两个重要的参数对象,用来构造kubeletServer和其他需求。

    2.2. 初始化kubeletServer和kubeletDeps

    2.2.1. kubeletServer

    1. // construct a KubeletServer from kubeletFlags and kubeletConfig
    2. kubeletServer := &options.KubeletServer{
    3. KubeletFlags: *kubeletFlags,
    4. KubeletConfiguration: *kubeletConfig,
    5. }

    2.2.2. kubeletDeps

    1. // use kubeletServer to construct the default KubeletDeps
    2. kubeletDeps, err := UnsecuredDependencies(kubeletServer)
    3. if err != nil {
    4. glog.Fatal(err)
    5. }
    6. // add the kubelet config controller to kubeletDeps
    7. kubeletDeps.KubeletConfigController = kubeletConfigController

    2.2.3. docker shim

    如果开启了docker shim参数,则执行RunDockershim

    1. // start the experimental docker shim, if enabled
    2. if kubeletServer.KubeletFlags.ExperimentalDockershim {
    3. if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
    4. glog.Fatal(err)
    5. }
    6. return
    7. }

    2.3. AddFlags

    1. // keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
    2. kubeletFlags.AddFlags(cleanFlagSet)
    3. options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
    4. options.AddGlobalFlags(cleanFlagSet)
    5. cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
    6. // ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
    7. const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
    8. cmd.SetUsageFunc(func(cmd *cobra.Command) error {
    9. fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
    10. return nil
    11. })
    12. cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
    13. fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
    14. })

    其中:

    • AddFlags代码可参考:kubernetes/cmd/kubelet/app/options/options.go#L323
    • AddKubeletConfigFlags代码可参考:kubernetes/cmd/kubelet/app/options/options.go#L424

    2.4. 运行kubelet

    运行kubelet并且不退出。由Run函数进入后续的操作。

    1. // run the kubelet
    2. glog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
    3. if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
    4. glog.Fatal(err)
    5. }

    3. Run

    1. // Run runs the specified KubeletServer with the given Dependencies. This should never exit.
    2. // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
    3. // Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
    4. // not be generated.
    5. func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
    6. // To help debugging, immediately log version
    7. glog.Infof("Version: %+v", version.Get())
    8. if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
    9. return fmt.Errorf("failed OS init: %v", err)
    10. }
    11. if err := run(s, kubeDeps, stopCh); err != nil {
    12. return fmt.Errorf("failed to run Kubelet: %v", err)
    13. }
    14. return nil
    15. }

    当运行环境是Windows的时候,初始化操作,但是该操作为空,只是预留。具体执行run(s, kubeDeps, stopCh)函数。

    3.1. 构造kubeDeps

    3.1.1. clientConfig

    创建clientConfig,该对象用来创建各种的kubeDeps属性中包含的client

    1. clientConfig, err := createAPIServerClientConfig(s)
    2. if err != nil {
    3. return fmt.Errorf("invalid kubeconfig: %v", err)
    4. }

    3.1.2. kubeClient

    1. kubeClient, err = clientset.NewForConfig(clientConfig)
    2. if err != nil {
    3. glog.Warningf("New kubeClient from clientConfig error: %v", err)
    4. } else if kubeClient.CertificatesV1beta1() != nil && clientCertificateManager != nil {
    5. glog.V(2).Info("Starting client certificate rotation.")
    6. clientCertificateManager.SetCertificateSigningRequestClient(kubeClient.CertificatesV1beta1().CertificateSigningRequests())
    7. clientCertificateManager.Start()
    8. }

    3.1.3. dynamicKubeClient

    1. dynamicKubeClient, err = dynamic.NewForConfig(clientConfig)
    2. if err != nil {
    3. glog.Warningf("Failed to initialize dynamic KubeClient: %v", err)
    4. }

    3.1.4. eventClient

    1. // make a separate client for events
    2. eventClientConfig := *clientConfig
    3. eventClientConfig.QPS = float32(s.EventRecordQPS)
    4. eventClientConfig.Burst = int(s.EventBurst)
    5. eventClient, err = v1core.NewForConfig(&eventClientConfig)
    6. if err != nil {
    7. glog.Warningf("Failed to create API Server client for Events: %v", err)
    8. }

    3.1.5. heartbeatClient

    1. // make a separate client for heartbeat with throttling disabled and a timeout attached
    2. heartbeatClientConfig := *clientConfig
    3. heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
    4. // if the NodeLease feature is enabled, the timeout is the minimum of the lease duration and status update frequency
    5. if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
    6. leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
    7. if heartbeatClientConfig.Timeout > leaseTimeout {
    8. heartbeatClientConfig.Timeout = leaseTimeout
    9. }
    10. }
    11. heartbeatClientConfig.QPS = float32(-1)
    12. heartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
    13. if err != nil {
    14. glog.Warningf("Failed to create API Server client for heartbeat: %v", err)
    15. }

    3.1.6. csiClient

    1. // csiClient works with CRDs that support json only
    2. clientConfig.ContentType = "application/json"
    3. csiClient, err := csiclientset.NewForConfig(clientConfig)
    4. if err != nil {
    5. glog.Warningf("Failed to create CSI API client: %v", err)
    6. }

    client赋值

    1. kubeDeps.KubeClient = kubeClient
    2. kubeDeps.DynamicKubeClient = dynamicKubeClient
    3. if heartbeatClient != nil {
    4. kubeDeps.HeartbeatClient = heartbeatClient
    5. kubeDeps.OnHeartbeatFailure = closeAllConns
    6. }
    7. if eventClient != nil {
    8. kubeDeps.EventClient = eventClient
    9. }
    10. kubeDeps.CSIClient = csiClient

    3.1.7. CAdvisorInterface

    1. if kubeDeps.CAdvisorInterface == nil {
    2. imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
    3. kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
    4. if err != nil {
    5. return err
    6. }
    7. }

    3.1.8. ContainerManager

    1. if kubeDeps.ContainerManager == nil {
    2. if s.CgroupsPerQOS && s.CgroupRoot == "" {
    3. glog.Infof("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
    4. s.CgroupRoot = "/"
    5. }
    6. kubeReserved, err := parseResourceList(s.KubeReserved)
    7. if err != nil {
    8. return err
    9. }
    10. systemReserved, err := parseResourceList(s.SystemReserved)
    11. if err != nil {
    12. return err
    13. }
    14. var hardEvictionThresholds []evictionapi.Threshold
    15. // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
    16. if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
    17. hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
    18. if err != nil {
    19. return err
    20. }
    21. }
    22. experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
    23. if err != nil {
    24. return err
    25. }
    26. devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
    27. kubeDeps.ContainerManager, err = cm.NewContainerManager(
    28. kubeDeps.Mounter,
    29. kubeDeps.CAdvisorInterface,
    30. cm.NodeConfig{
    31. RuntimeCgroupsName: s.RuntimeCgroups,
    32. SystemCgroupsName: s.SystemCgroups,
    33. KubeletCgroupsName: s.KubeletCgroups,
    34. ContainerRuntime: s.ContainerRuntime,
    35. CgroupsPerQOS: s.CgroupsPerQOS,
    36. CgroupRoot: s.CgroupRoot,
    37. CgroupDriver: s.CgroupDriver,
    38. KubeletRootDir: s.RootDirectory,
    39. ProtectKernelDefaults: s.ProtectKernelDefaults,
    40. NodeAllocatableConfig: cm.NodeAllocatableConfig{
    41. KubeReservedCgroupName: s.KubeReservedCgroup,
    42. SystemReservedCgroupName: s.SystemReservedCgroup,
    43. EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
    44. KubeReserved: kubeReserved,
    45. SystemReserved: systemReserved,
    46. HardEvictionThresholds: hardEvictionThresholds,
    47. },
    48. QOSReserved: *experimentalQOSReserved,
    49. ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
    50. ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
    51. ExperimentalPodPidsLimit: s.PodPidsLimit,
    52. EnforceCPULimits: s.CPUCFSQuota,
    53. CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
    54. },
    55. s.FailSwapOn,
    56. devicePluginEnabled,
    57. kubeDeps.Recorder)
    58. if err != nil {
    59. return err
    60. }
    61. }

    3.1.9. oomAdjuster

    1. // TODO(vmarmol): Do this through container config.
    2. oomAdjuster := kubeDeps.OOMAdjuster
    3. if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
    4. glog.Warning(err)
    5. }

    3.2. Health check

    1. if s.HealthzPort > 0 {
    2. healthz.DefaultHealthz()
    3. go wait.Until(func() {
    4. err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
    5. if err != nil {
    6. glog.Errorf("Starting health server failed: %v", err)
    7. }
    8. }, 5*time.Second, wait.NeverStop)
    9. }

    3.3. RunKubelet

    通过各种赋值构造了完整的kubeDeps结构体,最后再执行RunKubelet转入后续的kubelet执行流程。

    1. if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
    2. return err
    3. }

    4. RunKubelet

    1. // RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
    2. // 1 Integration tests
    3. // 2 Kubelet binary
    4. // 3 Standalone 'kubernetes' binary
    5. // Eventually, #2 will be replaced with instances of #3
    6. func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    7. ...
    8. k, err := CreateAndInitKubelet(&kubeServer.KubeletConfiguration,
    9. ...
    10. kubeServer.NodeStatusMaxImages)
    11. if err != nil {
    12. return fmt.Errorf("failed to create kubelet: %v", err)
    13. }
    14. // NewMainKubelet should have set up a pod source config if one didn't exist
    15. // when the builder was run. This is just a precaution.
    16. if kubeDeps.PodConfig == nil {
    17. return fmt.Errorf("failed to create kubelet, pod source config was nil")
    18. }
    19. podCfg := kubeDeps.PodConfig
    20. rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))
    21. // process pods and exit.
    22. if runOnce {
    23. if _, err := k.RunOnce(podCfg.Updates()); err != nil {
    24. return fmt.Errorf("runonce failed: %v", err)
    25. }
    26. glog.Infof("Started kubelet as runonce")
    27. } else {
    28. startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
    29. glog.Infof("Started kubelet")
    30. }
    31. return nil
    32. }

    RunKubelet函数核心代码为执行了CreateAndInitKubeletstartKubelet两个函数的操作,以下对这两个函数进行分析。

    4.1. CreateAndInitKubelet

    通过传入kubeDeps调用CreateAndInitKubelet初始化Kubelet。

    1. k, err := CreateAndInitKubelet(&kubeServer.KubeletConfiguration,
    2. kubeDeps,
    3. &kubeServer.ContainerRuntimeOptions,
    4. kubeServer.ContainerRuntime,
    5. kubeServer.RuntimeCgroups,
    6. kubeServer.HostnameOverride,
    7. kubeServer.NodeIP,
    8. kubeServer.ProviderID,
    9. kubeServer.CloudProvider,
    10. kubeServer.CertDirectory,
    11. kubeServer.RootDirectory,
    12. kubeServer.RegisterNode,
    13. kubeServer.RegisterWithTaints,
    14. kubeServer.AllowedUnsafeSysctls,
    15. kubeServer.RemoteRuntimeEndpoint,
    16. kubeServer.RemoteImageEndpoint,
    17. kubeServer.ExperimentalMounterPath,
    18. kubeServer.ExperimentalKernelMemcgNotification,
    19. kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
    20. kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
    21. kubeServer.MinimumGCAge,
    22. kubeServer.MaxPerPodContainerCount,
    23. kubeServer.MaxContainerCount,
    24. kubeServer.MasterServiceNamespace,
    25. kubeServer.RegisterSchedulable,
    26. kubeServer.NonMasqueradeCIDR,
    27. kubeServer.KeepTerminatedPodVolumes,
    28. kubeServer.NodeLabels,
    29. kubeServer.SeccompProfileRoot,
    30. kubeServer.BootstrapCheckpointPath,
    31. kubeServer.NodeStatusMaxImages)
    32. if err != nil {
    33. return fmt.Errorf("failed to create kubelet: %v", err)
    34. }

    4.1.1. NewMainKubelet

    CreateAndInitKubelet方法中执行的核心函数是NewMainKubeletNewMainKubelet实例化一个kubelet对象,该部分的具体代码在kubernetes/pkg/kubelet中,具体参考:kubernetes/pkg/kubelet/kubelet.go#L325。

    1. func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    2. ...
    3. nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
    4. // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
    5. // up into "per source" synchronizations
    6. k, err = kubelet.NewMainKubelet(kubeCfg,
    7. kubeDeps,
    8. crOptions,
    9. containerRuntime,
    10. runtimeCgroups,
    11. hostnameOverride,
    12. nodeIP,
    13. providerID,
    14. cloudProvider,
    15. certDirectory,
    16. rootDirectory,
    17. registerNode,
    18. registerWithTaints,
    19. allowedUnsafeSysctls,
    20. remoteRuntimeEndpoint,
    21. remoteImageEndpoint,
    22. experimentalMounterPath,
    23. experimentalKernelMemcgNotification,
    24. experimentalCheckNodeCapabilitiesBeforeMount,
    25. experimentalNodeAllocatableIgnoreEvictionThreshold,
    26. minimumGCAge,
    27. maxPerPodContainerCount,
    28. maxContainerCount,
    29. masterServiceNamespace,
    30. registerSchedulable,
    31. nonMasqueradeCIDR,
    32. keepTerminatedPodVolumes,
    33. nodeLabels,
    34. seccompProfileRoot,
    35. bootstrapCheckpointPath,
    36. nodeStatusMaxImages)
    37. if err != nil {
    38. return nil, err
    39. }
    40. k.BirthCry()
    41. k.StartGarbageCollection()
    42. return k, nil
    43. }

    4.1.2. PodConfig

    1. if kubeDeps.PodConfig == nil {
    2. var err error
    3. kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
    4. if err != nil {
    5. return nil, err
    6. }
    7. }

    NewMainKubelet-->PodConfig-->NewPodConfig-->kubetypes.PodUpdate。会生成一个podUpdate的channel来监听pod的变化,该channel会在k.Run(podCfg.Updates())中作为关键入参。

    4.2. startKubelet

    1. // process pods and exit.
    2. if runOnce {
    3. if _, err := k.RunOnce(podCfg.Updates()); err != nil {
    4. return fmt.Errorf("runonce failed: %v", err)
    5. }
    6. glog.Infof("Started kubelet as runonce")
    7. } else {
    8. startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
    9. glog.Infof("Started kubelet")
    10. }

    如果设置了只运行一次的参数,则执行k.RunOnce,否则执行核心函数startKubelet。具体实现如下:

    1. func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
    2. // start the kubelet
    3. go wait.Until(func() {
    4. k.Run(podCfg.Updates())
    5. }, 0, wait.NeverStop)
    6. // start the kubelet server
    7. if enableServer {
    8. go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
    9. }
    10. if kubeCfg.ReadOnlyPort > 0 {
    11. go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
    12. }
    13. }

    4.2.1. k.Run

    1. // start the kubelet
    2. go wait.Until(func() {
    3. k.Run(podCfg.Updates())
    4. }, 0, wait.NeverStop)

    通过长驻进程的方式运行k.Run,不退出,将kubelet的运行逻辑引入kubernetes/pkg/kubelet/kubelet.go部分,kubernetes/pkg/kubelet部分的运行逻辑待后续文章分析。

    5. 总结

    1. kubelet采用Cobra命令行框架和pflag参数解析框架,和apiserver、scheduler、controller-manager形成统一的代码风格。

    2. kubernetes/cmd/kubelet部分主要对运行参数进行定义和解析,初始化和构造相关的依赖组件(主要在kubeDeps结构体中),并没有kubelet运行的详细逻辑,该部分位于kubernetes/pkg/kubelet模块。

    3. cmd部分调用流程如下:Main-->NewKubeletCommand-->Run(kubeletServer, kubeletDeps, stopCh)-->run(s *options.KubeletServer, kubeDeps ..., stopCh ...)--> RunKubelet(s, kubeDeps, s.RunOnce)-->startKubelet-->k.Run(podCfg.Updates())-->pkg/kubelet

      同时RunKubelet(s, kubeDeps, s.RunOnce)-->CreateAndInitKubelet-->kubelet.NewMainKubelet-->pkg/kubelet

    参考文章:

    • https://github.com/kubernetes/kubernetes/tree/v1.12.0