Kube Scheduler 源码分析

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器镜像服务 ACR,镜像仓库100个 不限时长
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: Kube Scheduler 源码分析Kube Scheduler 是负责k8s 集群最终Pod 应该部署到哪台机器的决策者,本章来走读一下Scheduler 基本流程源码。func NewSchedulerCommand() *cobra.

Kube Scheduler 源码分析

Kube Scheduler 是负责k8s 集群最终Pod 应该部署到哪台机器的决策者,本章来走读一下Scheduler 基本流程源码。

func NewSchedulerCommand() *cobra.Command {
    opts, err := options.NewOptions()
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, args, opts); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }

Scheduler 也是一个command line ,启动参数和另外几个组件无异。

Scheduler Run 函数

// Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory.Core().V1().Nodes(),
        cc.PodInformer,
        cc.InformerFactory.Core().V1().PersistentVolumes(),
        cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
        cc.InformerFactory.Core().V1().ReplicationControllers(),
        cc.InformerFactory.Apps().V1().ReplicaSets(),
        cc.InformerFactory.Apps().V1().StatefulSets(),
        cc.InformerFactory.Core().V1().Services(),
        cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        cc.InformerFactory.Storage().V1().StorageClasses(),
        cc.Recorder,
        cc.ComponentConfig.AlgorithmSource,
        stopCh,
        scheduler.WithName(cc.ComponentConfig.SchedulerName),
        scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
    if err != nil {
        return err
    }

    // Start all informers.
    go cc.PodInformer.Informer().Run(stopCh)
    cc.InformerFactory.Start(stopCh)

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(stopCh)
    controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)

    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
    defer cancel()

    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                utilruntime.HandleError(fmt.Errorf("lost master"))
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so runCommand inline until done.
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}

可以看到Run 函数主要做如下几个流程

  • 根据需要观察的Informer 创建Scheduler 实例
  • 将所有的Informer Listener 运行起来
  • 使用Client-go 进行选主
  • 运行真正的调度 run 函数

scheduler.New 函数

// New returns a Scheduler
func New(client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer appsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
    recorder record.EventRecorder,
    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
    stopCh <-chan struct{},
    opts ...func(o *schedulerOptions)) (*Scheduler, error) {

    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&options)
    }

    // Set up the configurator which can create schedulers from configs.
    configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName:                  options.schedulerName,
        Client:                         client,
        NodeInformer:                   nodeInformer,
        PodInformer:                    podInformer,
        PvInformer:                     pvInformer,
        PvcInformer:                    pvcInformer,
        ReplicationControllerInformer:  replicationControllerInformer,
        ReplicaSetInformer:             replicaSetInformer,
        StatefulSetInformer:            statefulSetInformer,
        ServiceInformer:                serviceInformer,
        PdbInformer:                    pdbInformer,
        StorageClassInformer:           storageClassInformer,
        HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
        DisablePreemption:              options.disablePreemption,
        PercentageOfNodesToScore:       options.percentageOfNodesToScore,
        BindTimeoutSeconds:             options.bindTimeoutSeconds,
    })
    var config *factory.Config
    source := schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.CreateFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        config = sc
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    config.Recorder = recorder
    config.DisablePreemption = options.disablePreemption
    config.StopEverything = stopCh
    // Create the scheduler.
    sched := NewFromConfig(config)
    return sched, nil
}
  • 将所有的Informer 构建为一个Config Struct
  • 根据Provider 或者 ConfigMap 来创建对应的调度器以及初始化策略(这里可以选择使用config file 或者configMap 模式来指定各种调度策略顺序和权重以及是否开启)
  • 将返回的config 构建成对应的schedu 实例返回

CreateFromConfig

这里我们以Policy 为例看一下具体如何根据config file 创建对应的Scheduler Struct。

// Creates a scheduler from the configuration file
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
    klog.V(2).Infof("Creating scheduler from configuration: %v", policy)

    // validate the policy configuration
    if err := validation.ValidatePolicy(policy); err != nil {
        return nil, err
    }

    predicateKeys := sets.NewString()
    if policy.Predicates == nil {
        klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
        provider, err := GetAlgorithmProvider(DefaultProvider)
        if err != nil {
            return nil, err
        }
        predicateKeys = provider.FitPredicateKeys
    } else {
        for _, predicate := range policy.Predicates {
            klog.V(2).Infof("Registering predicate: %s", predicate.Name)
            predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
        }
    }

    priorityKeys := sets.NewString()
    if policy.Priorities == nil {
        klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
        provider, err := GetAlgorithmProvider(DefaultProvider)
        if err != nil {
            return nil, err
        }
        priorityKeys = provider.PriorityFunctionKeys
    } else {
        for _, priority := range policy.Priorities {
            klog.V(2).Infof("Registering priority: %s", priority.Name)
            priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
        }
    }

    var extenders []algorithm.SchedulerExtender
    if len(policy.ExtenderConfigs) != 0 {
        ignoredExtendedResources := sets.NewString()
        for ii := range policy.ExtenderConfigs {
            klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
            extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
            if err != nil {
                return nil, err
            }
            extenders = append(extenders, extender)
            for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
                if r.IgnoredByScheduler {
                    ignoredExtendedResources.Insert(string(r.Name))
                }
            }
        }
        predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
    }
  • 如果没有设置Predicates则初始化default Predicates
  • 如果没有设置Priorities 则初始化default Priorities
  • 如果设置ExtenderConfigs 则初始化ExtenderConfigs

下面列一下都有哪些Predicates

// MatchInterPodAffinityPred defines the name of predicate MatchInterPodAffinity.
    MatchInterPodAffinityPred = "MatchInterPodAffinity"
    // CheckVolumeBindingPred defines the name of predicate CheckVolumeBinding.
    CheckVolumeBindingPred = "CheckVolumeBinding"
    // CheckNodeConditionPred defines the name of predicate CheckNodeCondition.
    CheckNodeConditionPred = "CheckNodeCondition"
    // GeneralPred defines the name of predicate GeneralPredicates.
    GeneralPred = "GeneralPredicates"
    // HostNamePred defines the name of predicate HostName.
    HostNamePred = "HostName"
    // PodFitsHostPortsPred defines the name of predicate PodFitsHostPorts.
    PodFitsHostPortsPred = "PodFitsHostPorts"
    // MatchNodeSelectorPred defines the name of predicate MatchNodeSelector.
    MatchNodeSelectorPred = "MatchNodeSelector"
    // PodFitsResourcesPred defines the name of predicate PodFitsResources.
    PodFitsResourcesPred = "PodFitsResources"
    // NoDiskConflictPred defines the name of predicate NoDiskConflict.
    NoDiskConflictPred = "NoDiskConflict"
    // PodToleratesNodeTaintsPred defines the name of predicate PodToleratesNodeTaints.
    PodToleratesNodeTaintsPred = "PodToleratesNodeTaints"
    // CheckNodeUnschedulablePred defines the name of predicate CheckNodeUnschedulablePredicate.
    CheckNodeUnschedulablePred = "CheckNodeUnschedulable"
    // PodToleratesNodeNoExecuteTaintsPred defines the name of predicate PodToleratesNodeNoExecuteTaints.
    PodToleratesNodeNoExecuteTaintsPred = "PodToleratesNodeNoExecuteTaints"
    // CheckNodeLabelPresencePred defines the name of predicate CheckNodeLabelPresence.
    CheckNodeLabelPresencePred = "CheckNodeLabelPresence"
    // CheckServiceAffinityPred defines the name of predicate checkServiceAffinity.
    CheckServiceAffinityPred = "CheckServiceAffinity"
    // MaxEBSVolumeCountPred defines the name of predicate MaxEBSVolumeCount.
    MaxEBSVolumeCountPred = "MaxEBSVolumeCount"
    // MaxGCEPDVolumeCountPred defines the name of predicate MaxGCEPDVolumeCount.
    MaxGCEPDVolumeCountPred = "MaxGCEPDVolumeCount"
    // MaxAzureDiskVolumeCountPred defines the name of predicate MaxAzureDiskVolumeCount.
    MaxAzureDiskVolumeCountPred = "MaxAzureDiskVolumeCount"
    // MaxCinderVolumeCountPred defines the name of predicate MaxCinderDiskVolumeCount.
    MaxCinderVolumeCountPred = "MaxCinderVolumeCount"
    // MaxCSIVolumeCountPred defines the predicate that decides how many CSI volumes should be attached
    MaxCSIVolumeCountPred = "MaxCSIVolumeCountPred"
    // NoVolumeZoneConflictPred defines the name of predicate NoVolumeZoneConflict.
    NoVolumeZoneConflictPred = "NoVolumeZoneConflict"
    // CheckNodeMemoryPressurePred defines the name of predicate CheckNodeMemoryPressure.
    CheckNodeMemoryPressurePred = "CheckNodeMemoryPressure"
    // CheckNodeDiskPressurePred defines the name of predicate CheckNodeDiskPressure.
    CheckNodeDiskPressurePred = "CheckNodeDiskPressure"
    // CheckNodePIDPressurePred defines the name of predicate CheckNodePIDPressure.
    CheckNodePIDPressurePred = "CheckNodePIDPressure"

    // DefaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE
    // GCE instances can have up to 16 PD volumes attached.
    DefaultMaxGCEPDVolumes = 16
    // DefaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure
    // Larger Azure VMs can actually have much more disks attached.
    // TODO We should determine the max based on VM size
    DefaultMaxAzureDiskVolumes = 16

    // KubeMaxPDVols defines the maximum number of PD Volumes per kubelet
    KubeMaxPDVols = "KUBE_MAX_PD_VOLS"

    // EBSVolumeFilterType defines the filter name for EBSVolumeFilter.
    EBSVolumeFilterType = "EBS"
    // GCEPDVolumeFilterType defines the filter name for GCEPDVolumeFilter.
    GCEPDVolumeFilterType = "GCE"
    // AzureDiskVolumeFilterType defines the filter name for AzureDiskVolumeFilter.
    AzureDiskVolumeFilterType = "AzureDisk"
    // CinderVolumeFilterType defines the filter name for CinderVolumeFilter.
    CinderVolumeFilterType = "Cinder"

所有的priorities如下

EqualPriority = "EqualPriority"
    // MostRequestedPriority defines the name of prioritizer function that gives used nodes higher priority.
    MostRequestedPriority = "MostRequestedPriority"
    // RequestedToCapacityRatioPriority defines the name of RequestedToCapacityRatioPriority.
    RequestedToCapacityRatioPriority = "RequestedToCapacityRatioPriority"
    // SelectorSpreadPriority defines the name of prioritizer function that spreads pods by minimizing
    // the number of pods (belonging to the same service or replication controller) on the same node.
    SelectorSpreadPriority = "SelectorSpreadPriority"
    // ServiceSpreadingPriority is largely replaced by "SelectorSpreadPriority".
    ServiceSpreadingPriority = "ServiceSpreadingPriority"
    // InterPodAffinityPriority defines the name of prioritizer function that decides which pods should or
    // should not be placed in the same topological domain as some other pods.
    InterPodAffinityPriority = "InterPodAffinityPriority"
    // LeastRequestedPriority defines the name of prioritizer function that prioritize nodes by least
    // requested utilization.
    LeastRequestedPriority = "LeastRequestedPriority"
    // BalancedResourceAllocation defines the name of prioritizer function that prioritizes nodes
    // to help achieve balanced resource usage.
    BalancedResourceAllocation = "BalancedResourceAllocation"
    // NodePreferAvoidPodsPriority defines the name of prioritizer function that priorities nodes according to
    // the node annotation "scheduler.alpha.kubernetes.io/preferAvoidPods".
    NodePreferAvoidPodsPriority = "NodePreferAvoidPodsPriority"
    // NodeAffinityPriority defines the name of prioritizer function that prioritizes nodes which have labels
    // matching NodeAffinity.
    NodeAffinityPriority = "NodeAffinityPriority"
    // TaintTolerationPriority defines the name of prioritizer function that prioritizes nodes that marked
    // with taint which pod can tolerate.
    TaintTolerationPriority = "TaintTolerationPriority"
    // ImageLocalityPriority defines the name of prioritizer function that prioritizes nodes that have images
    // requested by the pod present.
    ImageLocalityPriority = "ImageLocalityPriority"
    // ResourceLimitsPriority defines the nodes of prioritizer function ResourceLimitsPriority.
    ResourceLimitsPriority = "ResourceLimitsPriority"

scheduleOne

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
    plugins := sched.config.PluginSet
    // Remove all plugin context data at the beginning of a scheduling cycle.
    if plugins.Data().Ctx != nil {
        plugins.Data().Ctx.Reset()
    }

    pod := sched.config.NextPod()
    // pod could be nil when schedulerQueue is closed
    if pod == nil {
        return
    }
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    scheduleResult, err := sched.schedule(pod)
    err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: scheduleResult.SuggestedHost,
            },
        })

这里就是最终调度Pod的最终函数,可以看到,它先从队列里面得到所有的未调度的 Pod,这里判断是否未调度主要根据Pod 的NodeName 是否已经被分配为标准,后面我们再介绍这个队列是怎么运作的。

取得Pod Info 后,依次运行刚才注册的预选与优选调度策略,最终选择出来合适的Node,然后调用Client go 的Bind 方法,将Pod 绑定到Node 上完成调度动作。

调度Pod Queue 构成

// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
    stopEverything := args.StopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }
    schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)

    // storageClassInformer is only enabled through VolumeScheduling feature gate
    var storageClassLister storagelisters.StorageClassLister
    if args.StorageClassInformer != nil {
        storageClassLister = args.StorageClassInformer.Lister()
    }
    c := &configFactory{
        client:                         args.Client,
        podLister:                      schedulerCache,
        podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
        nodeLister:                     args.NodeInformer.Lister(),
        pVLister:                       args.PvInformer.Lister(),
        pVCLister:                      args.PvcInformer.Lister(),
        serviceLister:                  args.ServiceInformer.Lister(),
        controllerLister:               args.ReplicationControllerInformer.Lister(),
        replicaSetLister:               args.ReplicaSetInformer.Lister(),
        statefulSetLister:              args.StatefulSetInformer.Lister(),
        pdbLister:                      args.PdbInformer.Lister(),
        storageClassLister:             storageClassLister,
        schedulerCache:                 schedulerCache,
        StopEverything:                 stopEverything,
        schedulerName:                  args.SchedulerName,
        hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
        disablePreemption:              args.DisablePreemption,
        percentageOfNodesToScore:       args.PercentageOfNodesToScore,
    }

    c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
    // scheduled pod cache
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering scheduledPod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object for filtering scheduledPod in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
            },
        },
    )
    // unscheduled pod queue
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering unscheduledPod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object for filtering unscheduledPod in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToSchedulingQueue,
                UpdateFunc: c.updatePodInSchedulingQueue,
                DeleteFunc: c.deletePodFromSchedulingQueue,
            },
        },
    )

这里完成了针对Pod 的Watch,同时将未调度的Pod 放入未调度Queue 方便调度器进行调度。可以看到,这里同样适用的Client go Informer 机制来同步Pod 的信息。

PodFitsHostPorts

由于PreDicates非常多,我们就以检查 Pod 端口冲突为例看一下PreDicates 函数

// PodFitsHostPorts checks if a node has free ports for the requested pod ports.
func PodFitsHostPorts(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    var wantPorts []*v1.ContainerPort
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        wantPorts = predicateMeta.podPorts
    } else {
        // We couldn't parse metadata - fallback to computing it.
        wantPorts = schedutil.GetContainerPorts(pod)
    }
    if len(wantPorts) == 0 {
        return true, nil, nil
    }

    existingPorts := nodeInfo.UsedPorts()

    // try to see whether existingPorts and  wantPorts will conflict or not
    if portsConflict(existingPorts, wantPorts) {
        return false, []PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
    }

    return true, nil, nil
}
  • 根据传入的Pod info 查看Pod 需要暴露的端口
  • 根据Node info 判断当前Node 可用端口
  • 判断是否被占用并返回结果

ImageLocalityPriority

func ImageLocalityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
    node := nodeInfo.Node()
    if node == nil {
        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
    }

    var score int
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
    } else {
        // if we are not able to parse priority meta data, skip this priority
        score = 0
    }

    return schedulerapi.HostPriority{
        Host:  node.Name,
        Score: score,
    }, nil
}

Priority我们以镜像为例,可以看到,先根据Pod info 来查看需要哪些镜像,然后根据Node info 查看当前镜像是否存在,存在的越多打分越高。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
5月前
|
调度 Apache
airflow scheduler -D 是什么作用
【6月更文挑战第30天】airflow scheduler -D 是什么作用
103 1
|
运维 资源调度 Kubernetes
Kubernetes Scheduler Framework 扩展: 1. Coscheduling
# 前言 ## 为什么Kubernetes需要Coscheduling功能? Kubernetes目前已经广泛的应用于在线服务编排,为了提升集群的的利用率和运行效率,我们希望将Kubernetes作为一个统一的管理平台来管理在线服务和离线作业。但是默认的调度器是以Pod为调度单元进行依次调度,不会考虑Pod之间的相互关系。但是很多数据计算类的作业具有All-or-Nothing特点,要求所有的
3119 0
|
2月前
|
Kubernetes 算法 调度
Kubernetes的灵魂核心:kube-scheduler
本文介绍了Kubernetes中关键组件kube-scheduler的工作原理,详细解释了其通过预选和优选过程为Pod选择合适节点的机制,并提供了一个简化的Python示例来模拟这一过程,帮助读者更好地理解和管理Kubernetes集群。
|
5月前
|
Kubernetes 监控 调度
K8S中Scheduler原理分析
【6月更文挑战第20天】K8S Scheduler是集群的关键组件,它监听API Server,为新Pod选择合适的Node。
|
6月前
|
Kubernetes 监控 调度
|
6月前
|
资源调度 分布式计算 算法
Gang Scheduling
Gang Scheduling(Coscheduling)、FIFO Scheduling、Capacity Scheduling、Fair sharing、Binpack/Spread等是云计算和分布式系统中的任务调度算法,用于在资源有限的情况下,公平、高效地分配任务和资源。下面是这些调度算法的基本介绍和如何在实际应用中使用它们的一些建议:
289 2
|
缓存 Kubernetes NoSQL
图文并茂带你解读 Kube-scheduler
Hello folks,今天为大家分享一个由 ContainerLabs 出品的关于 Kubernetes Scheduler 的文章。
181 0
|
存储 分布式计算 Kubernetes
【k8s系列3】kubernetes(k8s) scheduler backend 调度的实现
【k8s系列3】kubernetes(k8s) scheduler backend 调度的实现
262 0
【k8s系列3】kubernetes(k8s) scheduler backend 调度的实现
|
API 调度 Perl
kube-scheduler
kube-scheduler
149 0
下一篇
无影云桌面