• kubelet Source Code Analysis (Part 3): startKubelet
  • 1. startKubelet
  • 2. Kubelet.Run
  • 3. initializeModules
    • 3.1. setupDataDirs
    • 3.2. manager
  • 4. Running the managers
    • 4.1. volumeManager
    • 4.2. syncNodeStatus
    • 4.3. updateRuntimeUp
    • 4.4. syncNetworkUtil
    • 4.5. podKiller
    • 4.6. statusManager
    • 4.7. probeManager
    • 4.8. runtimeClassManager
    • 4.9. PodLifecycleEventGenerator
    • 4.10. syncLoop
  • 5. syncLoop
  • 6. Summary
    • 6.1. Basic flow
    • 6.2. Manager

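    kubelet Source Code Analysis (Part 3): startKubelet

    The analysis below is based on Kubernetes v1.12.0.

    This article analyzes startKubelet, focusing on Kubelet.Run, which initializes and starts a set of managers. The internal logic of each manager and the pod lifecycle management logic are left to later articles.

    Later articles will go through the implementation under pkg/kubelet area by area.

    Directory layout of pkg/kubelet: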

    kubelet
    ├── apis # defines related interfaces
    ├── cadvisor # cadvisor
    ├── cm # ContainerManager, cpu manager, cgroup manager
    ├── config
    ├── configmap # configmap manager
    ├── container # Runtime, ImageService
    ├── dockershim # Docker-related calls
    ├── eviction # eviction manager
    ├── images # image manager
    ├── kubeletconfig
    ├── kuberuntime # core: kubeGenericRuntimeManager, container operations against the runtime
    ├── lifecycle
    ├── mountpod
    ├── network # pod dns
    ├── nodelease
    ├── nodestatus # MachineInfo, node-related information
    ├── pleg # PodLifecycleEventGenerator
    ├── pod # core: pod manager, mirror pod
    ├── preemption
    ├── qos # resource quality of service; very little content for now
    ├── remote # RemoteRuntimeService
    ├── server
    ├── stats # StatsProvider
    ├── status # status manager
    ├── types # PodUpdate, PodOperation
    ├── volumemanager # VolumeManager
    ├── kubelet.go # core: SyncHandler, most kubelet operations
    ├── kubelet_getters.go # getters, e.g. for directories: getRootDir, getPodsDir, getPluginsDir
    ├── kubelet_network.go
    ├── kubelet_network_linux.go
    ├── kubelet_node_status.go # registerWithAPIServer, initialNode, syncNodeStatus
    ├── kubelet_pods.go # core: pod create/read/update/delete operations, podKiller
    ├── kubelet_resources.go
    ├── kubelet_volumes.go # ListVolumesForPod, cleanupOrphanedPodDirs
    ├── oom_watcher.go # OOMWatcher
    ├── pod_container_deletor.go
    ├── pod_workers.go # core: PodWorkers, UpdatePodOptions, syncPodOptions, managePodLoop
    ├── runonce.go # RunOnce
    ├── runtime.go
    ...

    1. startKubelet

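    startKubelet lives in cmd/kubelet/app/server.go. It starts and runs a kubelet; the logic that actually runs the kubelet is in pkg/kubelet/kubelet.go.

    It does two things:

    1. Runs the kubelet, which executes the logic of the managers it contains.
    2. Starts the kubelet server so that it listens for requests.

    This code is in cmd/kubelet/app/server.go: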

    func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
        // start the kubelet
        go wait.Until(func() {
            k.Run(podCfg.Updates())
        }, 0, wait.NeverStop)
        // start the kubelet server
        if enableServer {
            go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
        }
        if kubeCfg.ReadOnlyPort > 0 {
            go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
        }
    }
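
    The call wait.Until(f, 0, wait.NeverStop) keeps invoking f until the stop channel is closed, so k.Run would simply be called again if it ever returned. The sketch below imitates that behaviour with the standard library only. The until and countRuns helpers are written for this illustration and are not the k8s.io/apimachinery implementation, which also deals with panics and timer reuse.

```go
package main

import (
	"fmt"
	"time"
)

// until is a simplified stand-in for wait.Until: it calls f, waits for
// period, and repeats until stopCh is closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for a stop request before every run.
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		// Wait for the period, but wake up early if stopped.
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

// countRuns runs a function that returns immediately and closes the stop
// channel after limit calls. It returns how many times f was invoked.
func countRuns(limit int) int {
	stopCh := make(chan struct{})
	runs := 0
	until(func() {
		runs++
		if runs == limit {
			close(stopCh)
		}
	}, 0, stopCh)
	return runs
}

func main() {
	fmt.Println("runs:", countRuns(3)) // prints "runs: 3"
}
```

    With a period of 0 the function is restarted as soon as it returns, which matches how startKubelet wraps k.Run.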

    2. Kubelet.Run

    Kubelet.Run starts the managers constructed by NewMainKubelet so that each one does its job. Most of them run as long-lived goroutines.

    The full code of Kubelet.Run is as follows.

    This code is in pkg/kubelet/kubelet.go:

    // Run starts the kubelet reacting to config updates
    func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
        if kl.logServer == nil {
            kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
        }
        if kl.kubeClient == nil {
            glog.Warning("No api server defined - no node status update will be sent.")
        }
        // Start the cloud provider sync manager
        if kl.cloudResourceSyncManager != nil {
            go kl.cloudResourceSyncManager.Run(wait.NeverStop)
        }
        if err := kl.initializeModules(); err != nil {
            kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
            glog.Fatal(err)
        }
        // Start volume manager
        go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
        if kl.kubeClient != nil {
            // Start syncing node status immediately, this may set up things the runtime needs to run.
            go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
            go kl.fastStatusUpdateOnce()
            // start syncing lease
            if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
                go kl.nodeLeaseController.Run(wait.NeverStop)
            }
        }
        go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
        // Start loop to sync iptables util rules
        if kl.makeIPTablesUtilChains {
            go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
        }
        // Start a goroutine responsible for killing pods (that are not properly
        // handled by pod workers).
        go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
        // Start component sync loops.
        kl.statusManager.Start()
        kl.probeManager.Start()
        // Start syncing RuntimeClasses if enabled.
        if kl.runtimeClassManager != nil {
            go kl.runtimeClassManager.Run(wait.NeverStop)
        }
        // Start the pod lifecycle event generator.
        kl.pleg.Start()
        kl.syncLoop(updates, kl)
    }

    The sections below walk through Kubelet.Run piece by piece.

    3. initializeModules

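    initializeModules starts imageManager, serverCertificateManager, oomWatcher and resourceAnalyzer.

    The main flow is:

    1. Create the filesystem directories: the kubelet root directory, the pods directory, the plugins directory and the container logs directory.
    2. Start imageManager, serverCertificateManager, oomWatcher and resourceAnalyzer.

    The managers are:

    • imageManager: garbage-collects images.
    • serverCertificateManager: handles certificates.
    • oomWatcher: watches memory usage for out-of-memory (OOM) events on the node.
    • resourceAnalyzer: monitors resource usage.

    The full code is as follows.

    This code is in pkg/kubelet/kubelet.go: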

    // initializeModules will initialize internal modules that do not require the container runtime to be up.
    // Note that the modules here must not depend on modules that are not initialized here.
    func (kl *Kubelet) initializeModules() error {
        // Prometheus metrics.
        metrics.Register(kl.runtimeCache, collectors.NewVolumeStatsCollector(kl))
        // Setup filesystem directories.
        if err := kl.setupDataDirs(); err != nil {
            return err
        }
        // If the container logs directory does not exist, create it.
        if _, err := os.Stat(ContainerLogsDir); err != nil {
            if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
                glog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
            }
        }
        // Start the image manager.
        kl.imageManager.Start()
        // Start the certificate manager if it was enabled.
        if kl.serverCertificateManager != nil {
            kl.serverCertificateManager.Start()
        }
        // Start out of memory watcher.
        if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
            return fmt.Errorf("Failed to start OOM watcher %v", err)
        }
        // Start resource analyzer
        kl.resourceAnalyzer.Start()
        return nil
    }

    3.1. setupDataDirs

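    initializeModules first creates the directories it needs.

    The directories are:

    • ContainerLogsDir: /var/log/containers.
    • rootDirectory: passed in as a parameter, usually /var/lib/kubelet.
    • PodsDir: {rootDirectory}/pods.
    • PluginsDir: {rootDirectory}/plugins.

    The part of initializeModules that sets up the directories is shown here. Note that a failure to create ContainerLogsDir is only logged, while a failure in setupDataDirs is returned to the caller: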

    // Setup filesystem directories.
    if err := kl.setupDataDirs(); err != nil {
        return err
    }
    // If the container logs directory does not exist, create it.
    if _, err := os.Stat(ContainerLogsDir); err != nil {
        if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
            glog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
        }
    }

    The code of setupDataDirs:

    // setupDataDirs creates:
    // 1. the root directory
    // 2. the pods directory
    // 3. the plugins directory
    func (kl *Kubelet) setupDataDirs() error {
        kl.rootDirectory = path.Clean(kl.rootDirectory)
        if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
            return fmt.Errorf("error creating root directory: %v", err)
        }
        if err := kl.mounter.MakeRShared(kl.getRootDir()); err != nil {
            return fmt.Errorf("error configuring root directory: %v", err)
        }
        if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
            return fmt.Errorf("error creating pods directory: %v", err)
        }
        if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
            return fmt.Errorf("error creating plugins directory: %v", err)
        }
        return nil
    }
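
    To see the resulting layout without touching /var/lib/kubelet, the directory creation can be reproduced against a temporary root. This is a minimal sketch with the standard library only: setupDirs, isDir and runDemo are names made up for this example, the subdirectory names follow the list above, and the MakeRShared mount step is left out.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// setupDirs creates root, root/pods and root/plugins in the same order as
// setupDataDirs. The MakeRShared mount step is intentionally omitted.
func setupDirs(root string) error {
	root = filepath.Clean(root)
	dirs := []string{
		root,
		filepath.Join(root, "pods"),
		filepath.Join(root, "plugins"),
	}
	for _, dir := range dirs {
		if err := os.MkdirAll(dir, 0750); err != nil {
			return fmt.Errorf("error creating directory %q: %v", dir, err)
		}
	}
	return nil
}

// isDir reports whether path exists and is a directory.
func isDir(path string) bool {
	info, err := os.Stat(path)
	return err == nil && info.IsDir()
}

// runDemo builds the layout under a temporary root, reports whether the two
// subdirectories exist, and removes the temporary root afterwards.
func runDemo() (podsOK, pluginsOK bool, err error) {
	root, err := os.MkdirTemp("", "kubelet-root-")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(root)
	if err := setupDirs(root); err != nil {
		return false, false, err
	}
	return isDir(filepath.Join(root, "pods")), isDir(filepath.Join(root, "plugins")), nil
}

func main() {
	podsOK, pluginsOK, err := runDemo()
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	fmt.Println(podsOK, pluginsOK)
}
```

    Because os.MkdirAll succeeds when the directory already exists, calling setupDirs repeatedly is harmless, which is what lets the kubelet run this step on every start.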

    3.2. manager

    The managers started in initializeModules:

    // Start the image manager.
    kl.imageManager.Start()
    // Start the certificate manager if it was enabled.
    if kl.serverCertificateManager != nil {
        kl.serverCertificateManager.Start()
    }
    // Start out of memory watcher.
    if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
        return fmt.Errorf("Failed to start OOM watcher %v", err)
    }
    // Start resource analyzer
    kl.resourceAnalyzer.Start()

    4. Running the managers

    4.1. volumeManager

    volumeManager runs a set of asynchronous loops that work out which volumes need to be attached, detached, mounted or unmounted based on the pods scheduled on this node.

    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    volumeManager.Run is implemented as follows:

    func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
        defer runtime.HandleCrash()
        go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
        glog.V(2).Infof("The desired_state_of_world populator starts")
        glog.Infof("Starting Kubelet Volume Manager")
        go vm.reconciler.Run(stopCh)
        metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
        <-stopCh
        glog.Infof("Shutting down Kubelet Volume Manager")
    }
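
    Run starts a populator for the desired state and a reconciler. The idea behind such a pair can be illustrated by diffing a desired set against an actual set. The following sketch is a simplified illustration, not the volume manager's code: reconcile is a made-up function and volumes are reduced to plain names.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile compares the desired set of mounted volumes with the actual set
// and returns what has to be mounted and unmounted to make them match.
func reconcile(desired, actual map[string]bool) (toMount, toUnmount []string) {
	for v := range desired {
		if !actual[v] {
			toMount = append(toMount, v)
		}
	}
	for v := range actual {
		if !desired[v] {
			toUnmount = append(toUnmount, v)
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Strings(toMount)
	sort.Strings(toUnmount)
	return toMount, toUnmount
}

func main() {
	desired := map[string]bool{"vol-a": true, "vol-b": true}
	actual := map[string]bool{"vol-b": true, "vol-c": true}
	toMount, toUnmount := reconcile(desired, actual)
	fmt.Println(toMount, toUnmount) // prints "[vol-a] [vol-c]"
}
```

    Running such a comparison in a loop converges the actual state towards the desired state even if individual operations fail, since the next pass simply recomputes the difference.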

    4.2. syncNodeStatus

    syncNodeStatus runs periodically in a goroutine, every nodeStatusUpdateFrequency. It syncs the node's status to the master and registers the kubelet first if necessary.

    if kl.kubeClient != nil {
        // Start syncing node status immediately, this may set up things the runtime needs to run.
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
        go kl.fastStatusUpdateOnce()
        // start syncing lease
        if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
            go kl.nodeLeaseController.Run(wait.NeverStop)
        }
    }

    4.3. updateRuntimeUp

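    updateRuntimeUp runs every 5 seconds. It calls the container runtime status callback and initializes the runtime-dependent modules when the container runtime first comes up. If the status check fails, it logs the error and returns without updating anything (the function has no return value, since it is passed to wait.Until). If the status check passes, it updates the container runtime's up time in the kubelet's runtimeState.

    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)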

    4.4. syncNetworkUtil

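    When makeIPTablesUtilChains is set, a loop syncs the iptables utility rules once a minute. The implementation is platform-specific: the Linux build (kubelet_network_linux.go) makes sure the KUBE-MARK-DROP, KUBE-MARK-MASQ and KUBE-POSTROUTING rules exist, while the build for other platforms does nothing.

    // Start loop to sync iptables util rules
    if kl.makeIPTablesUtilChains {
        go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
    }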

    4.5. podKiller

    A goroutine is started that kills pods which were not properly handled by the pod workers.

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    The podKiller code is as follows.

    This code is in pkg/kubelet/kubelet_pods.go:

    // podKiller launches a goroutine to kill a pod received from the channel if
    // another goroutine isn't already in action.
    func (kl *Kubelet) podKiller() {
        killing := sets.NewString()
        // guard for the killing set
        lock := sync.Mutex{}
        for podPair := range kl.podKillingCh {
            runningPod := podPair.RunningPod
            apiPod := podPair.APIPod
            lock.Lock()
            exists := killing.Has(string(runningPod.ID))
            if !exists {
                killing.Insert(string(runningPod.ID))
            }
            lock.Unlock()
            if !exists {
                go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
                    glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
                    err := kl.killPod(apiPod, runningPod, nil, nil)
                    if err != nil {
                        glog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
                    }
                    lock.Lock()
                    killing.Delete(string(runningPod.ID))
                    lock.Unlock()
                }(apiPod, runningPod)
            }
        }
    }
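
    The core of podKiller is a mutex-guarded set that ensures at most one kill goroutine per pod ID at any time. The pattern can be isolated as below. This is a sketch for illustration: the inFlight type and its methods are hypothetical names, not part of the kubelet.

```go
package main

import (
	"fmt"
	"sync"
)

// inFlight tracks IDs that currently have a worker, like the "killing" set
// guarded by a mutex in podKiller.
type inFlight struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newInFlight() *inFlight {
	return &inFlight{ids: make(map[string]struct{})}
}

// tryStart records id and returns true only if no worker holds it yet.
func (f *inFlight) tryStart(id string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, exists := f.ids[id]; exists {
		return false
	}
	f.ids[id] = struct{}{}
	return true
}

// finish releases id so that a later request can start a new worker.
func (f *inFlight) finish(id string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.ids, id)
}

func main() {
	f := newInFlight()
	fmt.Println(f.tryStart("pod-1")) // true: first request starts a worker
	fmt.Println(f.tryStart("pod-1")) // false: a worker is already running
	f.finish("pod-1")
	fmt.Println(f.tryStart("pod-1")) // true: the ID was released
}
```

    Checking and inserting under the same lock is what makes the test atomic. Releasing the ID once the worker is done allows a later kill request for the same pod to be retried.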

    4.6. statusManager

    statusManager syncs pod status with the apiserver and also serves as a status cache.

    // Start component sync loops.
    kl.statusManager.Start()

    statusManager.Start is implemented as follows:

    func (m *manager) Start() {
        // Don't start the status manager if we don't have a client. This will happen
        // on the master, where the kubelet is responsible for bootstrapping the pods
        // of the master components.
        if m.kubeClient == nil {
            glog.Infof("Kubernetes client is nil, not starting status manager.")
            return
        }
        glog.Info("Starting to sync pod status with apiserver")
        syncTicker := time.Tick(syncPeriod)
        // syncPod and syncBatch share the same go routine to avoid sync races.
        go wait.Forever(func() {
            select {
            case syncRequest := <-m.podStatusChannel:
                glog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                    syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
                m.syncPod(syncRequest.podUID, syncRequest.status)
            case <-syncTicker:
                m.syncBatch()
            }
        }, 0)
    }
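
    As the comment in Start notes, the per-pod sync and the batch sync share one goroutine so that they never race. A minimal sketch of that select pattern follows. The drain function and its string results are assumptions made for this example, and the ticker is replaced by a plain channel so the behaviour is deterministic.

```go
package main

import "fmt"

// drain handles exactly n events from the two sources in a single goroutine,
// so the per-pod handler and the batch handler never run concurrently.
func drain(requests <-chan string, ticks <-chan struct{}, n int) []string {
	var handled []string
	for i := 0; i < n; i++ {
		select {
		case uid := <-requests:
			// Stand-in for m.syncPod(uid, status).
			handled = append(handled, "syncPod:"+uid)
		case <-ticks:
			// Stand-in for m.syncBatch().
			handled = append(handled, "syncBatch")
		}
	}
	return handled
}

func main() {
	requests := make(chan string, 1)
	ticks := make(chan struct{}, 1)

	requests <- "pod-a"
	fmt.Println(drain(requests, ticks, 1)) // prints "[syncPod:pod-a]"

	ticks <- struct{}{}
	fmt.Println(drain(requests, ticks, 1)) // prints "[syncBatch]"
}
```

    When both channels are ready at once, Go's select picks one at random, so neither source can starve the other indefinitely.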

    4.7. probeManager

    probeManager handles container probes.

    kl.probeManager.Start()

    4.8. runtimeClassManager

    // Start syncing RuntimeClasses if enabled.
    if kl.runtimeClassManager != nil {
        go kl.runtimeClassManager.Run(wait.NeverStop)
    }

    4.9. PodLifecycleEventGenerator

    // Start the pod lifecycle event generator.
    kl.pleg.Start()

    PodLifecycleEventGenerator is the interface of the pod lifecycle event generator:

    // PodLifecycleEventGenerator contains functions for generating pod life cycle events.
    type PodLifecycleEventGenerator interface {
        Start()
        Watch() chan *PodLifecycleEvent
        Healthy() (bool, error)
    }

    The Start method is implemented as follows:

    // Start spawns a goroutine to relist periodically.
    func (g *GenericPLEG) Start() {
        go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
    }
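
    Start only schedules relist. Conceptually, a periodic relist can produce lifecycle events by comparing the previous snapshot of container states with the current one. The sketch below shows that idea in a heavily simplified form: diffStates, the state strings and the event format are all invented for this example and do not mirror the GenericPLEG internals.

```go
package main

import (
	"fmt"
	"sort"
)

// diffStates compares two snapshots (container ID -> state) and returns one
// event string per change, sorted for stable output.
func diffStates(old, cur map[string]string) []string {
	var events []string
	// New containers and containers whose state changed.
	for id, state := range cur {
		if old[id] != state {
			events = append(events, id+":"+state)
		}
	}
	// Containers that disappeared since the last snapshot.
	for id := range old {
		if _, ok := cur[id]; !ok {
			events = append(events, id+":removed")
		}
	}
	sort.Strings(events)
	return events
}

func main() {
	old := map[string]string{"c1": "running", "c2": "running"}
	cur := map[string]string{"c1": "exited", "c3": "running"}
	fmt.Println(diffStates(old, cur)) // prints "[c1:exited c2:removed c3:running]"
}
```

    Polling and diffing in this way trades some latency (up to one relist period) for independence from any event stream the container runtime may or may not provide.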

    4.10. syncLoop

    Finally, syncLoop is called to run the loop that processes changes.

    kl.syncLoop(updates, kl)

    5. syncLoop

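    syncLoop is the main loop for processing changes. It watches for changes from three channels (file, apiserver and http) and merges them. For every new change it sees, it runs a sync of the desired state against the running state. If no configuration change is seen, it re-syncs the last known desired state every sync-frequency seconds.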

    // syncLoop is the main loop for processing changes. It watches for changes from
    // three channels (file, apiserver, and http) and creates a union of them. For
    // any new change seen, will run a sync against desired state and running state. If
    // no changes are seen to the configuration, will synchronize the last known desired
    // state every sync-frequency seconds. Never returns.
    func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
        glog.Info("Starting kubelet main sync loop.")
        // The resyncTicker wakes up kubelet to checks if there are any pod workers
        // that need to be sync'd. A one-second period is sufficient because the
        // sync interval is defaulted to 10s.
        syncTicker := time.NewTicker(time.Second)
        defer syncTicker.Stop()
        housekeepingTicker := time.NewTicker(housekeepingPeriod)
        defer housekeepingTicker.Stop()
        plegCh := kl.pleg.Watch()
        const (
            base = 100 * time.Millisecond
            max = 5 * time.Second
            factor = 2
        )
        duration := base
        for {
            if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
                glog.Infof("skipping pod synchronization - %v", rs)
                // exponential backoff
                time.Sleep(duration)
                duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
                continue
            }
            // reset backoff if we have a success
            duration = base
            kl.syncLoopMonitor.Store(kl.clock.Now())
            if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
                break
            }
            kl.syncLoopMonitor.Store(kl.clock.Now())
        }
    }

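    syncLoop calls syncLoopIteration, which runs the more specific loop that watches for pod changes. The logic of syncLoopIteration will be analyzed separately in a later article.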
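
    While the runtime reports errors, syncLoop skips synchronization and sleeps with an exponential back-off. The sequence of sleep durations can be reproduced in isolation with the same constants (base 100ms, factor 2, cap 5s). backoffDelays is a helper written for this example, and the constants are renamed here to baseDelay and maxDelay.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

const (
	baseDelay = 100 * time.Millisecond
	maxDelay  = 5 * time.Second
	factor    = 2
)

// backoffDelays returns the sleep durations for n consecutive failed runtime
// checks, using the same update rule as syncLoop.
func backoffDelays(n int) []time.Duration {
	delays := make([]time.Duration, 0, n)
	duration := baseDelay
	for i := 0; i < n; i++ {
		// syncLoop sleeps first, then grows the duration for the next round.
		delays = append(delays, duration)
		duration = time.Duration(math.Min(float64(maxDelay), factor*float64(duration)))
	}
	return delays
}

func main() {
	fmt.Println(backoffDelays(8)) // prints "[100ms 200ms 400ms 800ms 1.6s 3.2s 5s 5s]"
}
```

    The delay doubles until it reaches the 5-second cap and stays there. In syncLoop it is reset to the base value as soon as one check finds no runtime errors.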

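    6. Summary

    6.1. Basic flow

    The main flow of Kubelet.Run is:

    1. Initialize the modules, i.e. start imageManager, serverCertificateManager, oomWatcher and resourceAnalyzer.
    2. Start the managers, most of them as long-lived goroutines, including volumeManager, statusManager and others.
    3. Run syncLoop, the loop that processes changes and manages the pod lifecycle.

    syncLoop:

    syncLoop manages the pod lifecycle. It calls syncLoopIteration, which, based on the information in each PodUpdate, dispatches the operation to the SyncHandler to carry out pod lifecycle management such as adding, updating and removing pods. The SyncHandler methods include HandlePodSyncs, HandlePodCleanups and others. This logic will be analyzed in detail in a later article.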
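
    The dispatch described above, one handler method per kind of operation, can be sketched as follows. This is a reduced illustration rather than the kubelet's code: op, podUpdate, syncHandler and recorder are local stand-ins, pods are represented by their names only, and only three operations are modelled.

```go
package main

import "fmt"

// op and podUpdate are simplified stand-ins for the operation and update
// types; pods are reduced to their names.
type op int

const (
	opAdd op = iota
	opUpdate
	opRemove
)

type podUpdate struct {
	Op   op
	Pods []string
}

// syncHandler is a reduced handler interface: one method per kind of change.
type syncHandler interface {
	HandlePodAdditions(pods []string)
	HandlePodUpdates(pods []string)
	HandlePodRemoves(pods []string)
}

// recorder implements syncHandler by recording each call.
type recorder struct{ calls []string }

func (r *recorder) HandlePodAdditions(pods []string) {
	r.calls = append(r.calls, fmt.Sprintf("add %v", pods))
}

func (r *recorder) HandlePodUpdates(pods []string) {
	r.calls = append(r.calls, fmt.Sprintf("update %v", pods))
}

func (r *recorder) HandlePodRemoves(pods []string) {
	r.calls = append(r.calls, fmt.Sprintf("remove %v", pods))
}

// dispatch routes a single update to the matching handler method.
func dispatch(u podUpdate, h syncHandler) {
	switch u.Op {
	case opAdd:
		h.HandlePodAdditions(u.Pods)
	case opUpdate:
		h.HandlePodUpdates(u.Pods)
	case opRemove:
		h.HandlePodRemoves(u.Pods)
	}
}

func main() {
	r := &recorder{}
	updates := []podUpdate{
		{Op: opAdd, Pods: []string{"web"}},
		{Op: opUpdate, Pods: []string{"web"}},
		{Op: opRemove, Pods: []string{"web"}},
	}
	for _, u := range updates {
		dispatch(u, r)
	}
	fmt.Println(r.calls) // prints "[add [web] update [web] remove [web]]"
}
```

    Passing the handler as an interface is what allows Kubelet.Run to call kl.syncLoop(updates, kl) with the kubelet itself as the handler, and would let a test substitute a recorder like the one above.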

    6.2. Manager

    The managers involved when the kubelet runs are summarized below.

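    manager                     description
    imageManager                garbage-collects images
    serverCertificateManager    handles certificates
    oomWatcher                  watches memory usage for out-of-memory (OOM) events
    resourceAnalyzer            monitors resource usage
    volumeManager               attaches/detaches and mounts/unmounts volumes for the pods on the node
    statusManager               syncs pod status with the apiserver; also serves as a status cache
    probeManager                handles container probes
    runtimeClassManager         syncs RuntimeClasses
    podKiller                   kills pods that were not properly handled by the pod workers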

    References:

    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kubelet/app/server.go

    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kubelet.go