内容简介:本篇文章介绍一下 Kubernetes 的默认调度器 kube-scheduler 的源码实现。kubernetes 代码版本:v1.18.4-rc.0。入口函数在路径核心逻辑就是:1. 创建一个 SchedulerCommand(第 4 行);2. 接收参数并执行(第 14 行)。我们先看一下创建 SchedulerCommand 的逻辑。
本篇文章介绍一下 Kubernetes 的默认调度器 kube-scheduler 的源码实现。kubernetes 代码版本:v1.18.4-rc.0。
0. 入口
入口函数在路径 kubernetes/cmd/kube-scheduler/scheduler.go#main()
,如下
func main() {
rand.Seed(time.Now().UnixNano())
command := app.NewSchedulerCommand()
// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// utilflag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
核心逻辑就是:1. 创建一个 SchedulerCommand(第 4 行);2. 接收参数并执行(第 14 行)。我们先看一下创建 SchedulerCommand 的逻辑。
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary. See [scheduling](https://kubernetes.io/docs/concepts/scheduling/)
for more information about scheduling and the kube-scheduler component.`,
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
fs := cmd.Flags()
...
return cmd
}
首先我们可以看到 NewSchedulerCommand 接收一个不定参数,registryOptions。从名字我们可以看出来首先这个参数是作用于一个 Registry 的,这个 Registry 实际上就是用来管理 kuberentes 中的 plugin 的。
// Registry is a collection of all available plugins. The framework uses a // registry to enable and initialize configured plugins. // All plugins must be in the registry before initializing the framework. type Registry map[string]PluginFactory
而 registryOptions 中的 option 其实是一种函数传参的方式的使用。option 传参的方式最早由 Rob Pike 提出来的,简单来说就是将可选的 option 参数封装成多个函数传给目标函数,然后在目标函数内部通过调用 option 函数的方式来初始化。后面我们看到 RegistryOptions 初始化的部分再来介绍。对于 option 这种方式感兴趣的同学可以参考我之前的一篇文章: http://legendtkl.com/2016/11/05/code-scalability/
其次是 cmd,通过 cobra.Command 构建出来的一个 CLI 处理工具,对于命令行的输入通过第 18 行的匿名函数来处理,匿名函数内部会调用函数 runCommand 来启动 scheduler 进程。去掉一些不重要的代码逻辑,runCommand 主要做的事情就是创建 scheduler 参数,然后通过 Run 函数启动 scheduler 进程。
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
...
// 创建 scheduler 参数
c, err := opts.Config()
if err != nil {
return err
}
// Get the completed config
// 参数补充
cc := c.Complete()
// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
return Run(ctx, cc, registryOptions...)
}
Run 函数的主要逻辑如下:
- 初始化 Registry,第 6 ~ 11 行就是 option 这种函数传参的处理逻辑。
- 创建 scheduler 实例
- 其他初始化操作,包括 EventBroadcast、健康检测、metric 等相关逻辑
- 启动 Pod Informer 来监听 Pod
- 运行调度器(分没有启动 leader 选举,但是对应的方法都是 sched.Run 方法)
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
// To help debugging, immediately log version
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
outOfTreeRegistry := make(framework.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return err
}
}
recorderFactory := getRecorderFactory(&cc)
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)
if err != nil {
return err
}
// Prepare the event broadcaster.
if cc.Broadcaster != nil && cc.EventClient != nil {
cc.Broadcaster.StartRecordingToSink(ctx.Done())
}
if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
}
// Setup healthz checks.
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
}
// Start up the healthz server.
if cc.InsecureServing != nil {
separateMetrics := cc.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
if cc.InsecureMetricsServing != nil {
handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start metrics server: %v", err)
}
}
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}
// Start all informers.
go cc.PodInformer.Informer().Run(ctx.Done())
cc.InformerFactory.Start(ctx.Done())
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: sched.Run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so runCommand inline until done.
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
scheduler 实例
首先我们看一下 scheduler 的定义,路径为 pkg/scheduler/scheduler.go
。
// Scheduler 监听未调度的 Pod,为其寻找适合的 Node 节点,并写回到 api server
type Scheduler struct {
// 调度器 Cache
SchedulerCache internalcache.Cache
Algorithm core.ScheduleAlgorithm
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
podConditionUpdater podConditionUpdater
// 在抢占情况下用来驱逐 pod,更新抢占者的 'NominatedNode' 字段
podPreemptor podPreemptor
// 返回下一个需要调度的 Pod,如果没有需要调度的 pod,则该方法将 block 住。这里不使用 channel 数据结构是因为调度过程可能会花费一定时间,设计者并不想在这个时间内让 Pod 停留在 channel 中。注:虽然官方没有说,这里还有一个可能的原因是 channel 不能持久化数据。
NextPod func() *framework.PodInfo
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*framework.PodInfo, error)
// 用一个空的 struct channel 来标识是否需要 stop。Golang 中的惯用用法。
StopEverything <-chan struct{}
// 处理 PVC/PV
VolumeBinder scheduling.SchedulerVolumeBinder
// 是否禁止 Pod 抢占
DisablePreemption bool
// 调度队列,需要调度的 Pod 都存在这个队列里面,内部实现是一个优先级队列
SchedulingQueue internalqueue.SchedulingQueue
// Profiles are the scheduling profiles.
Profiles profile.Map
scheduledPodsHasSynced func() bool
}
运行调度器
下面看一下调度器
// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
sched.SchedulingQueue.Run()
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}
在 scheduler 的 Run 函数中主要做了三件事情:
- 等待 scheduler cache 同步(scheduler 刚起来,相当于冷启动)
- 运行调度器队列的 Run 函数
- 运行 scheduler 的 scheduleOne 函数
调度队列
调度队列的 Run 函数第一次看到总是给你一点点疑惑,作为一个队列难道还需要启动吗?确实是这样,如果调度队列只是一个优先级队列,那么确实不需要启动。kubernetes 中的调度队列是由三个队列组成,分别是:
- activeQueue:待调度的 pod 队列,scheduler 会监听这个队列
- backoffQueue:在 kubernetes 中,如果调度失败了,就相当于一次 backoff。backoffQueue 专门用来存放 backoff 的 pod。一般会有一个 backoffLimit 的限制就是最多容忍多少次 backoff。其次每次 backoff 之间的时间成倍增长。
- unschedulableQueue:调度过程被终止的 pod 存放的队列。
调度队列的 Run 函数做的事情就是将 backoffQueue 和 unschedulableQueue 中 pod 定期移动到 activeQueue 中。
// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
其中 wait.Until
实际上就是一个类似 Cron 的定时调度器。细节实现暂时不细说了。
// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, true, stopCh)
}
我们再来看一下两个 flush 函数的逻辑。首先是 flushBackoffQCompleted()
,主要逻辑如下:
getBackoffTime calculateBackoffDuration()
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
func (p *PriorityQueue) flushBackoffQCompleted() {
p.lock.Lock()
defer p.lock.Unlock()
for {
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
return
}
pod := rawPodInfo.(*framework.PodInfo).Pod
boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo))
if boTime.After(p.clock.Now()) {
return
}
_, err := p.podBackoffQ.Pop()
if err != nil {
klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
return
}
p.activeQ.Add(rawPodInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
defer p.cond.Broadcast()
}
}
// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time {
duration := p.calculateBackoffDuration(podInfo)
backoffTime := podInfo.Timestamp.Add(duration)
return backoffTime
}
// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration {
duration := p.podInitialBackoffDuration
for i := 1; i < podInfo.Attempts; i++ {
duration = duration * 2
if duration > p.podMaxBackoffDuration {
return p.podMaxBackoffDuration
}
}
return duration
}
下面我们看一下 unschedulableQueue 中的 pod 是如何 flush 的,也就是函数 flushUnschedulableQLeftover
的实现逻辑。逻辑非常简单,如果 pod 在 unschedulableQueue 中停留时间超过了 60s,就会被移除到 activeQueue。
// flushUnschedulableQLeftover moves pod which stays in unschedulableQ longer than the unschedulableQTimeInterval
// to activeQ.
func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock()
defer p.lock.Unlock()
var podsToMove []*framework.PodInfo
currentTime := p.clock.Now()
for _, pInfo := range p.unschedulableQ.podInfoMap {
lastScheduleTime := pInfo.Timestamp
if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pInfo)
}
}
if len(podsToMove) > 0 {
p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
}
}
const (
// If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval,
// the pod will be moved from unschedulableQ to activeQ.
unschedulableQTimeInterval = 60 * time.Second
queueClosed = "scheduling queue is closed"
)
以上所述就是小编给大家介绍的《源码面前,了无密码:Kuberentes Scheduler 源码剖析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 【Java集合源码剖析】ArrayList源码剖析
- Java集合源码剖析:TreeMap源码剖析
- 我的源码阅读之路:redux源码剖析
- ThreadLocal源码深度剖析
- SharedPreferences源码剖析
- Volley源码剖析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Scratch少儿趣味编程
[ 日] 阿部和广 / 陶 旭 / 人民邮电出版社 / 2014-11 / 59.00元
Scratch 是麻省理工学院设计开发的一款编程工具,是适合少儿学习编程和交流的工具和平台,有中文版且完全免费。本书结合孩子们学习的语文、数学、科学、社会、音乐、体育等科目,手把手地教大家如何用Scratch 设计程序(如设计一个自动写作文的程序),配合各式卡通形象,通俗易懂,寓教于乐。麻省理工学院教授米切尔•瑞斯尼克作序推荐。 本书图文并茂,生动风趣,适合中小学生等初学者自学或在家长的帮助......一起来看看 《Scratch少儿趣味编程》 这本书的介绍吧!