Scheduler 的工作就是决定让一个 pod 在哪个 node 上运行。 scheduler 从 API Server 获得 pod 和 node 的信息,然后把它的决策信息写会 API Server, 它自己不参与具体的调度,而是运行在每个 node 上的 kubelet 主动获取更新,然后启动 pod。

scheduler 的入口函数在 cmd/kube-schduler/server.go,但实际工作都是在 pkg/scheduler/scheduler.go 里面的 Run 函数开始的。

打开 scheduler.go 文件找到结构体 Scheduler ,会发现它有很多私有函数,但只有唯一一个公开的 Run() 函数。

先从 Scheduler 结构体来说一下调度器的整体思路,其中最重要的三个成员如下:

type Scheduler struct {
	Algorithm core.ScheduleAlgorithm

	NextPod func() *framework.QueuedPodInfo

	SchedulingQueue internalqueue.SchedulingQueue
}
  • Algorithm 就是具体调度的算法
  • SchedulingQueue 是等待调度的队列,它本身是一个接口,它的实现是 PriorityQueue ,位于 pkg/scheduler/internal/queue/scheduling_queue.go
  • NextPod 获取等待调度的 pod

另外顺便提一下,kubernetes 中的调度队列是由三个队列组成,分别是:

  1. activeQueue:待调度的 pod 队列,scheduler 会监听这个队列
  2. backoffQueue:在 kubernetes 中,如果调度失败了,就相当于一次 backoff。 backoffQueue 专门用来存放 backoff 的 pod。
  3. unschedulableQueue:调度过程被终止的 pod 存放的队列。

然后来看 Scheduler 的 Run 函数:

func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

// SchedulingQueue 
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

第一行的 SchedulingQueue.Run() 只会在 scheduler 启动时调用,实际是运行 PriorityQueue 的 Run 函数,可以看到,该函数是开启了两个 goroutine 不断地监视 backoff 队列和 unschedulable 队列。

第二行的 wait.UntilWithContext 也是一个无限循环函数,scheduler 会一直执行这个循环直到退出,其中 sched.scheduleOne() 是调度器的核心部分。

scheduleOne() 函数

func (sched *Scheduler) scheduleOne(ctx context.Context) {
	podInfo := sched.NextPod()
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod)
}

不难找到,最后调用的是 pkg/scheduler/core/generic_scheduler.go 中的 genericScheduler 结构体的 Schedule 函数。

一个小插曲: 在看源码的过程中发现了一个很有意思的 “蓄水池采样” 的实际应用。

Reservoir Sampling

我们先看看蓄水池采样方法的定义:

假设需要采样的数量为 k 。

首先构建一个可容纳 k 个元素的数组,将序列的前 k 个元素放入数组中。

然后对于第 j ( j > k ) 个元素开始,以 k/j​ 的概率来决定该元素是否被替换到数组中(数组中的 k 个元素被替换的概率是相同的)。 

当遍历完所有元素之后,数组中剩下的元素即为所需采取的样本。

genericScheduler.Schedule 中的 selectHost() 函数相当于是当采样数量 k = 1 时,从 nodeScoreList 选择一个 node

func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) { 
	maxScore := nodeScoreList[0].Score
	selected := nodeScoreList[0].Name
	cntOfMaxScore := 1
	for _, ns := range nodeScoreList[1:] {
		if ns.Score > maxScore {
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		} else if ns.Score == maxScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore
				selected = ns.Name
			}
		}
	}
	return selected, nil
}

genericScheduler 的预选和优选

第一节中讲了 scheduleOne() 函数中调用的 genericScheduler.Schedule() 是整个调度的核心部分,通读一下 Schedule() 函数,发现其中三个函数至关重要:

func (g *genericScheduler) Schedule() {
	g.findNodesThatFitPod(ctx, fwk, state, pod)

	priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)

	host, err := g.selectHost(priorityList)

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}

其中 findNodesThatFitPod() 就是所谓的预选,即选出所有符合条件的 node,之后的 prioritizeNodes() 就是所谓的优选,即选出最合适的一个 node。

关于具体的选择过程,由于牵涉的东西太多,暂时不陷入太多的细节中,随着学习的深入再在新的文章中介绍。

参考资料