• kube-scheduler Source Code Analysis (Part 4): findNodesThatFit
  • 1. Entry point
  • 2. findNodesThatFit
  • 3. numFeasibleNodesToFind
  • 4. checkNode
  • 5. podFitsOnNode
    • 5.1. predicateFuncs
  • 6. PodFitsResources
    • 6.1. NodeInfo
    • 6.2. Resource
    • 6.3. allowedPodNumber
    • 6.4. podRequest
    • 6.5. AllocatableResource
    • 6.6. ScalarResources
  • 7. Summary
    • 7.1. checkNode
    • 7.2. podFitsOnNode
    • 7.3. PodFitsResources

    kube-scheduler Source Code Analysis (Part 4): findNodesThatFit

    The following code analysis is based on Kubernetes v1.12.0.

    This article analyzes the predicate (pre-selection) phase of the scheduling logic, i.e. the first step: filtering out the nodes on which the pod can be scheduled.

    1. Entry point

    In the predicate phase, each node is run through the predicate functions to decide whether it is a suitable target for the pod.

    genericScheduler.Schedule calls findNodesThatFit as follows.

    This code is located in pkg/scheduler/core/generic_scheduler.go.

    func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
        ...
        // List all the nodes.
        nodes, err := nodeLister.List()
        if err != nil {
            return "", err
        }
        if len(nodes) == 0 {
            return "", ErrNoNodesAvailable
        }

        // Used for all fit and priority funcs.
        err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
        if err != nil {
            return "", err
        }

        trace.Step("Computing predicates")
        startPredicateEvalTime := time.Now()
        // Call findNodesThatFit to filter out the feasible nodes.
        filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
        if err != nil {
            return "", err
        }

        if len(filteredNodes) == 0 {
            return "", &FitError{
                Pod:              pod,
                NumAllNodes:      len(nodes),
                FailedPredicates: failedPredicateMap,
            }
        }

        // metrics
        metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
        metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
        ...
    }

    The key call:

    // Call findNodesThatFit to filter out the feasible nodes.
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)

    2. findNodesThatFit

    findNodesThatFit filters the nodes using the given predicate functions; each node is passed through the predicates to determine whether it fits.

    Its inputs are the pod being scheduled and the current list of nodes; it returns the list of feasible nodes, a map of failed predicates per node, and an error.

    The basic flow of findNodesThatFit:

    1. Compute how many feasible nodes need to be found and use that number as the capacity of the result slice, so that clusters with many nodes do not force every node to be checked.
    2. Repeatedly fetch the next node from the NodeTree and check whether it satisfies the pod's scheduling requirements.
    3. Use the previously registered predicate functions to decide whether the current node fits the pod.
    4. Finally, return the list of nodes that satisfy the scheduling requirements, to be used by the subsequent priority (scoring) step.

    The complete code of findNodesThatFit:

    This code is located in pkg/scheduler/core/generic_scheduler.go.

    // Filters the nodes to find the ones that fit based on the given predicate functions
    // Each node is passed through the predicate functions to determine if it is a fit
    func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
        var filtered []*v1.Node
        failedPredicateMap := FailedPredicateMap{}

        if len(g.predicates) == 0 {
            filtered = nodes
        } else {
            allNodes := int32(g.cache.NodeTree().NumNodes)
            numNodesToFind := g.numFeasibleNodesToFind(allNodes)

            // Create filtered list with enough space to avoid growing it
            // and allow assigning.
            filtered = make([]*v1.Node, numNodesToFind)
            errs := errors.MessageCountMap{}
            var (
                predicateResultLock sync.Mutex
                filteredLen         int32
                equivClass          *equivalence.Class
            )

            ctx, cancel := context.WithCancel(context.Background())

            // We can use the same metadata producer for all nodes.
            meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

            if g.equivalenceCache != nil {
                // getEquivalenceClassInfo will return immediately if no equivalence pod found
                equivClass = equivalence.NewClass(pod)
            }

            checkNode := func(i int) {
                var nodeCache *equivalence.NodeCache
                nodeName := g.cache.NodeTree().Next()
                if g.equivalenceCache != nil {
                    nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
                }
                fits, failedPredicates, err := podFitsOnNode(
                    pod,
                    meta,
                    g.cachedNodeInfoMap[nodeName],
                    g.predicates,
                    g.cache,
                    nodeCache,
                    g.schedulingQueue,
                    g.alwaysCheckAllPredicates,
                    equivClass,
                )
                if err != nil {
                    predicateResultLock.Lock()
                    errs[err.Error()]++
                    predicateResultLock.Unlock()
                    return
                }
                if fits {
                    length := atomic.AddInt32(&filteredLen, 1)
                    if length > numNodesToFind {
                        cancel()
                        atomic.AddInt32(&filteredLen, -1)
                    } else {
                        filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                    }
                } else {
                    predicateResultLock.Lock()
                    failedPredicateMap[nodeName] = failedPredicates
                    predicateResultLock.Unlock()
                }
            }

            // Stops searching for more nodes once the configured number of feasible nodes
            // are found.
            workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

            filtered = filtered[:filteredLen]
            if len(errs) > 0 {
                return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
            }
        }

        if len(filtered) > 0 && len(g.extenders) != 0 {
            for _, extender := range g.extenders {
                if !extender.IsInterested(pod) {
                    continue
                }
                filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
                if err != nil {
                    if extender.IsIgnorable() {
                        glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
                            extender, err)
                        continue
                    } else {
                        return []*v1.Node{}, FailedPredicateMap{}, err
                    }
                }

                for failedNodeName, failedMsg := range failedMap {
                    if _, found := failedPredicateMap[failedNodeName]; !found {
                        failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
                    }
                    failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
                }
                filtered = filteredList
                if len(filtered) == 0 {
                    break
                }
            }
        }
        return filtered, failedPredicateMap, nil
    }

    The following sections analyze findNodesThatFit piece by piece.

    3. numFeasibleNodesToFind

    findNodesThatFit first determines, out of all the nodes, the total number of feasible nodes it needs to find. numFeasibleNodesToFind exists mainly to keep scheduling efficient when the cluster has many nodes (more than 100).

    allNodes := int32(g.cache.NodeTree().NumNodes)
    numNodesToFind := g.numFeasibleNodesToFind(allNodes)

    // Create filtered list with enough space to avoid growing it
    // and allow assigning.
    filtered = make([]*v1.Node, numNodesToFind)

    The basic flow of numFeasibleNodesToFind (a worked example follows the code below):

    • If the total number of nodes is below minFeasibleNodesToFind (currently 100 by default), or percentageOfNodesToScore is not a value strictly between 0 and 100, return the total number of nodes.
    • If the node count exceeds 100, take percentageOfNodesToScore percent of it; if that number is still below minFeasibleNodesToFind, return minFeasibleNodesToFind.
    • If the percentage-based number is above minFeasibleNodesToFind, return that number.

    // numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
    // its search for more feasible nodes.
    func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) int32 {
        if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore <= 0 ||
            g.percentageOfNodesToScore >= 100 {
            return numAllNodes
        }

        numNodes := numAllNodes * g.percentageOfNodesToScore / 100
        if numNodes < minFeasibleNodesToFind {
            return minFeasibleNodesToFind
        }
        return numNodes
    }
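
    To make the arithmetic concrete, here is a standalone sketch that reproduces the calculation above, using the v1.12 default of minFeasibleNodesToFind = 100; the sample node counts and percentage are invented for illustration.

    package main

    import "fmt"

    const minFeasibleNodesToFind = 100

    // numFeasibleNodesToFind mirrors the scheduler logic shown above.
    func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
        if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore <= 0 ||
            percentageOfNodesToScore >= 100 {
            return numAllNodes
        }
        numNodes := numAllNodes * percentageOfNodesToScore / 100
        if numNodes < minFeasibleNodesToFind {
            return minFeasibleNodesToFind
        }
        return numNodes
    }

    func main() {
        fmt.Println(numFeasibleNodesToFind(80, 50))   // 80: fewer than 100 nodes, so all of them are checked
        fmt.Println(numFeasibleNodesToFind(150, 50))  // 100: 150 * 50% = 75 is below the floor of 100
        fmt.Println(numFeasibleNodesToFind(5000, 50)) // 2500: 50% of 5000
    }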

    4. checkNode

    checkNode is a closure that checks whether a node meets the requirements; the core function it actually calls is podFitsOnNode. checkNode is then executed concurrently through the workqueue.

    The main flow of checkNode:

    1. Fetch the next node name from the cache's NodeTree.
    2. Pass the current node and the pod to podFitsOnNode to decide whether the node fits.
    3. If the current node fits, add it to the filtered array of feasible nodes.
    4. If it does not fit, record it in the failure map together with the failure reasons.
    5. checkNode is executed concurrently via workqueue.ParallelizeUntil; once the configured number of feasible nodes has been found, the search for more nodes stops.

    checkNode := func(i int) {
        var nodeCache *equivalence.NodeCache
        nodeName := g.cache.NodeTree().Next()
        if g.equivalenceCache != nil {
            nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
        }
        fits, failedPredicates, err := podFitsOnNode(
            pod,
            meta,
            g.cachedNodeInfoMap[nodeName],
            g.predicates,
            g.cache,
            nodeCache,
            g.schedulingQueue,
            g.alwaysCheckAllPredicates,
            equivClass,
        )
        if err != nil {
            predicateResultLock.Lock()
            errs[err.Error()]++
            predicateResultLock.Unlock()
            return
        }
        if fits {
            length := atomic.AddInt32(&filteredLen, 1)
            if length > numNodesToFind {
                cancel()
                atomic.AddInt32(&filteredLen, -1)
            } else {
                filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
            }
        } else {
            predicateResultLock.Lock()
            failedPredicateMap[nodeName] = failedPredicates
            predicateResultLock.Unlock()
        }
    }

    The concurrent execution via workqueue:

    // Stops searching for more nodes once the configured number of feasible nodes
    // are found.
    workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

    The implementation of ParallelizeUntil:

    // ParallelizeUntil is a framework that allows for parallelizing N
    // independent pieces of work until done or the context is canceled.
    func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
        var stop <-chan struct{}
        if ctx != nil {
            stop = ctx.Done()
        }

        toProcess := make(chan int, pieces)
        for i := 0; i < pieces; i++ {
            toProcess <- i
        }
        close(toProcess)

        if pieces < workers {
            workers = pieces
        }

        wg := sync.WaitGroup{}
        wg.Add(workers)
        for i := 0; i < workers; i++ {
            go func() {
                defer utilruntime.HandleCrash()
                defer wg.Done()
                for piece := range toProcess {
                    select {
                    case <-stop:
                        return
                    default:
                        doWorkPiece(piece)
                    }
                }
            }()
        }
        wg.Wait()
    }
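
    The self-contained sketch below (standard library only) illustrates the cancel-once-enough pattern that checkNode builds on top of ParallelizeUntil. The parallelizeUntil helper simply mirrors the logic shown above rather than importing the real workqueue package, and the node counts are invented.

    package main

    import (
        "context"
        "fmt"
        "sync"
        "sync/atomic"
    )

    // parallelizeUntil mirrors the ParallelizeUntil logic shown above.
    func parallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece func(int)) {
        stop := ctx.Done()
        toProcess := make(chan int, pieces)
        for i := 0; i < pieces; i++ {
            toProcess <- i
        }
        close(toProcess)
        if pieces < workers {
            workers = pieces
        }
        var wg sync.WaitGroup
        wg.Add(workers)
        for i := 0; i < workers; i++ {
            go func() {
                defer wg.Done()
                for piece := range toProcess {
                    select {
                    case <-stop:
                        return
                    default:
                        doWorkPiece(piece)
                    }
                }
            }()
        }
        wg.Wait()
    }

    func main() {
        const allNodes, numNodesToFind = 1000, 100
        var found int32
        ctx, cancel := context.WithCancel(context.Background())
        checkNode := func(i int) {
            // Pretend every node fits; stop the search once numNodesToFind results are in.
            if atomic.AddInt32(&found, 1) > numNodesToFind {
                cancel()
                atomic.AddInt32(&found, -1)
            }
        }
        parallelizeUntil(ctx, 16, allNodes, checkNode)
        fmt.Println("feasible nodes found:", found) // 100; the remaining pieces are skipped after cancel()
    }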

    5. podFitsOnNode

    podFitsOnNode in brief:

    • podFitsOnNode checks whether the node given by NodeInfo satisfies the predicate functions.

    • For the given pod, podFitsOnNode checks whether an equivalent pod exists and tries to reuse its cached predicate results.

    podFitsOnNode is called from two places: Schedule (scheduling) and Preempt (preemption).

    When called from Schedule, it tests whether the pod is schedulable onto the node, given all the pods already on the node plus the nominated pods of equal or higher priority that are slated to run there.

    When called from Preempt, i.e. when preemption happens, SelectVictimsOnNode chooses the pods to be evicted; after they are removed, the pending pod is evaluated for scheduling onto that node.

    The basic flow of podFitsOnNode:

    1. Iterate over the registered predicate ordering predicates.Ordering() and look up the execution function for each predicate.
    2. Execute each predicate function, which returns whether the node fits, the failure reasons, and an error.
    3. If a predicate reports that the node does not fit, append its reasons to the failed-predicates list.
    4. Finally, return whether the number of failed predicates is zero, along with the failure reasons.

    Inputs:

    • pod
    • PredicateMetadata
    • NodeInfo
    • predicateFuncs
    • schedulercache.Cache
    • nodeCache
    • SchedulingQueue
    • alwaysCheckAllPredicates
    • equivClass

    Outputs:

    • fit
    • PredicateFailureReason

    The complete code:

    This code is located in pkg/scheduler/core/generic_scheduler.go.

    // podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
    // For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
    // predicate results as possible.
    // This function is called from two different places: Schedule and Preempt.
    // When it is called from Schedule, we want to test whether the pod is schedulable
    // on the node with all the existing pods on the node plus higher and equal priority
    // pods nominated to run on the node.
    // When it is called from Preempt, we should remove the victims of preemption and
    // add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
    // It removes victims from meta and NodeInfo before calling this function.
    func podFitsOnNode(
        pod *v1.Pod,
        meta algorithm.PredicateMetadata,
        info *schedulercache.NodeInfo,
        predicateFuncs map[string]algorithm.FitPredicate,
        cache schedulercache.Cache,
        nodeCache *equivalence.NodeCache,
        queue SchedulingQueue,
        alwaysCheckAllPredicates bool,
        equivClass *equivalence.Class,
    ) (bool, []algorithm.PredicateFailureReason, error) {
        var (
            eCacheAvailable  bool
            failedPredicates []algorithm.PredicateFailureReason
        )

        podsAdded := false
        // We run predicates twice in some cases. If the node has greater or equal priority
        // nominated pods, we run them when those pods are added to meta and nodeInfo.
        // If all predicates succeed in this pass, we run them again when these
        // nominated pods are not added. This second pass is necessary because some
        // predicates such as inter-pod affinity may not pass without the nominated pods.
        // If there are no nominated pods for the node or if the first run of the
        // predicates fail, we don't run the second pass.
        // We consider only equal or higher priority pods in the first pass, because
        // those are the current "pod" must yield to them and not take a space opened
        // for running them. It is ok if the current "pod" take resources freed for
        // lower priority pods.
        // Requiring that the new pod is schedulable in both circumstances ensures that
        // we are making a conservative decision: predicates like resources and inter-pod
        // anti-affinity are more likely to fail when the nominated pods are treated
        // as running, while predicates like pod affinity are more likely to fail when
        // the nominated pods are treated as not running. We can't just assume the
        // nominated pods are running because they are not running right now and in fact,
        // they may end up getting scheduled to a different node.
        for i := 0; i < 2; i++ {
            metaToUse := meta
            nodeInfoToUse := info
            if i == 0 {
                podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
            } else if !podsAdded || len(failedPredicates) != 0 {
                break
            }
            // Bypass eCache if node has any nominated pods.
            // TODO(bsalamat): consider using eCache and adding proper eCache invalidations
            // when pods are nominated or their nominations change.
            eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
            for _, predicateKey := range predicates.Ordering() {
                var (
                    fit     bool
                    reasons []algorithm.PredicateFailureReason
                    err     error
                )
                //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
                if predicate, exist := predicateFuncs[predicateKey]; exist {
                    if eCacheAvailable {
                        fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
                    } else {
                        fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                    }
                    if err != nil {
                        return false, []algorithm.PredicateFailureReason{}, err
                    }

                    if !fit {
                        // eCache is available and valid, and predicates result is unfit, record the fail reasons
                        failedPredicates = append(failedPredicates, reasons...)
                        // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
                        if !alwaysCheckAllPredicates {
                            glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
                                "evaluation is short circuited and there are chances " +
                                "of other predicates failing as well.")
                            break
                        }
                    }
                }
            }
        }

        return len(failedPredicates) == 0, failedPredicates, nil
    }
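
    The two-pass loop is the subtlest part of podFitsOnNode. The toy sketch below (standard library only; the types and names are illustrative, not scheduler APIs) strips it down to the control flow: pass 0 evaluates the predicates with the nominated pods treated as running, and pass 1 re-evaluates without them, but only when there were nominated pods and pass 0 succeeded.

    package main

    import "fmt"

    // nodeState is a toy stand-in for the node's view (meta plus NodeInfo).
    type nodeState struct {
        pods []string // pods assumed to be running on the node
    }

    // predicate is a toy stand-in for algorithm.FitPredicate.
    type predicate func(state nodeState) (fit bool, reason string)

    func fitsOnNode(base nodeState, nominated []string, preds []predicate) (bool, []string) {
        var failed []string
        podsAdded := false
        for pass := 0; pass < 2; pass++ {
            state := base
            if pass == 0 && len(nominated) > 0 {
                // Pass 0: treat equal/higher-priority nominated pods as already running.
                state = nodeState{pods: append(append([]string{}, base.pods...), nominated...)}
                podsAdded = true
            } else if pass == 1 && (!podsAdded || len(failed) != 0) {
                // No nominated pods, or the first pass already failed: skip the second pass.
                break
            }
            for _, p := range preds {
                if ok, reason := p(state); !ok {
                    failed = append(failed, reason)
                    break // mimic the short circuit when alwaysCheckAllPredicates is false
                }
            }
        }
        return len(failed) == 0, failed
    }

    func main() {
        maxPods := func(n int) predicate {
            return func(s nodeState) (bool, string) {
                if len(s.pods)+1 > n {
                    return false, fmt.Sprintf("too many pods: %d+1 > %d", len(s.pods), n)
                }
                return true, ""
            }
        }
        // Two running pods, one nominated pod, and a node that allows at most three pods.
        fit, reasons := fitsOnNode(nodeState{pods: []string{"a", "b"}}, []string{"nominated-1"}, []predicate{maxPods(3)})
        fmt.Println(fit, reasons) // false [too many pods: 3+1 > 3]
    }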

    5.1. predicateFuncs

    The predicates registered earlier are executed here to decide whether the node is schedulable.

    for _, predicateKey := range predicates.Ordering() {
        if predicate, exist := predicateFuncs[predicateKey]; exist {
            if eCacheAvailable {
                fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
            } else {
                fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
            }
            ...
        }
    }

    The predicate ordering is as follows:

    var (
        predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
            GeneralPred, HostNamePred, PodFitsHostPortsPred,
            MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
            PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
            CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
            MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
            CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
    )
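
    As a side note on how predicateFuncs gets populated: any function with the algorithm.FitPredicate signature (the same signature PodFitsResources has, see section 6) can be registered under a name via factory.RegisterFitPredicate and is then available to the loop above if its key appears in the ordering. The sketch below is hypothetical: the predicate, its name and the node label are invented, and the import paths are assumed from the file locations cited in this article (the factory package path in particular is an assumption).

    package custompredicate

    import (
        "fmt"

        "k8s.io/api/core/v1"
        "k8s.io/kubernetes/pkg/scheduler/algorithm"
        "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
        schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
        "k8s.io/kubernetes/pkg/scheduler/factory"
    )

    // podFitsExample is a hypothetical predicate that rejects nodes carrying an
    // "example.com/exclude" label. It has the same signature as PodFitsResources.
    func podFitsExample(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
        node := nodeInfo.Node()
        if node == nil {
            return false, nil, fmt.Errorf("node not found")
        }
        if _, excluded := node.Labels["example.com/exclude"]; excluded {
            return false, []algorithm.PredicateFailureReason{predicates.NewFailureReason("node is excluded")}, nil
        }
        return true, nil, nil
    }

    func init() {
        // Registered the same way PodFitsResources is registered (see section 6).
        factory.RegisterFitPredicate("PodFitsExample", podFitsExample)
    }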

    6. PodFitsResources

    The following takes the PodFitsResources predicate as an example; other important predicate functions will be analyzed separately later.

    PodFitsResources checks whether a node has sufficient resources (CPU, memory, GPU, and so on) to run the current pod.

    The basic flow of PodFitsResources:

    1. If the number of pods already on the node plus the pod being scheduled exceeds the node's allowed pod count, the pod may not be scheduled there.
    2. If all of the pod's resource requests are zero, scheduling is allowed (subject only to the pod-count check).
    3. If the pod's requests plus the total requests of all pods already on the node exceed the node's allocatable resources, scheduling is not allowed.
    4. If, for any extended (scalar) resource, the pod's request plus the total of the corresponding requests on the node exceeds the node's allocatable amount, scheduling is not allowed.

    PodFitsResources is registered as follows:

    factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)

    Inputs of PodFitsResources:

    • pod

    • nodeInfo

    • PredicateMetadata

    Outputs of PodFitsResources:

    • fit
    • PredicateFailureReason

    The complete code of PodFitsResources:

    This code is located in pkg/scheduler/algorithm/predicates/predicates.go.

    // PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
    // First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
    // predicate failure reasons if the node has insufficient resources to run the pod.
    func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
        node := nodeInfo.Node()
        if node == nil {
            return false, nil, fmt.Errorf("node not found")
        }

        var predicateFails []algorithm.PredicateFailureReason
        allowedPodNumber := nodeInfo.AllowedPodNumber()
        if len(nodeInfo.Pods())+1 > allowedPodNumber {
            predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
        }

        // No extended resources should be ignored by default.
        ignoredExtendedResources := sets.NewString()

        var podRequest *schedulercache.Resource
        if predicateMeta, ok := meta.(*predicateMetadata); ok {
            podRequest = predicateMeta.podRequest
            if predicateMeta.ignoredExtendedResources != nil {
                ignoredExtendedResources = predicateMeta.ignoredExtendedResources
            }
        } else {
            // We couldn't parse metadata - fallback to computing it.
            podRequest = GetResourceRequest(pod)
        }
        if podRequest.MilliCPU == 0 &&
            podRequest.Memory == 0 &&
            podRequest.EphemeralStorage == 0 &&
            len(podRequest.ScalarResources) == 0 {
            return len(predicateFails) == 0, predicateFails, nil
        }

        allocatable := nodeInfo.AllocatableResource()
        if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
            predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
        }
        if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
            predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
        }
        if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
            predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
        }

        for rName, rQuant := range podRequest.ScalarResources {
            if v1helper.IsExtendedResourceName(rName) {
                // If this resource is one of the extended resources that should be
                // ignored, we will skip checking it.
                if ignoredExtendedResources.Has(string(rName)) {
                    continue
                }
            }
            if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
                predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
            }
        }

        if glog.V(10) {
            if len(predicateFails) == 0 {
                // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
                // not logged. There is visible performance gain from it.
                glog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
                    podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
            }
        }
        return len(predicateFails) == 0, predicateFails, nil
    }

    6.1. NodeInfo

    NodeInfo is the node-level aggregated information. Its main fields are:

    • node: the Kubernetes Node object itself.
    • pods: the pods currently placed on the node.
    • requestedResource: the sum of the resource requests of all pods on the node.
    • allocatableResource: the node's actual allocatable resources (corresponding to Node.Status.Allocatable.*), which can be understood as the node's total usable capacity.

    This code is located in pkg/scheduler/cache/node_info.go.

    // NodeInfo is node level aggregated information.
    type NodeInfo struct {
        // Overall node information.
        node *v1.Node

        pods             []*v1.Pod
        podsWithAffinity []*v1.Pod
        usedPorts        util.HostPortInfo

        // Total requested resource of all pods on this node.
        // It includes assumed pods which scheduler sends binding to apiserver but
        // didn't get it as scheduled yet.
        requestedResource *Resource
        nonzeroRequest    *Resource
        // We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
        // as int64, to avoid conversions and accessing map.
        allocatableResource *Resource

        // Cached taints of the node for faster lookup.
        taints    []v1.Taint
        taintsErr error

        // imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
        // checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
        // state information.
        imageStates map[string]*ImageStateSummary

        // TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
        // scheduling cycle.
        // TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
        TransientInfo *transientSchedulerInfo

        // Cached conditions of node for faster lookup.
        memoryPressureCondition v1.ConditionStatus
        diskPressureCondition   v1.ConditionStatus
        pidPressureCondition    v1.ConditionStatus

        // Whenever NodeInfo changes, generation is bumped.
        // This is used to avoid cloning it if the object didn't change.
        generation int64
    }

    6.2. Resource

    Resource is a collection of compute resources. Its main fields are:

    • MilliCPU
    • Memory
    • EphemeralStorage
    • AllowedPodNumber: the number of pods the node allows (corresponding to Node.Status.Allocatable.Pods().Value()), usually 110.
    • ScalarResources

    // Resource is a collection of compute resource.
    type Resource struct {
        MilliCPU         int64
        Memory           int64
        EphemeralStorage int64
        // We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
        // explicitly as int, to avoid conversions and improve performance.
        AllowedPodNumber int
        // ScalarResources
        ScalarResources map[v1.ResourceName]int64
    }

    The following walks through the concrete flow of PodFitsResources.

    6.3. allowedPodNumber

    First get the node information and check whether the number of pods already on the node plus the pod being scheduled exceeds the number of pods the node allows (usually 110). If it does, a failure reason is appended to predicateFails, meaning the node is not suitable for this pod.

    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var predicateFails []algorithm.PredicateFailureReason
    allowedPodNumber := nodeInfo.AllowedPodNumber()
    if len(nodeInfo.Pods())+1 > allowedPodNumber {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
    }

    6.4. podRequest

    If all of the pod's requests are zero, scheduling onto this node is allowed and the result is returned immediately. (A simplified sketch of how podRequest is computed follows the snippet below.)

    if podRequest.MilliCPU == 0 &&
        podRequest.Memory == 0 &&
        podRequest.EphemeralStorage == 0 &&
        len(podRequest.ScalarResources) == 0 {
        return len(predicateFails) == 0, predicateFails, nil
    }
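
    podRequest itself normally comes from the predicate metadata; when the metadata cannot be used, PodFitsResources falls back to GetResourceRequest(pod). The sketch below is a much-simplified, self-contained illustration of that idea: it only sums the regular containers' CPU and memory requests (the real implementation also handles init containers, ephemeral storage and scalar resources) and uses only the public k8s.io/api and k8s.io/apimachinery packages. The pod and its numbers are invented.

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/resource"
    )

    // request holds the summed CPU (in millicores) and memory (in bytes) requests.
    type request struct {
        MilliCPU, Memory int64
    }

    // podRequest sums the requests of the pod's regular containers.
    func podRequest(pod *v1.Pod) request {
        var r request
        for _, c := range pod.Spec.Containers {
            if cpu, ok := c.Resources.Requests[v1.ResourceCPU]; ok {
                r.MilliCPU += cpu.MilliValue()
            }
            if mem, ok := c.Resources.Requests[v1.ResourceMemory]; ok {
                r.Memory += mem.Value()
            }
        }
        return r
    }

    func main() {
        pod := &v1.Pod{Spec: v1.PodSpec{Containers: []v1.Container{{
            Name: "app",
            Resources: v1.ResourceRequirements{Requests: v1.ResourceList{
                v1.ResourceCPU:    resource.MustParse("500m"),
                v1.ResourceMemory: resource.MustParse("128Mi"),
            }},
        }}}}
        fmt.Printf("%+v\n", podRequest(pod)) // {MilliCPU:500 Memory:134217728}
    }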

    6.5. AllocatableResource

    If the pod's requested resources plus the total requests of all pods already on the node exceed the node's allocatable amount, the pod is not allowed onto this node and the corresponding failure reason is recorded. The resources checked here are CPU, memory and ephemeral storage.

    allocatable := nodeInfo.AllocatableResource()
    if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
    }
    if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
    }
    if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
    }

    6.6. ScalarResources

    For the remaining extended (scalar) resources, check whether the pod's request plus the total of the corresponding requests of all pods on the node exceeds the node's allocatable amount of that resource; if so, the pod is not allowed onto this node. (A toy recap of sections 6.3–6.6 follows the code below.)

    for rName, rQuant := range podRequest.ScalarResources {
        if v1helper.IsExtendedResourceName(rName) {
            // If this resource is one of the extended resources that should be
            // ignored, we will skip checking it.
            if ignoredExtendedResources.Has(string(rName)) {
                continue
            }
        }
        if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
            predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
        }
    }
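
    Putting sections 6.3–6.6 together, the whole predicate boils down to per-dimension comparisons of allocatable capacity against the requests already on the node plus the new pod's request. The standard-library-only recap below uses invented numbers purely to illustrate the arithmetic; it is not scheduler code.

    package main

    import "fmt"

    // resources is a toy counterpart of schedulercache.Resource.
    type resources struct {
        MilliCPU, Memory int64
        Pods             int
    }

    // fits reports the reasons why a pod with the given request does not fit.
    func fits(pod, requested, allocatable resources) []string {
        var reasons []string
        if requested.Pods+1 > allocatable.Pods {
            reasons = append(reasons, "Too many pods")
        }
        if allocatable.MilliCPU < pod.MilliCPU+requested.MilliCPU {
            reasons = append(reasons, fmt.Sprintf("Insufficient cpu: want %d, used %d, capacity %d",
                pod.MilliCPU, requested.MilliCPU, allocatable.MilliCPU))
        }
        if allocatable.Memory < pod.Memory+requested.Memory {
            reasons = append(reasons, fmt.Sprintf("Insufficient memory: want %d, used %d, capacity %d",
                pod.Memory, requested.Memory, allocatable.Memory))
        }
        return reasons
    }

    func main() {
        pod := resources{MilliCPU: 500, Memory: 1 << 30}                  // the pod requests 0.5 CPU and 1Gi
        requested := resources{MilliCPU: 3800, Memory: 6 << 30, Pods: 12} // already requested on the node
        allocatable := resources{MilliCPU: 4000, Memory: 8 << 30, Pods: 110}
        fmt.Println(fits(pod, requested, allocatable))
        // Output: [Insufficient cpu: want 500, used 3800, capacity 4000]
    }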

    7. Summary

    findNodesThatFit filters the nodes using the given predicate functions; each node is passed through the predicates to determine whether it fits.

    Its inputs are the pod being scheduled and the current list of nodes; it returns the list of feasible nodes, a map of failed predicates per node, and an error.

    The basic flow of findNodesThatFit:

    1. Compute how many feasible nodes need to be found and use that number as the capacity of the result slice, so that clusters with many nodes do not force every node to be checked, which would be inefficient.
    2. Repeatedly fetch the next node from the NodeTree and check whether it satisfies the pod's scheduling requirements.
    3. Use the previously registered predicate functions to decide whether the current node fits the pod.
    4. Finally, return the list of nodes that satisfy the scheduling requirements, to be used by the subsequent priority (scoring) step.

    7.1. checkNode

    checkNode is a closure that checks whether a node meets the requirements; the core function it actually calls is podFitsOnNode. checkNode is then executed concurrently through the workqueue.

    The main flow of checkNode:

    1. Fetch the next node name from the cache's NodeTree.
    2. Pass the current node and the pod to podFitsOnNode to decide whether the node fits.
    3. If the current node fits, add it to the filtered array of feasible nodes.
    4. If it does not fit, record it in the failure map together with the failure reasons.
    5. checkNode is executed concurrently via workqueue.ParallelizeUntil; once the configured number of feasible nodes has been found, the search for more nodes stops.

    7.2. podFitsOnNode

    checkNode in turn calls the core function podFitsOnNode.

    podFitsOnNode in brief:

    • podFitsOnNode checks whether the node given by NodeInfo satisfies the predicate functions.

    • For the given pod, podFitsOnNode checks whether an equivalent pod exists and tries to reuse its cached predicate results.

    podFitsOnNode is called from two places: Schedule (scheduling) and Preempt (preemption).

    When called from Schedule, it tests whether the pod is schedulable onto the node, given all the pods already on the node plus the nominated pods of equal or higher priority that are slated to run there.

    When called from Preempt, i.e. when preemption happens, SelectVictimsOnNode chooses the pods to be evicted; after they are removed, the pending pod is evaluated for scheduling onto that node.

    The basic flow of podFitsOnNode:

    1. Iterate over the registered predicate ordering predicates.Ordering() and look up the execution function for each predicate.
    2. Execute each predicate function, which returns whether the node fits, the failure reasons, and an error.
    3. If a predicate reports that the node does not fit, append its reasons to the failed-predicates list.
    4. Finally, return whether the number of failed predicates is zero, along with the failure reasons.

    7.3. PodFitsResources

    This article analyzed only one of the important predicate functions as an example: PodFitsResources.

    PodFitsResources checks whether a node has sufficient resources (CPU, memory, GPU, and so on) to run the current pod.

    The basic flow of PodFitsResources:

    1. If the number of pods already on the node plus the pod being scheduled exceeds the node's allowed pod count, the pod may not be scheduled there.
    2. If all of the pod's resource requests are zero, scheduling is allowed (subject only to the pod-count check).
    3. If the pod's requests plus the total requests of all pods already on the node exceed the node's allocatable resources, scheduling is not allowed.
    4. If, for any extended (scalar) resource, the pod's request plus the total of the corresponding requests on the node exceeds the node's allocatable amount, scheduling is not allowed.

    References:

    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/scheduler/core/generic_scheduler.go

    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/scheduler/algorithm/predicates/predicates.go