• kube-controller-manager Source Code Analysis (Part 2): DeploymentController
  • 1. startDeploymentController
  • 2. NewDeploymentController
    • 2.1. eventBroadcaster
    • 2.2. rsControl
    • 2.3. Informer().AddEventHandler
    • 2.4. Informer.Lister()
    • 2.5. Informer().HasSynced
    • 2.6. syncHandler
  • 3. DeploymentController.Run
    • 3.1. WaitForCacheSync
    • 3.2. dc.worker
  • 4. syncDeployment
    • 4.1. Get deployment
    • 4.2. getReplicaSetsForDeployment
    • 4.3. getPodMapForDeployment
    • 4.4. checkPausedConditions
    • 4.5. isScalingEvent
    • 4.6. rolloutRecreate
    • 4.7. rolloutRolling
  • 5. Summary

    kube-controller-manager Source Code Analysis (Part 2): DeploymentController

    The following code analysis is based on kubernetes v1.12.0.

    This article takes the deployment controller as an example to analyze how this kind of controller runs. The code in question mainly lives in pkg/controller/deployment; the pkg/controller directory contains the concrete implementations of the various controller types.

    The directory structure of the controller manager's pkg/controller code is as follows:

    controller                           # concrete implementations of the various controllers
    ├── apis
    ├── bootstrap
    ├── certificates
    ├── client_builder.go
    ├── cloud
    ├── clusterroleaggregation
    ├── controller_ref_manager.go
    ├── controller_utils.go              # WaitForCacheSync
    ├── cronjob
    ├── daemon
    ├── deployment                       # deployment controller
    │   ├── deployment_controller.go     # NewDeploymentController, Run, syncDeployment
    │   ├── progress.go                  # syncRolloutStatus
    │   ├── recreate.go                  # rolloutRecreate
    │   ├── rollback.go                  # rollback
    │   ├── rolling.go                   # rolloutRolling
    │   └── sync.go
    ├── disruption                       # disruption controller
    ├── endpoint
    ├── garbagecollector
    ├── history
    ├── job
    ├── lookup_cache.go
    ├── namespace                        # namespace controller
    ├── nodeipam
    ├── nodelifecycle
    ├── podautoscaler
    ├── podgc
    ├── replicaset                       # replicaset controller
    ├── replication                      # replication controller
    ├── resourcequota
    ├── route
    ├── service                          # service controller
    ├── serviceaccount
    ├── statefulset                      # statefulset controller
    └── volume                           # PersistentVolumeController, AttachDetachController, PVCProtectionController

    1. startDeploymentController

    func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
        if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
            return nil, false, nil
        }
        dc, err := deployment.NewDeploymentController(
            ctx.InformerFactory.Apps().V1().Deployments(),
            ctx.InformerFactory.Apps().V1().ReplicaSets(),
            ctx.InformerFactory.Core().V1().Pods(),
            ctx.ClientBuilder.ClientOrDie("deployment-controller"),
        )
        if err != nil {
            return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
        }
        go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
        return nil, true, nil
    }

    startDeploymentController mainly calls NewDeploymentController and the corresponding Run function. That logic lives under kubernetes/pkg/controller/deployment.

    2. NewDeploymentController

    NewDeploymentController mainly constructs the DeploymentController struct.

    This part handles the following logic:

    • Build and start the event broadcaster eventBroadcaster.
    • Initialize and assign rsControl, the clientset, and the workqueue.
    • Register ResourceEventHandlerFuncs on dInformer, rsInformer, and podInformer, namely the AddFunc, UpdateFunc, and DeleteFunc handlers.
    • Wire up the Lister and HasSynced functions of the deployment, rs, and pod informers.
    • Assign syncHandler to syncDeployment, which carries out the actual sync.

    2.1. eventBroadcaster

    An event broadcaster is created to record deployment-related events.

    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    // TODO: remove the wrapper when every clients have moved to use the clientset.
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})
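
    The recorder created from this broadcaster (dc.eventRecorder, assigned in 2.2) is what the controller uses to emit events. For example, the SelectingAll warning shown later in syncDeployment (section 4) follows this pattern:

    dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll",
        "This deployment is selecting all pods. A non-empty selector is required.")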

    2.2. rsControl

    Construct the DeploymentController, including the clientset, workqueue, and rsControl, where rsControl is the component that implements the ReplicaSet-side operations.

    dc := &DeploymentController{
        client:        client,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    dc.rsControl = controller.RealRSControl{
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }
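
    For reference, RealRSControl is defined in pkg/controller and satisfies a small interface (sketched here from memory of controller_utils.go; treat the doc comment as a paraphrase) that the ControllerRefManager in section 4.2 uses to adopt or orphan ReplicaSets by patching their ControllerRef:

    // RSControlInterface is the surface the deployment controller needs in order
    // to manipulate ReplicaSets: adoption/orphaning is done via a patch.
    type RSControlInterface interface {
        PatchReplicaSet(namespace, name string, data []byte) error
    }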

    2.3. Informer().AddEventHandler

    Register ResourceEventHandlerFuncs on dInformer, rsInformer, and podInformer, namely the AddFunc, UpdateFunc, and DeleteFunc handlers.

    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })
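
    All of these handlers funnel into the workqueue. Abridged from deployment_controller.go, the add handler and the enqueue helper reduce every event to a namespace/name key that a worker later dequeues:

    // addDeployment is called for every new Deployment observed by the informer.
    func (dc *DeploymentController) addDeployment(obj interface{}) {
        d := obj.(*apps.Deployment)
        glog.V(4).Infof("Adding deployment %s", d.Name)
        dc.enqueueDeployment(d)
    }

    // enqueue converts the object into a "namespace/name" key and queues it.
    func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
        key, err := controller.KeyFunc(deployment)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
            return
        }
        dc.queue.Add(key)
    }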

    2.4. Informer.Lister()

    The Lister() methods of dInformer, rsInformer, and podInformer are wired up for reading objects from the shared informer caches.

    dc.dLister = dInformer.Lister()
    dc.rsLister = rsInformer.Lister()
    dc.podLister = podInformer.Lister()

    2.5. Informer().HasSynced

    Informer().HasSynced reports whether the informer's cache has completed its initial sync; these functions are stored for the check in Run.

    dc.dListerSynced = dInformer.Informer().HasSynced
    dc.rsListerSynced = rsInformer.Informer().HasSynced
    dc.podListerSynced = podInformer.Informer().HasSynced

    2.6. syncHandler

    syncHandler is set to syncDeployment; syncHandler is responsible for the actual deployment sync.

    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue

    The complete code is as follows:

    // NewDeploymentController creates a new DeploymentController.
    func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
        eventBroadcaster := record.NewBroadcaster()
        eventBroadcaster.StartLogging(glog.Infof)
        // TODO: remove the wrapper when every clients have moved to use the clientset.
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})

        if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
            if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
                return nil, err
            }
        }
        dc := &DeploymentController{
            client:        client,
            eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
            queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
        }
        dc.rsControl = controller.RealRSControl{
            KubeClient: client,
            Recorder:   dc.eventRecorder,
        }

        dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    dc.addDeployment,
            UpdateFunc: dc.updateDeployment,
            // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
            DeleteFunc: dc.deleteDeployment,
        })
        rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    dc.addReplicaSet,
            UpdateFunc: dc.updateReplicaSet,
            DeleteFunc: dc.deleteReplicaSet,
        })
        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            DeleteFunc: dc.deletePod,
        })

        dc.syncHandler = dc.syncDeployment
        dc.enqueueDeployment = dc.enqueue

        dc.dLister = dInformer.Lister()
        dc.rsLister = rsInformer.Lister()
        dc.podLister = podInformer.Lister()
        dc.dListerSynced = dInformer.Informer().HasSynced
        dc.rsListerSynced = rsInformer.Informer().HasSynced
        dc.podListerSynced = podInformer.Informer().HasSynced
        return dc, nil
    }

    3. DeploymentController.Run

    Run starts the watch and sync loops.

    // Run begins watching and syncing.
    func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        defer dc.queue.ShutDown()

        glog.Infof("Starting deployment controller")
        defer glog.Infof("Shutting down deployment controller")

        if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
            return
        }

        for i := 0; i < workers; i++ {
            go wait.Until(dc.worker, time.Second, stopCh)
        }

        <-stopCh
    }
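
    Each worker goroutine is driven by wait.Until, which re-runs the function with the given period until the stop channel is closed. A minimal standalone illustration of that primitive:

    package main

    import (
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        stopCh := make(chan struct{})
        // prints roughly once per second until stopCh is closed
        go wait.Until(func() { fmt.Println("worker tick") }, time.Second, stopCh)
        time.Sleep(3 * time.Second)
        close(stopCh)
    }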

    3.1. WaitForCacheSync

    WaitForCacheSync blocks until the informer caches backing the List-Watch machinery have completed their initial sync, so that workers only ever operate on a local cache consistent with the data in etcd.

    // WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
    // indicating that the controller identified by controllerName is waiting for syncs, followed by
    // either a successful or failed sync.
    func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
        glog.Infof("Waiting for caches to sync for %s controller", controllerName)

        if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
            utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
            return false
        }

        glog.Infof("Caches are synced for %s controller", controllerName)
        return true
    }

    3.2. dc.worker

    worker calls processNextWorkItem, which ultimately invokes syncHandler; as assigned in NewDeploymentController, the concrete function behind syncHandler is syncDeployment.

    // worker runs a worker thread that just dequeues items, processes them, and marks them done.
    // It enforces that the syncHandler is never invoked concurrently with the same key.
    func (dc *DeploymentController) worker() {
        for dc.processNextWorkItem() {
        }
    }

    func (dc *DeploymentController) processNextWorkItem() bool {
        key, quit := dc.queue.Get()
        if quit {
            return false
        }
        defer dc.queue.Done(key)

        err := dc.syncHandler(key.(string))
        dc.handleErr(err, key)

        return true
    }
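
    processNextWorkItem hands each result to handleErr, which implements capped, rate-limited retries (abridged from deployment_controller.go; maxRetries is a package-level constant there):

    func (dc *DeploymentController) handleErr(err error, key interface{}) {
        if err == nil {
            // success: reset the rate limiter's failure count for this key
            dc.queue.Forget(key)
            return
        }

        if dc.queue.NumRequeues(key) < maxRetries {
            glog.V(2).Infof("Error syncing deployment %v: %v", key, err)
            // requeue with backoff
            dc.queue.AddRateLimited(key)
            return
        }

        utilruntime.HandleError(err)
        glog.V(2).Infof("Dropping deployment %q out of the queue: %v", key, err)
        dc.queue.Forget(key)
    }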

    The syncHandler assignment in NewDeploymentController:

    func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
        ...
        dc.syncHandler = dc.syncDeployment
        ...
    }

    4. syncDeployment

    syncDeployment performs the sync for the deployment identified by the given key.

    The main flow is as follows:

    1. Use SplitMetaNamespaceKey to extract the namespace and the deployment name from the key.
    2. Fetch the deployment object through the Lister interface.
    3. getReplicaSetsForDeployment fetches the ReplicaSets managed by the deployment.
    4. getPodMapForDeployment fetches the pods managed by the deployment, grouped by ReplicaSet.
    5. checkPausedConditions checks whether the deployment is paused and adds the appropriate condition.
    6. isScalingEvent checks whether the update comes from a scaling event; if so, only the scaling operation is performed.
    7. Depending on the DeploymentStrategyType, run rolloutRecreate or rolloutRolling.

    The complete code is as follows:

    // syncDeployment will sync the deployment with the given key.
    // This function is not meant to be invoked concurrently with the same key.
    func (dc *DeploymentController) syncDeployment(key string) error {
        startTime := time.Now()
        glog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
        defer func() {
            glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
        }()

        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            return err
        }
        deployment, err := dc.dLister.Deployments(namespace).Get(name)
        if errors.IsNotFound(err) {
            glog.V(2).Infof("Deployment %v has been deleted", key)
            return nil
        }
        if err != nil {
            return err
        }

        // Deep-copy otherwise we are mutating our cache.
        // TODO: Deep-copy only when needed.
        d := deployment.DeepCopy()

        everything := metav1.LabelSelector{}
        if reflect.DeepEqual(d.Spec.Selector, &everything) {
            dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
            if d.Status.ObservedGeneration < d.Generation {
                d.Status.ObservedGeneration = d.Generation
                dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
            }
            return nil
        }

        // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
        // through adoption/orphaning.
        rsList, err := dc.getReplicaSetsForDeployment(d)
        if err != nil {
            return err
        }
        // List all Pods owned by this Deployment, grouped by their ReplicaSet.
        // Current uses of the podMap are:
        //
        // * check if a Pod is labeled correctly with the pod-template-hash label.
        // * check that no old Pods are running in the middle of Recreate Deployments.
        podMap, err := dc.getPodMapForDeployment(d, rsList)
        if err != nil {
            return err
        }

        if d.DeletionTimestamp != nil {
            return dc.syncStatusOnly(d, rsList)
        }

        // Update deployment conditions with an Unknown condition when pausing/resuming
        // a deployment. In this way, we can be sure that we won't timeout when a user
        // resumes a Deployment with a set progressDeadlineSeconds.
        if err = dc.checkPausedConditions(d); err != nil {
            return err
        }

        if d.Spec.Paused {
            return dc.sync(d, rsList)
        }

        // rollback is not re-entrant in case the underlying replica sets are updated with a new
        // revision so we should ensure that we won't proceed to update replica sets until we
        // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
        if getRollbackTo(d) != nil {
            return dc.rollback(d, rsList)
        }

        scalingEvent, err := dc.isScalingEvent(d, rsList)
        if err != nil {
            return err
        }
        if scalingEvent {
            return dc.sync(d, rsList)
        }

        switch d.Spec.Strategy.Type {
        case apps.RecreateDeploymentStrategyType:
            return dc.rolloutRecreate(d, rsList, podMap)
        case apps.RollingUpdateDeploymentStrategyType:
            return dc.rolloutRolling(d, rsList)
        }
        return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
    }

    4.1. Get deployment

    // get namespace and deployment name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    // get deployment by name
    deployment, err := dc.dLister.Deployments(namespace).Get(name)
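
    The key dequeued from the workqueue is simply "<namespace>/<name>". A standalone illustration (the deployment name is made up):

    package main

    import (
        "fmt"

        "k8s.io/client-go/tools/cache"
    )

    func main() {
        namespace, name, err := cache.SplitMetaNamespaceKey("default/nginx-deployment")
        fmt.Println(namespace, name, err) // default nginx-deployment <nil>
    }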

    4.2. getReplicaSetsForDeployment

    // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
    // through adoption/orphaning.
    rsList, err := dc.getReplicaSetsForDeployment(d)

    The implementation of getReplicaSetsForDeployment:

    // getReplicaSetsForDeployment uses ControllerRefManager to reconcile
    // ControllerRef by adopting and orphaning.
    // It returns the list of ReplicaSets that this Deployment should manage.
    func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment) ([]*apps.ReplicaSet, error) {
        // List all ReplicaSets to find those we own but that no longer match our
        // selector. They will be orphaned by ClaimReplicaSets().
        rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
        if err != nil {
            return nil, err
        }
        deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
        if err != nil {
            return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
        }
        // If any adoptions are attempted, we should first recheck for deletion with
        // an uncached quorum read sometime after listing ReplicaSets (see #42639).
        canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
            fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(d.Name, metav1.GetOptions{})
            if err != nil {
                return nil, err
            }
            if fresh.UID != d.UID {
                return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID)
            }
            return fresh, nil
        })
        cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)
        return cm.ClaimReplicaSets(rsList)
    }
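
    Note that the function deliberately lists with labels.Everything(), so that owned ReplicaSets which no longer match the selector can still be found and orphaned; actual matching is then driven by the converted selector. A standalone illustration of that conversion (the app=nginx labels are illustrative):

    package main

    import (
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    func main() {
        selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
            MatchLabels: map[string]string{"app": "nginx"},
        })
        if err != nil {
            panic(err)
        }
        fmt.Println(selector.String()) // app=nginx
    }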

    4.3. getPodMapForDeployment

    // List all Pods owned by this Deployment, grouped by their ReplicaSet.
    // Current uses of the podMap are:
    //
    // * check if a Pod is labeled correctly with the pod-template-hash label.
    // * check that no old Pods are running in the middle of Recreate Deployments.
    podMap, err := dc.getPodMapForDeployment(d, rsList)

    The implementation of getPodMapForDeployment:

    // getPodMapForDeployment returns the Pods managed by a Deployment.
    //
    // It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,
    // according to the Pod's ControllerRef.
    func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID]*v1.PodList, error) {
        // Get all Pods that potentially belong to this Deployment.
        selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
        if err != nil {
            return nil, err
        }
        pods, err := dc.podLister.Pods(d.Namespace).List(selector)
        if err != nil {
            return nil, err
        }
        // Group Pods by their controller (if it's in rsList).
        podMap := make(map[types.UID]*v1.PodList, len(rsList))
        for _, rs := range rsList {
            podMap[rs.UID] = &v1.PodList{}
        }
        for _, pod := range pods {
            // Do not ignore inactive Pods because Recreate Deployments need to verify that no
            // Pods from older versions are running before spinning up new Pods.
            controllerRef := metav1.GetControllerOf(pod)
            if controllerRef == nil {
                continue
            }
            // Only append if we care about this UID.
            if podList, ok := podMap[controllerRef.UID]; ok {
                podList.Items = append(podList.Items, *pod)
            }
        }
        return podMap, nil
    }
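
    The grouping hinges on metav1.GetControllerOf, which returns the OwnerReference with Controller=true, or nil if the pod has no controller. A standalone illustration with made-up names:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    func main() {
        isController := true
        pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
            OwnerReferences: []metav1.OwnerReference{{
                Kind:       "ReplicaSet",
                Name:       "nginx-7c9b8",
                UID:        "11111111-2222-3333-4444-555555555555",
                Controller: &isController,
            }},
        }}
        if ref := metav1.GetControllerOf(pod); ref != nil {
            fmt.Println(ref.Kind, ref.Name, ref.UID)
        }
    }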

    4.4. checkPausedConditions

    // Update deployment conditions with an Unknown condition when pausing/resuming
    // a deployment. In this way, we can be sure that we won't timeout when a user
    // resumes a Deployment with a set progressDeadlineSeconds.
    if err = dc.checkPausedConditions(d); err != nil {
        return err
    }
    if d.Spec.Paused {
        return dc.sync(d, rsList)
    }

    The implementation of checkPausedConditions:

    // checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition.
    // These conditions are needed so that we won't accidentally report lack of progress for resumed deployments
    // that were paused for longer than progressDeadlineSeconds.
    func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error {
        if !deploymentutil.HasProgressDeadline(d) {
            return nil
        }
        cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
        if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
            // If we have reported lack of progress, do not overwrite it with a paused condition.
            return nil
        }
        pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason

        needsUpdate := false
        if d.Spec.Paused && !pausedCondExists {
            condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused")
            deploymentutil.SetDeploymentCondition(&d.Status, *condition)
            needsUpdate = true
        } else if !d.Spec.Paused && pausedCondExists {
            condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed")
            deploymentutil.SetDeploymentCondition(&d.Status, *condition)
            needsUpdate = true
        }

        if !needsUpdate {
            return nil
        }

        var err error
        d, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
        return err
    }

    4.5. isScalingEvent

    scalingEvent, err := dc.isScalingEvent(d, rsList)
    if err != nil {
        return err
    }
    if scalingEvent {
        return dc.sync(d, rsList)
    }

    The implementation of isScalingEvent:

    // isScalingEvent checks whether the provided deployment has been updated with a scaling event
    // by looking at the desired-replicas annotation in the active replica sets of the deployment.
    //
    // rsList should come from getReplicaSetsForDeployment(d).
    // podMap should come from getPodMapForDeployment(d, rsList).
    func (dc *DeploymentController) isScalingEvent(d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
        newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
        if err != nil {
            return false, err
        }
        allRSs := append(oldRSs, newRS)
        for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
            desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs)
            if !ok {
                continue
            }
            if desired != *(d.Spec.Replicas) {
                return true, nil
            }
        }
        return false, nil
    }
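
    GetDesiredReplicasAnnotation reads the deployment.kubernetes.io/desired-replicas annotation that the controller stamps onto each ReplicaSet it creates or updates. A standalone sketch of the comparison (not the exact v1.12 helper):

    package main

    import (
        "fmt"
        "strconv"
    )

    // annotation key written by the deployment controller onto its ReplicaSets
    const desiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas"

    // isScaling reports whether spec.replicas has diverged from the replica
    // count recorded on the ReplicaSet, i.e. whether this update is a pure scale.
    func isScaling(annotations map[string]string, specReplicas int32) bool {
        v, ok := annotations[desiredReplicasAnnotation]
        if !ok {
            return false
        }
        desired, err := strconv.ParseInt(v, 10, 32)
        if err != nil {
            return false
        }
        return int32(desired) != specReplicas
    }

    func main() {
        ann := map[string]string{desiredReplicasAnnotation: "3"}
        fmt.Println(isScaling(ann, 5)) // true: scaled from 3 to 5
    }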

    4.6. rolloutRecreate

    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        return dc.rolloutRecreate(d, rsList, podMap)

    The implementation of rolloutRecreate:

    // rolloutRecreate implements the logic for recreating a replica set.
    func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
        // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
        newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
        if err != nil {
            return err
        }
        allRSs := append(oldRSs, newRS)
        activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

        // scale down old replica sets.
        scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
        if err != nil {
            return err
        }
        if scaledDown {
            // Update DeploymentStatus.
            return dc.syncRolloutStatus(allRSs, newRS, d)
        }

        // Do not process a deployment when it has old pods running.
        if oldPodsRunning(newRS, oldRSs, podMap) {
            return dc.syncRolloutStatus(allRSs, newRS, d)
        }

        // If we need to create a new RS, create it now.
        if newRS == nil {
            newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
            if err != nil {
                return err
            }
            allRSs = append(oldRSs, newRS)
        }

        // scale up new replica set.
        if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
            return err
        }

        if util.DeploymentComplete(d, &d.Status) {
            if err := dc.cleanupDeployment(oldRSs, d); err != nil {
                return err
            }
        }

        // Sync deployment status.
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    4.7. rolloutRolling

    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        return dc.rolloutRecreate(d, rsList, podMap)
    case apps.RollingUpdateDeploymentStrategyType:
        return dc.rolloutRolling(d, rsList)
    }

    The implementation of rolloutRolling:

    // rolloutRolling implements the logic for rolling a new replica set.
    func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
        newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
        if err != nil {
            return err
        }
        allRSs := append(oldRSs, newRS)

        // Scale up, if we can.
        scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
        if err != nil {
            return err
        }
        if scaledUp {
            // Update DeploymentStatus
            return dc.syncRolloutStatus(allRSs, newRS, d)
        }

        // Scale down, if we can.
        scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
        if err != nil {
            return err
        }
        if scaledDown {
            // Update DeploymentStatus
            return dc.syncRolloutStatus(allRSs, newRS, d)
        }

        if deploymentutil.DeploymentComplete(d, &d.Status) {
            if err := dc.cleanupDeployment(oldRSs, d); err != nil {
                return err
            }
        }

        // Sync deployment status
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }
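
    reconcileNewReplicaSet scales the new RS up within the strategy's maxSurge bound, while reconcileOldReplicaSets scales old RSs down within maxUnavailable. A worked example of those bounds (illustrative numbers, not controller code):

    package main

    import "fmt"

    func main() {
        replicas := 10
        maxSurge := 3       // 25% of 10, rounded up
        maxUnavailable := 2 // 25% of 10, rounded down
        // during the rollout the controller keeps total pods <= replicas+maxSurge
        // and available pods >= replicas-maxUnavailable
        fmt.Println("max total pods:", replicas+maxSurge)            // 13
        fmt.Println("min available pods:", replicas-maxUnavailable) // 8
    }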

    5. Summary

    startDeploymentController mainly consists of two parts: NewDeploymentController and DeploymentController.Run.

    NewDeploymentController mainly constructs the DeploymentController struct.

    This part handles the following logic:

    1. Build and start the event broadcaster eventBroadcaster.
    2. Initialize and assign rsControl, the clientset, and the workqueue.
    3. Register ResourceEventHandlerFuncs on dInformer, rsInformer, and podInformer, namely the AddFunc, UpdateFunc, and DeleteFunc handlers.
    4. Wire up the Lister and HasSynced functions of the deployment, rs, and pod informers.
    5. Assign syncHandler to syncDeployment, which implements the sync logic.

    DeploymentController.Run mainly consists of WaitForCacheSync and syncDeployment.

    syncDeployment performs the sync for the deployment identified by the given key.

    The main flow is as follows:

    1. Use SplitMetaNamespaceKey to extract the namespace and the deployment name from the key.
    2. Fetch the deployment object through the Lister interface.
    3. getReplicaSetsForDeployment fetches the ReplicaSets managed by the deployment.
    4. getPodMapForDeployment fetches the pods managed by the deployment, grouped by ReplicaSet.
    5. checkPausedConditions checks whether the deployment is paused and adds the appropriate condition.
    6. isScalingEvent checks whether the update comes from a scaling event; if so, only the scaling operation is performed.
    7. Depending on the DeploymentStrategyType, run rolloutRecreate or rolloutRolling.

    References:

    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/controller/deployment/deployment_controller.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/controller/deployment/rolling.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-controller-manager/app/apps.go