• kubelet source code analysis (Part 5): syncPod
  • 1. managePodLoop
  • 2. syncPod
    • 2.1. SyncPodKill
    • 2.2. SyncPodCreate
    • 2.3. Cgroups
    • 2.4. Mirror Pod
    • 2.5. makePodDataDirs
    • 2.6. mount volumes
    • 2.7. PullSecretsForPod
    • 2.8. containerRuntime.SyncPod
  • 3. Runtime.SyncPod
    • 3.1. computePodActions
    • 3.2. killPodWithSyncResult
    • 3.3. killContainer
    • 3.4. createPodSandbox
    • 3.5. start init container
    • 3.6. start containers
  • 4. startContainer
    • 4.1. pull image
    • 4.2. CreateContainer
    • 4.3. StartContainer
    • 4.4. execute post start hook
    • 5. Summary

    kubelet source code analysis (Part 5): syncPod

    The following code analysis is based on Kubernetes v1.12.0.

    This article mainly analyzes the syncPod part of the kubelet.

    1. managePodLoop

    managePodLoop reads updates from the podUpdates channel and invokes the syncPodFn function. syncPodFn is assigned in newPodWorkers and is in fact kubelet.syncPod.

    The full code of managePodLoop is as follows:

    This code is located in pkg/kubelet/pod_workers.go.

    func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
        var lastSyncTime time.Time
        for update := range podUpdates {
            err := func() error {
                podUID := update.Pod.UID
                // This is a blocking call that would return only if the cache
                // has an entry for the pod that is newer than minRuntimeCache
                // Time. This ensures the worker doesn't start syncing until
                // after the cache is at least newer than the finished time of
                // the previous sync.
                status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
                if err != nil {
                    // This is the legacy event thrown by manage pod loop
                    // all other events are now dispatched from syncPodFn
                    p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                    return err
                }
                // syncPodFn here is actually implemented by kubelet.syncPod
                err = p.syncPodFn(syncPodOptions{
                    mirrorPod:      update.MirrorPod,
                    pod:            update.Pod,
                    podStatus:      status,
                    killPodOptions: update.KillPodOptions,
                    updateType:     update.UpdateType,
                })
                lastSyncTime = time.Now()
                return err
            }()
            // notify the call-back function if the operation succeeded or not
            if update.OnCompleteFunc != nil {
                update.OnCompleteFunc(err)
            }
            if err != nil {
                // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
                glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
            }
            p.wrapUp(update.Pod.UID, err)
        }
    }

    The syncPod logic is analyzed below.

    2. syncPod

    syncPod can be understood as the transaction script that performs the sync work for a single pod. Its argument is syncPodOptions, which records the information about the pod to be synced. It is defined as follows:

    // syncPodOptions provides the arguments to a SyncPod operation.
    type syncPodOptions struct {
        // the mirror pod for the pod to sync, if it is a static pod
        mirrorPod *v1.Pod
        // pod to sync
        pod *v1.Pod
        // the type of update (create, update, sync)
        updateType kubetypes.SyncPodType
        // the current status
        podStatus *kubecontainer.PodStatus
        // if update type is kill, use the specified options to kill the pod.
        killPodOptions *KillPodOptions
    }

    syncPod mainly executes the following workflow:

    • If the pod is being created, record the pod worker start latency.
    • Call generateAPIPodStatus to prepare the v1.PodStatus for the pod.
    • If the pod is running for the first time, record the pod start latency.
    • Update the pod's status in the status manager.
    • Kill the pod if it should not be running.
    • If the pod is a static pod and has no mirror pod, create a mirror pod.
    • Create the pod's data directories if they do not exist yet.
    • Wait for volumes to attach/mount.
    • Fetch the pull secrets for the pod.
    • Call the container runtime's SyncPod function to perform the pod operations.
    • Update the pod's ingress and egress traffic limits.

    If any step in this workflow returns an error, the error is returned and the whole workflow is executed again on the next syncPod run. Error information is recorded as events to aid debugging.

    The execution of syncPod is analyzed below.

    The syncPod code is located in pkg/kubelet/kubelet.go.

    2.1. SyncPodKill

    First, pull the pod information out of syncPodOptions.

    func (kl *Kubelet) syncPod(o syncPodOptions) error {
        // pull out the required options
        pod := o.pod
        mirrorPod := o.mirrorPod
        podStatus := o.podStatus
        updateType := o.updateType
        ...
    }

    If the pod is to be killed, killPod is executed, which kills the pod within the specified grace period.

    // if we want to kill a pod, do it now!
    if updateType == kubetypes.SyncPodKill {
        killPodOptions := o.killPodOptions
        if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
            return fmt.Errorf("kill pod options are required if update type is kill")
        }
        apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
        kl.statusManager.SetPodStatus(pod, apiPodStatus)
        // we kill the pod with the specified grace period since this is a termination
        if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
            // there was an error killing the pod, so we return that error directly
            utilruntime.HandleError(err)
            return err
        }
        return nil
    }

    2.2. SyncPodCreate

    If the pod is being created, record its start latency. The latency is measured relative to the first time the pod was seen by the apiserver.

    // Latency measurements for the main workflow are relative to the
    // first time the pod was seen by the API server.
    var firstSeenTime time.Time
    if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
        firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
    }
    // Record pod worker start latency if being created
    // TODO: make pod workers record their own latencies
    if updateType == kubetypes.SyncPodCreate {
        if !firstSeenTime.IsZero() {
            // This is the first time we are syncing the pod. Record the latency
            // since kubelet first saw the pod if firstSeenTime is set.
            metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
        } else {
            glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
        }
    }

    Generate the final API pod status from the pod and pod status, and set the pod IP.

    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
    // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
    // set pod IP to hostIP directly in runtime.GetPodStatus
    podStatus.IP = apiPodStatus.PodIP

    Record the time it takes for the pod to reach the running state.

    // Record the time it takes for the pod to become running.
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
        !firstSeenTime.IsZero() {
        metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
    }

    If the pod is not runnable, update the pod and container statuses with the reason.

    runnable := kl.canRunPod(pod)
    if !runnable.Admit {
        // Pod is not runnable; update the Pod and Container statuses to why.
        apiPodStatus.Reason = runnable.Reason
        apiPodStatus.Message = runnable.Message
        // Waiting containers are not creating.
        const waitingReason = "Blocked"
        for _, cs := range apiPodStatus.InitContainerStatuses {
            if cs.State.Waiting != nil {
                cs.State.Waiting.Reason = waitingReason
            }
        }
        for _, cs := range apiPodStatus.ContainerStatuses {
            if cs.State.Waiting != nil {
                cs.State.Waiting.Reason = waitingReason
            }
        }
    }

    Then update the status in the status manager and kill the pod if it should not be running.

    // Update status in the status manager
    kl.statusManager.SetPodStatus(pod, apiPodStatus)
    // Kill pod if it should not be running
    if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
        var syncErr error
        if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
            syncErr = fmt.Errorf("error killing pod: %v", err)
            utilruntime.HandleError(syncErr)
        } else {
            if !runnable.Admit {
                // There was no error killing the pod, but the pod cannot be run.
                // Return an error to signal that the sync loop should back off.
                syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
            }
        }
        return syncErr
    }

    If the network plugin is not yet ready, the pod is started only if it uses host networking.

    // If the network plugin is not ready, only start the pod if it uses the host network
    if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
        kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, rs)
        return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, rs)
    }

    2.3. Cgroups

    Create cgroups for the pod and, if the cgroups-per-qos flag is enabled, apply resource parameters to them. For pods that have already terminated there is no need to create or update cgroups.

    When the kubelet is restarted with cgroups-per-qos enabled, all of a pod's running containers should be killed and brought back up under the QoS cgroup hierarchy.

    If the pod's cgroups already exist or the pod is running for the first time, the containers in the pod are not killed.

    // Create Cgroups for the pod and apply resource parameters
    // to them if cgroups-per-qos flag is enabled.
    pcm := kl.containerManager.NewPodContainerManager()
    // If pod has already been terminated then we need not create
    // or update the pod's cgroup
    if !kl.podIsTerminated(pod) {
        // When the kubelet is restarted with the cgroups-per-qos
        // flag enabled, all the pod's running containers
        // should be killed intermittently and brought back up
        // under the qos cgroup hierarchy.
        // Check if this is the pod's first sync
        firstSync := true
        for _, containerStatus := range apiPodStatus.ContainerStatuses {
            if containerStatus.State.Running != nil {
                firstSync = false
                break
            }
        }
        // Don't kill containers in pod if pod's cgroups already
        // exists or the pod is running for the first time
        podKilled := false
        if !pcm.Exists(pod) && !firstSync {
            if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
                podKilled = true
            }
        }
        ...

    If the pod was killed and its restart policy is Never, do not create or update its cgroups; otherwise, create and update the pod's cgroups.

    // Create and Update pod's Cgroups
    // Don't create cgroups for run once pod if it was killed above
    // The current policy is not to restart the run once pods when
    // the kubelet is restarted with the new flag as run once pods are
    // expected to run only once and if the kubelet is restarted then
    // they are not expected to run again.
    // We don't create and apply updates to cgroup if its a run once pod and was killed above
    if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
        if !pcm.Exists(pod) {
            if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
                glog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
            }
            if err := pcm.EnsureExists(pod); err != nil {
                kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
                return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
            }
        }
    }

    The cgroups update itself is performed by the container manager's UpdateQOSCgroups.

    if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
        glog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
    }

    2.4. Mirror Pod

    If the pod is a static pod without a corresponding mirror pod, create one; if a mirror pod exists but is outdated (being deleted, or no longer matching the static pod), delete it and create a new one.

    // Create Mirror Pod for Static Pod if it doesn't already exist
    if kubepod.IsStaticPod(pod) {
        podFullName := kubecontainer.GetPodFullName(pod)
        deleted := false
        if mirrorPod != nil {
            if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
                // The mirror pod is semantically different from the static pod. Remove
                // it. The mirror pod will get recreated later.
                glog.Warningf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod))
                if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
                    glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
                } else {
                    deleted = true
                }
            }
        }
        if mirrorPod == nil || deleted {
            node, err := kl.GetNode()
            if err != nil || node.DeletionTimestamp != nil {
                glog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
            } else {
                glog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
                if err := kl.podManager.CreateMirrorPod(pod); err != nil {
                    glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
                }
            }
        }
    }

    2.5. makePodDataDirs

    Create the data directories for the pod.

    // Make data directories for the pod
    if err := kl.makePodDataDirs(pod); err != nil {
        kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
        glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
        return err
    }

    The data directories include:

    • PodDir: {kubelet.rootDirectory}/pods/{podUID}
    • PodVolumesDir: {PodDir}/volumes
    • PodPluginsDir: {PodDir}/plugins

    // makePodDataDirs creates the dirs for the pod datas.
    func (kl *Kubelet) makePodDataDirs(pod *v1.Pod) error {
        uid := pod.UID
        if err := os.MkdirAll(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
            return err
        }
        if err := os.MkdirAll(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
            return err
        }
        if err := os.MkdirAll(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
            return err
        }
        return nil
    }

    2.6. mount volumes

    Mount volumes for pods that are not terminated.

    // Volume manager will not mount volumes for terminated pods
    if !kl.podIsTerminated(pod) {
        // Wait for volumes to attach/mount
        if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
            glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
            return err
        }
    }

    2.7. PullSecretsForPod

    Fetch the pod's image pull secrets.

    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)

    getPullSecretsForPod is implemented as follows:

    // getPullSecretsForPod inspects the Pod and retrieves the referenced pull
    // secrets.
    func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
        pullSecrets := []v1.Secret{}
        for _, secretRef := range pod.Spec.ImagePullSecrets {
            secret, err := kl.secretManager.GetSecret(pod.Namespace, secretRef.Name)
            if err != nil {
                glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed.", pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err)
                continue
            }
            pullSecrets = append(pullSecrets, *secret)
        }
        return pullSecrets
    }

    2.8. containerRuntime.SyncPod

    Call the container runtime's SyncPod function to carry out the pod operations; at this point the logic of kubelet.syncPod hands over to containerRuntime.SyncPod.

    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
    kl.reasonCache.Update(pod.UID, result)
    if err := result.Error(); err != nil {
        // Do not return error if the only failures were pods in backoff
        for _, r := range result.SyncResults {
            if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
                // Do not record an event here, as we keep all event logging for sync pod failures
                // local to container runtime so we get better errors
                return err
            }
        }
        return nil
    }

    3. Runtime.SyncPod

    SyncPod syncs the running pod toward the desired pod state by performing the following operations:

    • Compute sandbox and container changes.
    • Kill the pod if necessary.
    • Kill any containers that should not be running.
    • Create the sandbox if necessary.
    • Start the init containers.
    • Start the regular containers.
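    The decision logic behind these steps can be illustrated with a simplified, hypothetical sketch. podActions here mimics only a few fields of the real computePodActions result, and computeActionsSketch with its parameters is made up; the real function inspects the full pod spec and runtime status.

```go
package main

import "fmt"

// podActions is a made-up subset of the decisions SyncPod acts on.
type podActions struct {
	KillPod           bool
	CreateSandbox     bool
	ContainersToStart []int
}

// computeActionsSketch decides, from a coarse view of the pod's state,
// whether a full restart is needed or only missing containers must start.
func computeActionsSketch(sandboxReady bool, runningContainers, specContainers int) podActions {
	actions := podActions{}
	if !sandboxReady {
		// A dead or missing sandbox forces a full restart: kill everything,
		// create a new sandbox, and start every container from the spec.
		actions.KillPod = true
		actions.CreateSandbox = true
		for i := 0; i < specContainers; i++ {
			actions.ContainersToStart = append(actions.ContainersToStart, i)
		}
		return actions
	}
	// Sandbox is fine: only start the containers that are not running yet.
	for i := runningContainers; i < specContainers; i++ {
		actions.ContainersToStart = append(actions.ContainersToStart, i)
	}
	return actions
}

func main() {
	fmt.Println(computeActionsSketch(false, 1, 2)) // {true true [0 1]}
	fmt.Println(computeActionsSketch(true, 1, 2))  // {false false [1]}
}
```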

    The Runtime.SyncPod code is located in pkg/kubelet/kuberuntime/kuberuntime_manager.go.

    3.1. computePodActions

    Compute the sandbox and container changes.

    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    glog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
            glog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
        }
    }

    3.2. killPodWithSyncResult

    Kill the pod if necessary.

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        if !podContainerChanges.CreateSandbox {
            glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
        } else {
            glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
        }
        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
            return
        }
        if podContainerChanges.CreateSandbox {
            m.purgeInitContainers(pod, podStatus)
        }
    }

    3.3. killContainer

    Kill any running containers that should not be kept.

    // Step 3: kill any running containers in this pod which are not to keep.
    for containerID, containerInfo := range podContainerChanges.ContainersToKill {
        glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
        killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
        result.AddSyncResult(killContainerResult)
        if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
            killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
            glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
            return
        }
    }

    3.4. createPodSandbox

    Create the sandbox if necessary.

    // Step 4: Create a sandbox for the pod if necessary.
    ...
    glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
    createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
    result.AddSyncResult(createSandboxResult)
    podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
    if err != nil {
        createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
        glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
        ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
        if referr != nil {
            glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
        }
        m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed create pod sandbox: %v", err)
        return
    }
    glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

    3.5. start init container

    Start the init container.

    // Step 5: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
        result.AddSyncResult(startContainerResult)
        isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
            return
        }
        glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
            startContainerResult.Fail(err, msg)
            utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
            return
        }
        // Successfully started the container; clear the entry in the failure
        glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
    }

    3.6. start containers

    Start the regular containers.

    // Step 6: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
        result.AddSyncResult(startContainerResult)
        isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
            continue
        }
        glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
        // run the container via startContainer
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                glog.V(3).Infof("container start failed: %v: %s", err, msg)
            default:
                utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
            }
            continue
        }
    }

    4. startContainer

    startContainer starts a container and, on error, returns a message describing why it failed.

    It involves the following steps:

    1. Pull the image.
    2. Create the container.
    3. Start the container.
    4. Run the post start lifecycle hooks (if applicable).

    The full code of startContainer is as follows:

    The startContainer code is located in pkg/kubelet/kuberuntime/kuberuntime_container.go.

    // startContainer starts a container and returns a message indicates why it is failed on error.
    // It starts the container through the following steps:
    // * pull the image
    // * create the container
    // * start the container
    // * run the post start lifecycle hooks (if applicable)
    func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
        // Step 1: pull the image.
        imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
        if err != nil {
            m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
            return msg, err
        }
        // Step 2: create the container.
        ref, err := kubecontainer.GenerateContainerRef(pod, container)
        if err != nil {
            glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
        }
        glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)
        // For a new container, the RestartCount should be 0
        restartCount := 0
        containerStatus := podStatus.FindContainerStatusByName(container.Name)
        if containerStatus != nil {
            restartCount = containerStatus.RestartCount + 1
        }
        containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
        if cleanupAction != nil {
            defer cleanupAction()
        }
        if err != nil {
            m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
            return grpc.ErrorDesc(err), ErrCreateContainerConfig
        }
        containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
        if err != nil {
            m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
            return grpc.ErrorDesc(err), ErrCreateContainer
        }
        err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
        if err != nil {
            m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", grpc.ErrorDesc(err))
            return grpc.ErrorDesc(err), ErrPreStartHook
        }
        m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")
        if ref != nil {
            m.containerRefManager.SetRef(kubecontainer.ContainerID{
                Type: m.runtimeName,
                ID:   containerID,
            }, ref)
        }
        // Step 3: start the container.
        err = m.runtimeService.StartContainer(containerID)
        if err != nil {
            m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
            return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
        }
        m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")
        // Symlink container logs to the legacy container log location for cluster logging
        // support.
        // TODO(random-liu): Remove this after cluster logging supports CRI container log path.
        containerMeta := containerConfig.GetMetadata()
        sandboxMeta := podSandboxConfig.GetMetadata()
        legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
            sandboxMeta.Namespace)
        containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
        // only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
        // Because if containerLog path does not exist, only dandling legacySymlink is created.
        // This dangling legacySymlink is later removed by container gc, so it does not make sense
        // to create it in the first place. it happens when journald logging driver is used with docker.
        if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
            if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
                glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
                    legacySymlink, containerID, containerLog, err)
            }
        }
        // Step 4: execute the post start hook.
        if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
            kubeContainerID := kubecontainer.ContainerID{
                Type: m.runtimeName,
                ID:   containerID,
            }
            msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
            if handlerErr != nil {
                m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
                if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
                    glog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
                        container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
                }
                return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
            }
        }
        return "", nil
    }

    startContainer is analyzed section by section below.

    4.1. pull image

    The EnsureImageExists method pulls the image for the given pod container and returns the image reference and any error.

    // Step 1: pull the image.
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
    if err != nil {
        m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
        return msg, err
    }

    4.2. CreateContainer

    First, generate the container's *v1.ObjectReference, which carries information about the container.

    // Step 2: create the container.
    ref, err := kubecontainer.GenerateContainerRef(pod, container)
    if err != nil {
        glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
    }
    glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)

    Compute the container's restart count: a new container starts at 0; otherwise it is the previous restart count plus one.

    // For a new container, the RestartCount should be 0
    restartCount := 0
    containerStatus := podStatus.FindContainerStatusByName(container.Name)
    if containerStatus != nil {
        restartCount = containerStatus.RestartCount + 1
    }

    Generate the container configuration.

    containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
    if cleanupAction != nil {
        defer cleanupAction()
    }
    if err != nil {
        m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
        return grpc.ErrorDesc(err), ErrCreateContainerConfig
    }

    Call the runtime service to perform the CreateContainer operation.

    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    if err != nil {
        m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
        return grpc.ErrorDesc(err), ErrCreateContainer
    }
    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    if err != nil {
        m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", grpc.ErrorDesc(err))
        return grpc.ErrorDesc(err), ErrPreStartHook
    }
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")
    if ref != nil {
        m.containerRefManager.SetRef(kubecontainer.ContainerID{
            Type: m.runtimeName,
            ID:   containerID,
        }, ref)
    }

    4.3. StartContainer

    Execute the runtime service's StartContainer method to start the container.

    // Step 3: start the container.
    err = m.runtimeService.StartContainer(containerID)
    if err != nil {
        m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
        return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
    }
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")
    // Symlink container logs to the legacy container log location for cluster logging
    // support.
    // TODO(random-liu): Remove this after cluster logging supports CRI container log path.
    containerMeta := containerConfig.GetMetadata()
    sandboxMeta := podSandboxConfig.GetMetadata()
    legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
        sandboxMeta.Namespace)
    containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
    // only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
    // Because if containerLog path does not exist, only dandling legacySymlink is created.
    // This dangling legacySymlink is later removed by container gc, so it does not make sense
    // to create it in the first place. it happens when journald logging driver is used with docker.
    if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
        if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
            glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
                legacySymlink, containerID, containerLog, err)
        }
    }

    4.4. execute post start hook

    If Lifecycle.PostStart is specified, the PostStart hook is executed. If the hook fails, the container is killed and then restarted according to its restart policy.

    // Step 4: execute the post start hook.
    if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
        kubeContainerID := kubecontainer.ContainerID{
            Type: m.runtimeName,
            ID:   containerID,
        }
        msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
        if handlerErr != nil {
            m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
            if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
                glog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
                    container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
            }
            return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
        }
    }

    5. Summary

    The kubelet's job is to manage the lifecycle of pods on the node (creation, deletion, update, and query). It works asynchronously through its various managers, each carrying out its own tasks, and uses many channels to propagate state-change signals. One important example is podUpdates <-chan UpdatePodOptions, which conveys pod changes.
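    The per-pod worker pattern described above can be sketched in miniature as follows. This is a hypothetical illustration, not the real podWorkers: update, workers, and UpdatePod here are simplified stand-ins; the point is that each pod gets its own channel and goroutine, so pods sync independently while updates for the same pod stay ordered.

```go
package main

import (
	"fmt"
	"sync"
)

// update is a made-up stand-in for UpdatePodOptions.
type update struct {
	podUID string
	op     string
}

// workers is a made-up stand-in for podWorkers: one channel and one
// goroutine per pod UID.
type workers struct {
	mu    sync.Mutex
	chans map[string]chan update
	wg    sync.WaitGroup
	out   chan string // records completed syncs, for demonstration
}

// UpdatePod routes an update to the pod's worker goroutine, starting the
// goroutine on first use (the managePodLoop analogue).
func (w *workers) UpdatePod(u update) {
	w.mu.Lock()
	ch, ok := w.chans[u.podUID]
	if !ok {
		ch = make(chan update, 1)
		w.chans[u.podUID] = ch
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			for u := range ch { // serialized per pod, concurrent across pods
				w.out <- "synced " + u.podUID + " " + u.op
			}
		}()
	}
	w.mu.Unlock()
	ch <- u
}

func main() {
	w := &workers{chans: map[string]chan update{}, out: make(chan string, 4)}
	w.UpdatePod(update{"pod-a", "create"})
	w.UpdatePod(update{"pod-a", "update"})
	fmt.Println(<-w.out, "/", <-w.out)
	// prints "synced pod-a create / synced pod-a update"
}
```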

    The call chain for creating a pod:

    syncLoopIteration-->kubetypes.ADD-->HandlePodAdditions(u.Pods)-->dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)-->podWorkers.UpdatePod-->managePodLoop(podUpdates)-->syncPod(o syncPodOptions)-->containerRuntime.SyncPod-->startContainer

    References:

    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kubelet.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/pod_workers.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kuberuntime/kuberuntime_container.go