• kube-scheduler Source Code Analysis (Part 5): PrioritizeNodes
  • 1. Entry Point
  • 2. PrioritizeNodes
  • 3. EqualPriorityMap
  • 4. processNode
  • 5. PriorityMapFunction
  • 6. PriorityReduceFunction
  • 7. Summarize all scores
  • 8. NewSelectorSpreadPriority
    • 8.1. CalculateSpreadPriorityMap
    • 8.2. CalculateSpreadPriorityReduce
  • 9. Summary
    • 9.1. PrioritizeNodes
    • 9.2. NewSelectorSpreadPriority
      • 9.2.1. CalculateSpreadPriorityMap
      • 9.2.2. CalculateSpreadPriorityReduce

    kube-scheduler Source Code Analysis (Part 5): PrioritizeNodes

    The following code analysis is based on Kubernetes v1.12.0.

    This article analyzes the priority (scoring) phase, which selects the best node from the nodes that passed the predicate phase. The priority logic is implemented by PrioritizeNodes, which ultimately returns a list recording the score of each node.

    1. Entry Point

    PrioritizeNodes is called from genericScheduler.Schedule as follows:

    This code is located in pkg/scheduler/core/generic_scheduler.go.

    1. func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    2. ...
    3. trace.Step("Prioritizing")
    4. startPriorityEvalTime := time.Now()
    5. // When only one node after predicate, just use it.
    6. if len(filteredNodes) == 1 {
    7. metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
    8. return filteredNodes[0].Name, nil
    9. }
    10. metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    11. // Run the priority (scoring) logic and return a list recording each node's score
    12. priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    13. if err != nil {
    14. return "", err
    15. }
    16. metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
    17. metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
    18. ...
    19. }

    The core call:

    1. // Score the nodes that passed the predicate phase (filteredNodes) and return a list recording each node's score
    2. priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)

    2. PrioritizeNodes

    The priority phase picks the best node from those that satisfied the predicates. PrioritizeNodes ultimately returns a list recording the score of each node.

    The overall behavior is as follows:

    • PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
    • Each priority function scores every node in the range 0-10.
    • 0 is the lowest priority (least preferred node) and 10 is the highest.
    • Each priority function also has its own weight.
    • The node scores returned by a priority function are multiplied by its weight to get weighted scores.
    • Finally, all weighted scores are added up to get the total weighted score of every node.

    The main flow of PrioritizeNodes is:

    1. If no priority functions and no extenders are configured, give every node the same score and return immediately.
    2. Run the map function of each priority on every node to produce raw per-node scores.
    3. Run the corresponding reduce function over the map results to compute each priority's final scores.
    4. Finally, combine the scores of the different priority functions into a weighted sum according to their weights (a minimal sketch of this pipeline follows this list).
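
    To make steps 2-4 concrete, here is a minimal standalone sketch of the map/reduce/weighted-sum pipeline for a single hypothetical "fewer pods is better" priority. The type and the numbers are simplified stand-ins, not the scheduler's real code:

    package main

    import "fmt"

    // HostPriority is a simplified stand-in for schedulerapi.HostPriority.
    type HostPriority struct {
        Host  string
        Score int
    }

    func main() {
        // Hypothetical per-node pod counts; pretend these come from the cluster cache.
        podCount := map[string]int{"node-a": 0, "node-b": 3, "node-c": 6}
        nodes := []string{"node-a", "node-b", "node-c"}

        // Map step: the raw per-node result is simply the pod count.
        results := make([]HostPriority, len(nodes))
        maxCount := 0
        for i, n := range nodes {
            results[i] = HostPriority{Host: n, Score: podCount[n]}
            if podCount[n] > maxCount {
                maxCount = podCount[n]
            }
        }

        // Reduce step: rescale to 0-10, fewer pods -> higher score.
        const maxPriority = 10
        for i := range results {
            if maxCount > 0 {
                results[i].Score = maxPriority * (maxCount - results[i].Score) / maxCount
            } else {
                results[i].Score = maxPriority
            }
        }

        // Weighted sum: multiply by this priority's weight and add into the total.
        weight := 1
        for _, r := range results {
            fmt.Printf("%s => %d\n", r.Host, r.Score*weight)
        }
        // Output: node-a => 10, node-b => 5, node-c => 0
    }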

    Parameters:

    • pod
    • nodeNameToInfo
    • meta interface{}
    • priorityConfigs
    • nodes
    • extenders

    Return value:

    • HostPriorityList: a list recording the score of each node.

    HostPriority is defined as follows:

    1. // HostPriority represents the priority of scheduling to a particular host, higher priority is better.
    2. type HostPriority struct {
    3. // Name of the host
    4. Host string
    5. // Score associated with the host
    6. Score int
    7. }
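
    For illustration, here is a hypothetical HostPriorityList and how the highest-scoring host would be picked from it (higher score is better, as the comment above states). The type is a local stand-in and the numbers are made up:

    package main

    import "fmt"

    // HostPriority is a simplified stand-in for schedulerapi.HostPriority.
    type HostPriority struct {
        Host  string
        Score int
    }

    func main() {
        // Hypothetical output of PrioritizeNodes (values are made up).
        result := []HostPriority{
            {Host: "node-1", Score: 18},
            {Host: "node-2", Score: 23},
            {Host: "node-3", Score: 7},
        }

        best := result[0]
        for _, hp := range result[1:] {
            if hp.Score > best.Score {
                best = hp
            }
        }
        fmt.Println("highest-scoring host:", best.Host) // node-2
    }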

    The complete code of PrioritizeNodes is as follows:

    This code is located in pkg/scheduler/core/generic_scheduler.go.

    1. // PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
    2. // Each priority function is expected to set a score of 0-10
    3. // 0 is the lowest priority score (least preferred node) and 10 is the highest
    4. // Each priority function can also have its own weight
    5. // The node scores returned by the priority function are multiplied by the weights to get weighted scores
    6. // All scores are finally combined (added) to get the total weighted scores of all nodes
    7. func PrioritizeNodes(
    8. pod *v1.Pod,
    9. nodeNameToInfo map[string]*schedulercache.NodeInfo,
    10. meta interface{},
    11. priorityConfigs []algorithm.PriorityConfig,
    12. nodes []*v1.Node,
    13. extenders []algorithm.SchedulerExtender,
    14. ) (schedulerapi.HostPriorityList, error) {
    15. // If no priority configs are provided, then the EqualPriority function is applied
    16. // This is required to generate the priority list in the required format
    17. if len(priorityConfigs) == 0 && len(extenders) == 0 {
    18. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    19. for i := range nodes {
    20. hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
    21. if err != nil {
    22. return nil, err
    23. }
    24. result = append(result, hostPriority)
    25. }
    26. return result, nil
    27. }
    28. var (
    29. mu = sync.Mutex{}
    30. wg = sync.WaitGroup{}
    31. errs []error
    32. )
    33. appendError := func(err error) {
    34. mu.Lock()
    35. defer mu.Unlock()
    36. errs = append(errs, err)
    37. }
    38. results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
    39. for i, priorityConfig := range priorityConfigs {
    40. if priorityConfig.Function != nil {
    41. // DEPRECATED
    42. wg.Add(1)
    43. go func(index int, config algorithm.PriorityConfig) {
    44. defer wg.Done()
    45. var err error
    46. results[index], err = config.Function(pod, nodeNameToInfo, nodes)
    47. if err != nil {
    48. appendError(err)
    49. }
    50. }(i, priorityConfig)
    51. } else {
    52. results[i] = make(schedulerapi.HostPriorityList, len(nodes))
    53. }
    54. }
    55. processNode := func(index int) {
    56. nodeInfo := nodeNameToInfo[nodes[index].Name]
    57. var err error
    58. for i := range priorityConfigs {
    59. if priorityConfigs[i].Function != nil {
    60. continue
    61. }
    62. results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
    63. if err != nil {
    64. appendError(err)
    65. return
    66. }
    67. }
    68. }
    69. workqueue.Parallelize(16, len(nodes), processNode)
    70. for i, priorityConfig := range priorityConfigs {
    71. if priorityConfig.Reduce == nil {
    72. continue
    73. }
    74. wg.Add(1)
    75. go func(index int, config algorithm.PriorityConfig) {
    76. defer wg.Done()
    77. if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
    78. appendError(err)
    79. }
    80. if glog.V(10) {
    81. for _, hostPriority := range results[index] {
    82. glog.Infof("%v -> %v: %v, Score: (%d)", pod.Name, hostPriority.Host, config.Name, hostPriority.Score)
    83. }
    84. }
    85. }(i, priorityConfig)
    86. }
    87. // Wait for all computations to be finished.
    88. wg.Wait()
    89. if len(errs) != 0 {
    90. return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    91. }
    92. // Summarize all scores.
    93. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    94. for i := range nodes {
    95. result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
    96. for j := range priorityConfigs {
    97. result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
    98. }
    99. }
    100. if len(extenders) != 0 && nodes != nil {
    101. combinedScores := make(map[string]int, len(nodeNameToInfo))
    102. for _, extender := range extenders {
    103. if !extender.IsInterested(pod) {
    104. continue
    105. }
    106. wg.Add(1)
    107. go func(ext algorithm.SchedulerExtender) {
    108. defer wg.Done()
    109. prioritizedList, weight, err := ext.Prioritize(pod, nodes)
    110. if err != nil {
    111. // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
    112. return
    113. }
    114. mu.Lock()
    115. for i := range *prioritizedList {
    116. host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
    117. combinedScores[host] += score * weight
    118. }
    119. mu.Unlock()
    120. }(extender)
    121. }
    122. // wait for all go routines to finish
    123. wg.Wait()
    124. for i := range result {
    125. result[i].Score += combinedScores[result[i].Host]
    126. }
    127. }
    128. if glog.V(10) {
    129. for i := range result {
    130. glog.V(10).Infof("Host %s => Score %d", result[i].Host, result[i].Score)
    131. }
    132. }
    133. return result, nil
    134. }

    The following analyzes PrioritizeNodes segment by segment.

    3. EqualPriorityMap

    If no priority functions and no extenders are provided, all nodes are given the same priority, i.e. every node gets a score of 1, and the result is returned directly. (In practice the list of priority functions is normally not empty.)

    1. // If no priority configs are provided, then the EqualPriority function is applied
    2. // This is required to generate the priority list in the required format
    3. if len(priorityConfigs) == 0 && len(extenders) == 0 {
    4. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    5. for i := range nodes {
    6. hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
    7. if err != nil {
    8. return nil, err
    9. }
    10. result = append(result, hostPriority)
    11. }
    12. return result, nil
    13. }

    EqualPriorityMap is implemented as follows:

    1. // EqualPriorityMap is a prioritizer function that gives an equal weight of one to all nodes
    2. func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
    3. node := nodeInfo.Node()
    4. if node == nil {
    5. return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
    6. }
    7. return schedulerapi.HostPriority{
    8. Host: node.Name,
    9. Score: 1,
    10. }, nil
    11. }

    4. processNode

    processNode looks up the node info by index and calls each registered priority's map function on the pod and the node, writing the per-node scores into results. processNode is likewise run in parallel via workqueue.Parallelize. (processNode plays a role similar to checkNode, which is used in the predicate logic findNodesThatFit.)

    The priority functions are recorded in priorityConfigs; each priority consists of a PriorityMapFunction and a PriorityReduceFunction. For how priority functions are registered, see registerAlgorithmProvider.

    1. processNode := func(index int) {
    2. nodeInfo := nodeNameToInfo[nodes[index].Name]
    3. var err error
    4. for i := range priorityConfigs {
    5. if priorityConfigs[i].Function != nil {
    6. continue
    7. }
    8. results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
    9. if err != nil {
    10. appendError(err)
    11. return
    12. }
    13. }
    14. }
    15. // Run processNode in parallel
    16. workqueue.Parallelize(16, len(nodes), processNode)
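
    workqueue.Parallelize fans the indices 0..len(nodes)-1 out to a fixed number of worker goroutines (16 here). The following is a minimal standalone sketch of that pattern, not the actual workqueue implementation:

    package main

    import (
        "fmt"
        "sync"
    )

    // parallelize runs doWork(i) for i in [0, pieces) using up to `workers` goroutines.
    // It mimics what workqueue.Parallelize does for processNode.
    func parallelize(workers, pieces int, doWork func(i int)) {
        toProcess := make(chan int, pieces)
        for i := 0; i < pieces; i++ {
            toProcess <- i
        }
        close(toProcess)

        var wg sync.WaitGroup
        wg.Add(workers)
        for w := 0; w < workers; w++ {
            go func() {
                defer wg.Done()
                for i := range toProcess {
                    doWork(i)
                }
            }()
        }
        wg.Wait()
    }

    func main() {
        nodes := []string{"node-a", "node-b", "node-c", "node-d"}
        scores := make([]int, len(nodes))
        parallelize(2, len(nodes), func(i int) {
            // Stand-in for processNode: score each node independently.
            scores[i] = len(nodes[i])
        })
        fmt.Println(scores)
    }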

    priorityConfigs is defined as follows:

    Key fields:

    • Map: PriorityMapFunction
    • Reduce: PriorityReduceFunction
    1. // PriorityConfig is a config used for a priority function.
    2. type PriorityConfig struct {
    3. Name string
    4. Map PriorityMapFunction
    5. Reduce PriorityReduceFunction
    6. // TODO: Remove it after migrating all functions to
    7. // Map-Reduce pattern.
    8. Function PriorityFunction
    9. Weight int
    10. }
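
    For illustration only, a hypothetical PriorityConfig entry wiring a map/reduce pair with a weight could look like the snippet below. The names are made up (the map and reduce sketches appear in sections 5 and 6 below); the real entries are produced by the factory registration code, see registerAlgorithmProvider:

    // Hypothetical example, not actual registry code.
    config := algorithm.PriorityConfig{
        Name:   "ExamplePodCountPriority",
        Map:    countPodsMap,    // a PriorityMapFunction (sketched in section 5)
        Reduce: normalizeReduce, // a PriorityReduceFunction (sketched in section 6)
        Weight: 1,
    }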

    The processing logic of concrete priority functions is analyzed below, taking NewSelectorSpreadPriority as the example.

    5. PriorityMapFunction

    A PriorityMapFunction computes the per-node result for a given node.

    PriorityMapFunction is defined as follows:

    1. // PriorityMapFunction is a function that computes per-node results for a given node.
    2. // TODO: Figure out the exact API of this method.
    3. // TODO: Change interface{} to a specific type.
    4. type PriorityMapFunction func(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error)
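
    As a toy illustration of this signature, a hypothetical map function that uses the number of pods already on the node as the raw result could look like this. It is a sketch only, assuming the same imports and type aliases (v1, schedulercache, schedulerapi, fmt) as the scheduler code above, and is not an actual scheduler priority:

    // countPodsMap is a hypothetical PriorityMapFunction. It returns the number
    // of pods already running on the node as the raw per-node result; a reduce
    // step would later rescale this to the 0-10 range.
    func countPodsMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
        node := nodeInfo.Node()
        if node == nil {
            return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
        }
        return schedulerapi.HostPriority{
            Host:  node.Name,
            Score: len(nodeInfo.Pods()),
        }, nil
    }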

    PriorityMapFunction is invoked inside processNode:

    1. results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)

    The map function of NewSelectorSpreadPriority, CalculateSpreadPriorityMap, is analyzed below.

    6. PriorityReduceFunction

    A PriorityReduceFunction aggregates the per-node results and computes the final scores for all nodes.

    PriorityReduceFunction is defined as follows:

    1. // PriorityReduceFunction is a function that aggregated per-node results and computes
    2. // final scores for all nodes.
    3. // TODO: Figure out the exact API of this method.
    4. // TODO: Change interface{} to a specific type.
    5. type PriorityReduceFunction func(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error
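
    Correspondingly, a hypothetical reduce function matching this signature might rescale the raw map results to 0-10 in place. Again this is a sketch under the assumption of the same type aliases as above (schedulerapi.MaxPriority is 10), not a real scheduler priority:

    // normalizeReduce is a hypothetical PriorityReduceFunction. It rescales the
    // raw scores written by the map phase so that the node with the largest raw
    // score gets schedulerapi.MaxPriority (10) and the others are scaled
    // proportionally. It modifies result in place, as the real reduce functions do.
    func normalizeReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
        maxScore := 0
        for i := range result {
            if result[i].Score > maxScore {
                maxScore = result[i].Score
            }
        }
        if maxScore == 0 {
            return nil // all raw scores are zero; leave them unchanged
        }
        for i := range result {
            result[i].Score = result[i].Score * schedulerapi.MaxPriority / maxScore
        }
        return nil
    }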

    PrioritizeNodes invokes the reduce functions as follows:

    1. for i, priorityConfig := range priorityConfigs {
    2. if priorityConfig.Reduce == nil {
    3. continue
    4. }
    5. wg.Add(1)
    6. go func(index int, config algorithm.PriorityConfig) {
    7. defer wg.Done()
    8. if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
    9. appendError(err)
    10. }
    11. if glog.V(10) {
    12. for _, hostPriority := range results[index] {
    13. glog.Infof("%v -> %v: %v, Score: (%d)", pod.Name, hostPriority.Host, config.Name, hostPriority.Score)
    14. }
    15. }
    16. }(i, priorityConfig)
    17. }

    The reduce function of NewSelectorSpreadPriority, CalculateSpreadPriorityReduce, is analyzed below.

    7. Summarize all scores

    First wait for all map/reduce computations to finish; if any errors occurred, return the aggregated error.

    1. // Wait for all computations to be finished.
    2. wg.Wait()
    3. if len(errs) != 0 {
    4. return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    5. }

    Then compute each node's total score as the weighted sum of the per-priority scores.

    1. // Summarize all scores.
    2. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    3. for i := range nodes {
    4. result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
    5. for j := range priorityConfigs {
    6. result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
    7. }
    8. }
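
    A small standalone sketch of this summation with made-up numbers, combining two priorities of weight 1 and weight 2 over three nodes:

    package main

    import "fmt"

    func main() {
        nodes := []string{"node-a", "node-b", "node-c"}
        // Hypothetical reduced scores (0-10), one row per priority function.
        results := [][]int{
            {10, 5, 0}, // priority A
            {2, 8, 6},  // priority B
        }
        weights := []int{1, 2} // weights of priority A and B

        for i, name := range nodes {
            total := 0
            for j := range results {
                total += results[j][i] * weights[j]
            }
            fmt.Printf("%s => %d\n", name, total)
        }
        // Output: node-a => 14, node-b => 21, node-c => 12
    }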

    If extenders are configured, their weighted scores are added on top of the result.

    1. if len(extenders) != 0 && nodes != nil {
    2. combinedScores := make(map[string]int, len(nodeNameToInfo))
    3. for _, extender := range extenders {
    4. if !extender.IsInterested(pod) {
    5. continue
    6. }
    7. wg.Add(1)
    8. go func(ext algorithm.SchedulerExtender) {
    9. defer wg.Done()
    10. prioritizedList, weight, err := ext.Prioritize(pod, nodes)
    11. if err != nil {
    12. // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
    13. return
    14. }
    15. mu.Lock()
    16. for i := range *prioritizedList {
    17. host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
    18. combinedScores[host] += score * weight
    19. }
    20. mu.Unlock()
    21. }(extender)
    22. }
    23. // wait for all go routines to finish
    24. wg.Wait()
    25. for i := range result {
    26. result[i].Score += combinedScores[result[i].Host]
    27. }
    28. }

    8. NewSelectorSpreadPriority

    The following analysis uses the NewSelectorSpreadPriority priority function; other important priority functions will be covered separately later.

    NewSelectorSpreadPriority tries to spread pods belonging to the same service, RC, RS or StatefulSet across different nodes.

    It is registered as follows:

    This code is located in pkg/scheduler/algorithmprovider/defaults/defaults.go.

    1. // ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
    2. // the number of pods (belonging to the same service) on the same node.
    3. // Register the factory so that it's available, but do not include it as part of the default priorities
    4. // Largely replaced by "SelectorSpreadPriority", but registered for backward compatibility with 1.0
    5. factory.RegisterPriorityConfigFactory(
    6. "ServiceSpreadingPriority",
    7. factory.PriorityConfigFactory{
    8. MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
    9. return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{})
    10. },
    11. Weight: 1,
    12. },
    13. )

    NewSelectorSpreadPriority is implemented as follows:

    This code is located in pkg/scheduler/algorithm/priorities/selector_spreading.go.

    1. // NewSelectorSpreadPriority creates a SelectorSpread.
    2. func NewSelectorSpreadPriority(
    3. serviceLister algorithm.ServiceLister,
    4. controllerLister algorithm.ControllerLister,
    5. replicaSetLister algorithm.ReplicaSetLister,
    6. statefulSetLister algorithm.StatefulSetLister) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
    7. selectorSpread := &SelectorSpread{
    8. serviceLister: serviceLister,
    9. controllerLister: controllerLister,
    10. replicaSetLister: replicaSetLister,
    11. statefulSetLister: statefulSetLister,
    12. }
    13. return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce
    14. }

    NewSelectorSpreadPriority consists of a map function and a reduce function: CalculateSpreadPriorityMap and CalculateSpreadPriorityReduce respectively.

    8.1. CalculateSpreadPriorityMap

    CalculateSpreadPriorityMap spreads pods of the same service, RC, RS or StatefulSet across different nodes. When scheduling a pod, it first finds the services, RCs, RSs or StatefulSets that match the pod, then counts the existing pods matching their selectors, favoring the nodes with the fewest such pods.

    The basic flow is as follows:

    1. Collect the selectors of the services, RSs, RCs and StatefulSets that match the pod.
    2. Iterate over all pods on the current node and use the number of existing pods matched by those selectors as the node's score. (At this stage a larger score means more matching pods, i.e. a less suitable node; the reduce phase later converts this to a 0-10 scale where a larger score means a more suitable node.)

    This code is located in pkg/scheduler/algorithm/priorities/selector_spreading.go.

    1. // CalculateSpreadPriorityMap spreads pods across hosts, considering pods
    2. // belonging to the same service,RC,RS or StatefulSet.
    3. // When a pod is scheduled, it looks for services, RCs,RSs and StatefulSets that match the pod,
    4. // then finds existing pods that match those selectors.
    5. // It favors nodes that have fewer existing matching pods.
    6. // i.e. it pushes the scheduler towards a node where there's the smallest number of
    7. // pods which match the same service, RC,RSs or StatefulSets selectors as the pod being scheduled.
    8. func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
    9. var selectors []labels.Selector
    10. node := nodeInfo.Node()
    11. if node == nil {
    12. return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
    13. }
    14. priorityMeta, ok := meta.(*priorityMetadata)
    15. if ok {
    16. selectors = priorityMeta.podSelectors
    17. } else {
    18. selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
    19. }
    20. if len(selectors) == 0 {
    21. return schedulerapi.HostPriority{
    22. Host: node.Name,
    23. Score: int(0),
    24. }, nil
    25. }
    26. count := int(0)
    27. for _, nodePod := range nodeInfo.Pods() {
    28. if pod.Namespace != nodePod.Namespace {
    29. continue
    30. }
    31. // When we are replacing a failed pod, we often see the previous
    32. // deleted version while scheduling the replacement.
    33. // Ignore the previous deleted version for spreading purposes
    34. // (it can still be considered for resource restrictions etc.)
    35. if nodePod.DeletionTimestamp != nil {
    36. glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
    37. continue
    38. }
    39. for _, selector := range selectors {
    40. if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
    41. count++
    42. break
    43. }
    44. }
    45. }
    46. return schedulerapi.HostPriority{
    47. Host: node.Name,
    48. Score: int(count),
    49. }, nil
    50. }

    Analyzed in segments below:

    First obtain the selectors.

    1. selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)

    Count the pods on the node that match the selectors and use that count as the node's score. This is not the node's final score, only an intermediate value.

    1. count := int(0)
    2. for _, nodePod := range nodeInfo.Pods() {
    3. ...
    4. for _, selector := range selectors {
    5. if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
    6. count++
    7. break
    8. }
    9. }
    10. }
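
    The selector.Matches(labels.Set(...)) check can be tried on its own with k8s.io/apimachinery. A small standalone sketch of counting matching pods, with hypothetical labels:

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/labels"
    )

    func main() {
        // Selector of a hypothetical Service/ReplicaSet: app=web.
        selector := labels.SelectorFromSet(labels.Set{"app": "web"})

        // Labels of pods already running on the node (made-up data).
        podLabels := []map[string]string{
            {"app": "web", "version": "v1"},
            {"app": "web", "version": "v2"},
            {"app": "db"},
        }

        count := 0
        for _, l := range podLabels {
            if selector.Matches(labels.Set(l)) {
                count++
            }
        }
        fmt.Println("matching pods on this node:", count) // 2
    }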

    8.2. CalculateSpreadPriorityReduce

    CalculateSpreadPriorityReduce computes a 0-10 score for each node based on the number of existing matching pods on it; nodes with fewer existing matching pods get higher scores and are therefore more likely to be chosen.

    The basic flow is as follows:

    1. Record the highest score among all nodes (i.e. the largest number of matching pods found on any single node).
    2. Iterate over all nodes and rescale each score to the 0-10 range: score = 10 * (maxCount - count) / maxCount, where maxCount is the largest matching-pod count and count is the current node's count. A higher score now means fewer matching pods on that node, so it is more likely to be chosen, which achieves spreading pods with the same selector across different nodes (see the worked sketch after this list).
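
    A worked standalone sketch of step 2 with made-up counts (ignoring the zone-weighting term that the real code also applies when nodes carry zone labels):

    package main

    import "fmt"

    func main() {
        const maxPriority = 10
        // Hypothetical map-phase results: matching pod count per node.
        nodes := []string{"node-a", "node-b", "node-c"}
        counts := map[string]int{"node-a": 6, "node-b": 3, "node-c": 0}

        maxCount := 0
        for _, c := range counts {
            if c > maxCount {
                maxCount = c
            }
        }

        for _, name := range nodes {
            score := maxPriority // default when maxCount == 0
            if maxCount > 0 {
                // Fewer matching pods -> higher final score.
                score = maxPriority * (maxCount - counts[name]) / maxCount
            }
            fmt.Printf("%s: %d matching pods -> score %d\n", name, counts[name], score)
        }
        // Output: node-a -> 0, node-b -> 5, node-c -> 10
    }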

    This code is located in pkg/scheduler/algorithm/priorities/selector_spreading.go.

    1. // CalculateSpreadPriorityReduce calculates the source of each node
    2. // based on the number of existing matching pods on the node
    3. // where zone information is included on the nodes, it favors nodes
    4. // in zones with fewer existing matching pods.
    5. func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
    6. countsByZone := make(map[string]int, 10)
    7. maxCountByZone := int(0)
    8. maxCountByNodeName := int(0)
    9. for i := range result {
    10. if result[i].Score > maxCountByNodeName {
    11. maxCountByNodeName = result[i].Score
    12. }
    13. zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
    14. if zoneID == "" {
    15. continue
    16. }
    17. countsByZone[zoneID] += result[i].Score
    18. }
    19. for zoneID := range countsByZone {
    20. if countsByZone[zoneID] > maxCountByZone {
    21. maxCountByZone = countsByZone[zoneID]
    22. }
    23. }
    24. haveZones := len(countsByZone) != 0
    25. maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
    26. maxCountByZoneFloat64 := float64(maxCountByZone)
    27. MaxPriorityFloat64 := float64(schedulerapi.MaxPriority)
    28. for i := range result {
    29. // initializing to the default/max node score of maxPriority
    30. fScore := MaxPriorityFloat64
    31. if maxCountByNodeName > 0 {
    32. fScore = MaxPriorityFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
    33. }
    34. // If there is zone information present, incorporate it
    35. if haveZones {
    36. zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
    37. if zoneID != "" {
    38. zoneScore := MaxPriorityFloat64
    39. if maxCountByZone > 0 {
    40. zoneScore = MaxPriorityFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
    41. }
    42. fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
    43. }
    44. }
    45. result[i].Score = int(fScore)
    46. if glog.V(10) {
    47. glog.Infof(
    48. "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore),
    49. )
    50. }
    51. }
    52. return nil
    53. }

    Analyzed in segments below:

    First find the largest matching-pod count among all nodes, and accumulate the counts per zone.

    1. for i := range result {
    2. if result[i].Score > maxCountByNodeName {
    3. maxCountByNodeName = result[i].Score
    4. }
    5. zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
    6. if zoneID == "" {
    7. continue
    8. }
    9. countsByZone[zoneID] += result[i].Score
    10. }

    Then iterate over all nodes and rescale each node's score to the 0-10 range in proportion.

    1. for i := range result {
    2. // initializing to the default/max node score of maxPriority
    3. fScore := MaxPriorityFloat64
    4. if maxCountByNodeName > 0 {
    5. fScore = MaxPriorityFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
    6. }
    7. ...
    8. }

    9. Summary

    The priority phase picks the best node from those that satisfied the predicates. PrioritizeNodes ultimately returns a list recording the score of each node.

    9.1. PrioritizeNodes

    The main flow is as follows:

    1. If no priority functions and no extenders are configured, give every node the same score and return immediately.
    2. Run the map function of each priority on every node to produce raw per-node scores.
    3. Run the corresponding reduce function over the map results to compute each priority's final scores.
    4. Finally, combine the scores of the different priority functions into a weighted sum according to their weights.

    Each kind of priority function consists of a map function and a reduce function.

    9.2. NewSelectorSpreadPriority

    This article used NewSelectorSpreadPriority as the example; it tries to spread pods of the same service, RS, RC or StatefulSet across different nodes, and it consists of a map function and a reduce function, summarized below.

    9.2.1. CalculateSpreadPriorityMap

    The basic flow is as follows:

    1. Collect the selectors of the services, RSs, RCs and StatefulSets that match the pod.
    2. Iterate over all pods on the current node and use the number of existing pods matched by those selectors as the node's score. (At this stage a larger score means more matching pods, i.e. a less suitable node; the reduce phase later converts this to a 0-10 scale where a larger score means a more suitable node.)

    9.2.2. CalculateSpreadPriorityReduce

    The basic flow is as follows:

    1. Record the highest score among all nodes (i.e. the largest number of matching pods found on any single node).
    2. Iterate over all nodes and rescale each score to the 0-10 range: score = 10 * (maxCount - count) / maxCount, where maxCount is the largest matching-pod count and count is the current node's count. A higher score now means fewer matching pods on that node, so it is more likely to be chosen, which achieves spreading pods with the same selector across different nodes.

    References:

    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/scheduler/core/generic_scheduler.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/scheduler/algorithm/priorities/selector_spreading.go