内容简介:本篇文章介绍一下 Kubernetes 的默认调度器 kube-scheduler 的源码实现。kubernetes 代码版本:v1.18.4-rc.0。入口函数在路径核心逻辑就是:1. 创建一个 SchedulerCommand(第 4 行);2. 接收参数并执行(第 14 行)。我们先看一下创建 SchedulerCommand 的逻辑。
本篇文章介绍一下 Kubernetes 的默认调度器 kube-scheduler 的源码实现。kubernetes 代码版本:v1.18.4-rc.0。
0. 入口
入口函数在路径 kubernetes/cmd/kube-scheduler/scheduler.go#main()
,如下
func main() { rand.Seed(time.Now().UnixNano()) command := app.NewSchedulerCommand() // TODO: once we switch everything over to Cobra commands, we can go back to calling // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the // normalize func and add the go flag set by hand. pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc) // utilflag.InitFlags() logs.InitLogs() defer logs.FlushLogs() if err := command.Execute(); err != nil { os.Exit(1) } }
核心逻辑就是:1. 创建一个 SchedulerCommand(第 4 行);2. 接收参数并执行(第 14 行)。我们先看一下创建 SchedulerCommand 的逻辑。
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions func NewSchedulerCommand(registryOptions ...Option) *cobra.Command { opts, err := options.NewOptions() if err != nil { klog.Fatalf("unable to initialize command options: %v", err) } cmd := &cobra.Command{ Use: "kube-scheduler", Long: `The Kubernetes scheduler is a policy-rich, topology-aware, workload-specific function that significantly impacts availability, performance, and capacity. The scheduler needs to take into account individual and collective resource requirements, quality of service requirements, hardware/software/policy constraints, affinity and anti-affinity specifications, data locality, inter-workload interference, deadlines, and so on. Workload-specific requirements will be exposed through the API as necessary. See [scheduling](https://kubernetes.io/docs/concepts/scheduling/) for more information about scheduling and the kube-scheduler component.`, Run: func(cmd *cobra.Command, args []string) { if err := runCommand(cmd, args, opts, registryOptions...); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } }, } fs := cmd.Flags() ... return cmd }
首先我们可以看到 NewSchedulerCommand 接收一个不定参数,registryOptions。从名字我们可以看出来首先这个参数是作用于一个 Registry 的,这个 Registry 实际上就是用来管理 kuberentes 中的 plugin 的。
// Registry is a collection of all available plugins. The framework uses a // registry to enable and initialize configured plugins. // All plugins must be in the registry before initializing the framework. type Registry map[string]PluginFactory
而 registryOptions 中的 option 其实是一种函数传参的方式的使用。option 传参的方式最早由 Rob Pike 提出来的,简单来说就是将可选的 option 参数封装成多个函数传给目标函数,然后在目标函数内部通过调用 option 函数的方式来初始化。后面我们看到 RegistryOptions 初始化的部分再来介绍。对于 option 这种方式感兴趣的同学可以参考我之前的一篇文章: http://legendtkl.com/2016/11/05/code-scalability/
其次是 cmd,通过 cobra.Command 构建出来的一个 CLI 处理工具,对于命令行的输入通过第 18 行的匿名函数来处理,匿名函数内部会调用函数 runCommand 来启动 scheduler 进程。去掉一些不重要的代码逻辑,runCommand 主要做的事情就是创建 scheduler 参数,然后通过 Run 函数启动 scheduler 进程。
// runCommand runs the scheduler. func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error { ... // 创建 scheduler 参数 c, err := opts.Config() if err != nil { return err } // Get the completed config // 参数补充 cc := c.Complete() // Configz registration. if cz, err := configz.New("componentconfig"); err == nil { cz.Set(cc.ComponentConfig) } else { return fmt.Errorf("unable to register configz: %s", err) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() return Run(ctx, cc, registryOptions...) }
Run 函数的主要逻辑如下:
- 初始化 Registry,第 6 ~ 11 行就是 option 这种函数传参的处理逻辑。
- 创建 scheduler 实例
- 其他初始化操作,包括 EventBroadcast、健康检测、metric 等相关逻辑
- 启动 Pod Informer 来监听 Pod
- 运行调度器(分没有启动 leader 选举,但是对应的方法都是 sched.Run 方法)
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done. func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error { // To help debugging, immediately log version klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get()) outOfTreeRegistry := make(framework.Registry) for _, option := range outOfTreeRegistryOptions { if err := option(outOfTreeRegistry); err != nil { return err } } recorderFactory := getRecorderFactory(&cc) // Create the scheduler. sched, err := scheduler.New(cc.Client, cc.InformerFactory, cc.PodInformer, recorderFactory, ctx.Done(), scheduler.WithProfiles(cc.ComponentConfig.Profiles...), scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource), scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption), scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds), scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry), scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds), scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds), scheduler.WithExtenders(cc.ComponentConfig.Extenders...), ) if err != nil { return err } // Prepare the event broadcaster. if cc.Broadcaster != nil && cc.EventClient != nil { cc.Broadcaster.StartRecordingToSink(ctx.Done()) } if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil { cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")}) } // Setup healthz checks. var checks []healthz.HealthChecker if cc.ComponentConfig.LeaderElection.LeaderElect { checks = append(checks, cc.LeaderElection.WatchDog) } // Start up the healthz server. if cc.InsecureServing != nil { separateMetrics := cc.InsecureMetricsServing != nil handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil) if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil { return fmt.Errorf("failed to start healthz server: %v", err) } } if cc.InsecureMetricsServing != nil { handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil) if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil { return fmt.Errorf("failed to start metrics server: %v", err) } } if cc.SecureServing != nil { handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) // TODO: handle stoppedCh returned by c.SecureServing.Serve if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil { // fail early for secure handlers, removing the old error loop from above return fmt.Errorf("failed to start secure server: %v", err) } } // Start all informers. go cc.PodInformer.Informer().Run(ctx.Done()) cc.InformerFactory.Start(ctx.Done()) // Wait for all caches to sync before scheduling. cc.InformerFactory.WaitForCacheSync(ctx.Done()) // If leader election is enabled, runCommand via LeaderElector until done and exit. if cc.LeaderElection != nil { cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ OnStartedLeading: sched.Run, OnStoppedLeading: func() { klog.Fatalf("leaderelection lost") }, } leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection) if err != nil { return fmt.Errorf("couldn't create leader elector: %v", err) } leaderElector.Run(ctx) return fmt.Errorf("lost lease") } // Leader election is disabled, so runCommand inline until done. sched.Run(ctx) return fmt.Errorf("finished without leader elect") }
scheduler 实例
首先我们看一下 scheduler 的定义,路径为 pkg/scheduler/scheduler.go
。
// Scheduler 监听未调度的 Pod,为其寻找适合的 Node 节点,并写回到 api server type Scheduler struct { // 调度器 Cache SchedulerCache internalcache.Cache Algorithm core.ScheduleAlgorithm // PodConditionUpdater is used only in case of scheduling errors. If we succeed // with scheduling, PodScheduled condition will be updated in apiserver in /bind // handler so that binding and setting PodCondition it is atomic. podConditionUpdater podConditionUpdater // 在抢占情况下用来驱逐 pod,更新抢占者的 'NominatedNode' 字段 podPreemptor podPreemptor // 返回下一个需要调度的 Pod,如果没有需要调度的 pod,则该方法将 block 住。这里不使用 channel 数据结构是因为调度过程可能会花费一定时间,设计者并不想在这个时间内让 Pod 停留在 channel 中。注:虽然官方没有说,这里还有一个可能的原因是 channel 不能持久化数据。 NextPod func() *framework.PodInfo // Error is called if there is an error. It is passed the pod in // question, and the error Error func(*framework.PodInfo, error) // 用一个空的 struct channel 来标识是否需要 stop。Golang 中的惯用用法。 StopEverything <-chan struct{} // 处理 PVC/PV VolumeBinder scheduling.SchedulerVolumeBinder // 是否禁止 Pod 抢占 DisablePreemption bool // 调度队列,需要调度的 Pod 都存在这个队列里面,内部实现是一个优先级队列 SchedulingQueue internalqueue.SchedulingQueue // Profiles are the scheduling profiles. Profiles profile.Map scheduledPodsHasSynced func() bool }
运行调度器
下面看一下调度器
// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done. func (sched *Scheduler) Run(ctx context.Context) { if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { return } sched.SchedulingQueue.Run() wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() }
在 scheduler 的 Run 函数中主要做了三件事情:
- 等待 scheduler cache 同步(scheduler 刚起来,相当于冷启动)
- 运行调度器队列的 Run 函数
- 运行 scheduler 的 scheduleOne 函数
调度队列
调度队列的 Run 函数第一次看到总是给你一点点疑惑,作为一个队列难道还需要启动吗?确实是这样,如果调度队列只是一个优先级队列,那么确实不需要启动。kubernetes 中的调度队列是由三个队列组成,分别是:
- activeQueue:待调度的 pod 队列,scheduler 会监听这个队列
- backoffQueue:在 kubernetes 中,如果调度失败了,就相当于一次 backoff。backoffQueue 专门用来存放 backoff 的 pod。一般会有一个 backoffLimit 的限制就是最多容忍多少次 backoff。其次每次 backoff 之间的时间成倍增长。
- unschedulableQueue:调度过程被终止的 pod 存放的队列。
调度队列的 Run 函数做的事情就是将 backoffQueue 和 unschedulableQueue 中 pod 定期移动到 activeQueue 中。
// Run starts the goroutine to pump from podBackoffQ to activeQ func (p *PriorityQueue) Run() { go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop) go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop) }
其中 wait.Until
实际上就是一个类似 Cron 的定时调度器。细节实现暂时不细说了。
// Until loops until stop channel is closed, running f every period. // // Until is syntactic sugar on top of JitterUntil with zero jitter factor and // with sliding = true (which means the timer for period starts after the f // completes). func Until(f func(), period time.Duration, stopCh <-chan struct{}) { JitterUntil(f, period, 0.0, true, stopCh) }
我们再来看一下两个 flush 函数的逻辑。首先是 flushBackoffQCompleted()
,主要逻辑如下:
getBackoffTime calculateBackoffDuration()
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ func (p *PriorityQueue) flushBackoffQCompleted() { p.lock.Lock() defer p.lock.Unlock() for { rawPodInfo := p.podBackoffQ.Peek() if rawPodInfo == nil { return } pod := rawPodInfo.(*framework.PodInfo).Pod boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo)) if boTime.After(p.clock.Now()) { return } _, err := p.podBackoffQ.Pop() if err != nil { klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod)) return } p.activeQ.Add(rawPodInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() defer p.cond.Broadcast() } } // getBackoffTime returns the time that podInfo completes backoff func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time { duration := p.calculateBackoffDuration(podInfo) backoffTime := podInfo.Timestamp.Add(duration) return backoffTime } // calculateBackoffDuration is a helper function for calculating the backoffDuration // based on the number of attempts the pod has made. func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration { duration := p.podInitialBackoffDuration for i := 1; i < podInfo.Attempts; i++ { duration = duration * 2 if duration > p.podMaxBackoffDuration { return p.podMaxBackoffDuration } } return duration }
下面我们看一下 unschedulableQueue 中的 pod 是如何 flush 的,也就是函数 flushUnschedulableQLeftover
的实现逻辑。逻辑非常简单,如果 pod 在 unschedulableQueue 中停留时间超过了 60s,就会被移除到 activeQueue。
// flushUnschedulableQLeftover moves pod which stays in unschedulableQ longer than the unschedulableQTimeInterval // to activeQ. func (p *PriorityQueue) flushUnschedulableQLeftover() { p.lock.Lock() defer p.lock.Unlock() var podsToMove []*framework.PodInfo currentTime := p.clock.Now() for _, pInfo := range p.unschedulableQ.podInfoMap { lastScheduleTime := pInfo.Timestamp if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval { podsToMove = append(podsToMove, pInfo) } } if len(podsToMove) > 0 { p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout) } } const ( // If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval, // the pod will be moved from unschedulableQ to activeQ. unschedulableQTimeInterval = 60 * time.Second queueClosed = "scheduling queue is closed" )
以上所述就是小编给大家介绍的《源码面前,了无密码:Kuberentes Scheduler 源码剖析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 【Java集合源码剖析】ArrayList源码剖析
- Java集合源码剖析:TreeMap源码剖析
- 我的源码阅读之路:redux源码剖析
- ThreadLocal源码深度剖析
- SharedPreferences源码剖析
- Volley源码剖析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Servlet&JSP学习笔记
林信良 / 清华大学出版社 / 2010-4 / 48.00元
《Servlet&JSP学习笔记》以“在线书签”项目贯穿全书,随着每一章的讲述都在适当的时候将 Servlet & JSP技术应用于“在线书签”程序之中,并作适当修改,以了解完整的应用程序构建方法。《Servlet&JSP学习笔记》内容包括简单的Web应用程序,开发简单的Servlet & JSP合理管理,JSP的使用,整合数据库等相关内容,《Servlet&JSP学习笔记》适合Servlet ......一起来看看 《Servlet&JSP学习笔记》 这本书的介绍吧!