Kubernetes 调度器是集群的核心组件,负责将 Pod 调度到合适的节点上。调度过程分为两个主要阶段:Scheduling Cycle(调度周期)和 Binding Cycle(绑定周期)。
本文将从 Kubernetes 源码角度,详细分析原生调度器如何处理每一个阶段。
调度器架构概览
Kubernetes 调度器的核心代码位于 pkg/scheduler/ 目录,主要组件包括:
- Scheduler:调度器主结构体,协调整个调度过程
- Schedule:调度循环,处理 Pod 的调度
- Framework:调度框架,定义扩展点和插件接口
- Plugins:各种内置插件,实现具体的调度和绑定逻辑
graph TD
A[API Server] -->|Watch Pod| B[Scheduler]
B --> C[Scheduling Queue]
C --> D[Schedule Loop]
D --> E[Scheduling Cycle]
E --> F[Binding Cycle]
F --> G[API Server]
G -->|Pod.Spec.NodeName| H[kubelet]
H --> I[创建容器]
Scheduling Cycle(调度周期)
Scheduling Cycle 的目标是为 Pod 选择一个最合适的节点。这个阶段是纯计算过程,不会修改 API Server 中的 Pod 对象。
1. PreEnqueue(预入队阶段)
源码位置:pkg/scheduler/schedule_one.go
PreEnqueue 是调度器处理 Pod 的第一个阶段,在 Pod 进入调度队列之前执行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// pkg/scheduler/schedule_one.go
func (sched *Scheduler) enqueuePod(pod *v1.Pod, oldPod *v1.Pod, reason string) error {
// 检查 Pod 是否应该被调度
if !sched.podShouldBeScheduled(pod) {
return nil
}
// 运行 PreEnqueue 插件
status := sched.framework.RunPreEnqueuePlugins(pod)
if !status.IsSuccess() {
return status.Error()
}
// 将 Pod 添加到调度队列
return sched.ScheduleQueue.Add(pod)
}
|
关键点:
- 检查 Pod 是否已经调度(
pod.Spec.NodeName 是否为空)
- 检查 Pod 是否处于终端状态(Succeeded/Failed)
- 运行 PreEnqueue 插件,允许插件决定是否允许 Pod 入队
- 将 Pod 添加到调度队列
2. Sort(排序阶段)
源码位置:pkg/scheduler/schedule_one.go
调度队列会根据 Pod 的优先级和创建时间进行排序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// pkg/scheduler/schedule_one.go
func (sched *Scheduler) scheduleNextPod() {
// 从队列中获取下一个要调度的 Pod
podInfo := sched.ScheduleQueue.Pop()
if podInfo == nil {
return
}
pod := podInfo.Pod
// 检查 Pod 是否仍然有效
if !sched.podExistsAndIsUnscheduled(pod) {
return
}
// 开始调度循环
sched.schedulePod(podInfo)
}
|
关键点:
- 高优先级的 Pod 优先调度
- 相同优先级的 Pod 按创建时间排序
- 支持抢占式调度
3. PreFilter(预过滤阶段)
源码位置:pkg/scheduler/framework/processing.go
PreFilter 阶段在调度循环开始时执行,用于预处理和检查:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status, sets.Set[string]) {
var result *PreFilterResult
var status *Status
var rejectedBySets sets.Set[string]
for _, pl := range f.preFilterPlugins {
result, status = pl.PreFilter(ctx, state, pod)
if !status.IsSuccess() {
// 如果 PreFilter 失败,返回 UnschedulableAndUnresolvable
return nil, status, nil
}
}
return result, status, rejectedBySets
}
|
关键点:
- 所有 PreFilter 插件都会执行
- 如果任何插件返回失败,整个调度失败
- 返回
UnschedulableAndUnresolvable 时,Pod 不会重新入队
内置 PreFilter 插件示例:
- NodePorts:检查节点的端口冲突
- CSILimits:检查 CSI 驱动限制
- VolumeBinding:检查 PVC 是否已绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// pkg/scheduler/framework/plugins/nodeports/node_ports.go
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *CycleState, pod *v1.Pod) (*PreFilterResult, *Status) {
// 获取 Pod 的主机端口
hostPorts := getHostPorts(pod)
if len(hostPorts) == 0 {
return nil, NewStatus(Success)
}
// 将端口信息存储到 CycleState 中,供 Filter 阶段使用
cycleState.Write(stateKey, &preFilterState{
hostPorts: hostPorts,
})
return nil, NewStatus(Success)
}
|
4. Filter(过滤阶段)
源码位置:pkg/scheduler/framework/processing.go
Filter 阶段过滤掉不满足条件的节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*NodeInfo) (NodeToStatus, error) {
// 为每个节点运行 Filter 插件
nodeToStatus := make(NodeToStatus)
for _, node := range nodes {
status := f.RunFilterPluginsForNode(ctx, state, pod, node)
if !status.IsSuccess() {
nodeToStatus.Set(node.Name(), status)
}
}
return nodeToStatus, nil
}
func (f *Framework) RunFilterPluginsForNode(ctx context.Context, state *CycleState, pod *v1.Pod, node *NodeInfo) *Status {
// 并行运行所有 Filter 插件
var errs []error
var statuses []*Status
for _, pl := range f.filterPlugins {
status := pl.Filter(ctx, state, pod, node)
statuses = append(statuses, status)
}
// 合并所有状态
return MergeStatuses(statuses)
}
|
关键点:
- 为每个节点并行运行所有 Filter 插件
- 如果任何插件返回
Unschedulable,该节点被过滤
- 如果任何插件返回
UnschedulableAndUnresolvable,该节点被永久过滤
内置 Filter 插件示例:
- NodeUnschedulable:过滤
spec.unschedulable=true 的节点
- NodeName:检查
spec.nodeName 是否匹配
- TaintToleration:检查污点和容忍度
- NodeAffinity:检查节点亲和性
- VolumeRestrictions:检查卷限制
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go
func (pl *NodeUnschedulable) Filter(ctx context.Context, _ *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status {
node := nodeInfo.Node()
// 如果节点标记为不可调度,且 Pod 不容忍该污点,则过滤
if node.Spec.Unschedulable {
if !tolerationsTolerateTaint(pod.Spec.Tolerations, &unschedulableTaint) {
return NewStatus(Unschedulable, "node(s) were unschedulable")
}
}
return NewStatus(Success)
}
|
5. PostFilter(后过滤阶段)
源码位置:pkg/scheduler/framework/processing.go
PostFilter 在所有节点都被过滤掉时执行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, m NodeToStatusReader) (*PostFilterResult, *Status) {
// 按顺序运行 PostFilter 插件
for _, pl := range f.postFilterPlugins {
result, status := pl.PostFilter(ctx, state, pod, m)
if status.Code() == Unschedulable {
// 如果插件返回 Unschedulable,继续运行其他插件
continue
}
if !status.IsSuccess() {
return result, status
}
// 如果插件成功,返回结果
return result, status
}
return nil, NewStatus(Unschedulable, "no nodes available")
}
|
关键点:
- 只有当所有节点都被过滤时才执行
- 插件可以尝试抢占低优先级 Pod
- 返回
Unschedulable 时,Pod 会重新入队
内置 PostFilter 插件:
- DefaultPreemption:尝试抢占低优先级 Pod
1
2
3
4
5
6
7
8
9
|
// pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, m NodeToStatusReader) (*PostFilterResult, *Status) {
defer func() {
metrics.PreemptionAttempts.Inc()
}()
result, status := pl.Evaluator.Preempt(ctx, state, pod, m)
return result, status
}
|
6. PreScore(预打分阶段)
源码位置:pkg/scheduler/framework/processing.go
PreScore 在打分前执行,用于准备打分所需的数据:
1
2
3
4
5
6
7
8
9
10
11
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPreScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*NodeInfo) *Status {
for _, pl := range f.preScorePlugins {
status := pl.PreScore(ctx, state, pod, nodes)
if !status.IsSuccess() {
return status
}
}
return NewStatus(Success)
}
|
7. Score(打分阶段)
源码位置:pkg/scheduler/framework/processing.go
Score 阶段为每个节点打分:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*NodeInfo) (map[string][]int64, error) {
scores := make(map[string][]int64)
for _, pl := range f.scorePlugins {
pluginScores, status := pl.Score(ctx, state, pod, nodes)
if !status.IsSuccess() {
return nil, status.Error()
}
// 合并分数
for nodeName, score := range pluginScores {
scores[nodeName] = append(scores[nodeName], score)
}
}
return scores, nil
}
|
内置 Score 插件示例:
- NodeResourcesBalancedAllocation:资源平衡打分
- ImageLocality:镜像本地性打分
- InterPodAffinity:Pod 亲和性打分
- NodeAffinity:节点亲和性打分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go
func (pl *BalancedAllocation) Score(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*NodeInfo) (map[string]int64, error) {
scores := make(map[string]int64)
for _, node := range nodes {
// 计算节点的 CPU 和内存使用率
cpuFraction := fractionOfCapacity(node, corev1.ResourceCPU, pod)
memoryFraction := fractionOfCapacity(node, corev1.ResourceMemory, pod)
// 计算平衡分数(越平衡分数越高)
score := calculateBalanceScore(cpuFraction, memoryFraction)
scores[node.Name()] = score
}
return scores, nil
}
|
8. NormalizeScore(标准化分数阶段)
源码位置:pkg/scheduler/framework/processing.go
NormalizeScore 标准化所有插件的分数到 0-100 范围:
1
2
3
4
5
6
7
8
9
10
11
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunNormalizeScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*NodeInfo, scores map[string][]int64) error {
for _, pl := range f.normalizeScorePlugins {
status := pl.NormalizeScore(ctx, state, pod, scores)
if !status.IsSuccess() {
return status.Error()
}
}
return nil
}
|
9. Reserve(预留阶段)
源码位置:pkg/scheduler/framework/processing.go
Reserve 阶段预留资源(信息性,不影响调度结果):
1
2
3
4
5
6
7
8
9
10
11
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status {
for _, pl := range f.reservePlugins {
status := pl.Reserve(ctx, state, pod, nodeName)
if !status.IsSuccess() {
return status
}
}
return NewStatus(Success)
}
|
10. Permit(许可阶段)
源码位置:pkg/scheduler/framework/processing.go
Permit 阶段可以批准或拒绝绑定:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status {
for _, pl := range f.permitPlugins {
status, _ := pl.Permit(ctx, state, pod, nodeName)
if status.Code() == Wait {
// Pod 进入等待状态
return status
}
if !status.IsSuccess() {
return status
}
}
return NewStatus(Success)
}
|
Binding Cycle(绑定周期)
Binding Cycle 将 Pod 实际绑定到节点,会修改 API Server 中的 Pod 对象。
1. WaitOnPermit(等待许可阶段)
源码位置:pkg/scheduler/framework/processing.go
如果 Permit 阶段返回 Wait,Pod 会进入等待状态:
1
2
3
4
5
6
7
8
9
10
11
12
|
// pkg/scheduler/framework/processing.go
func (f *Framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status {
// 等待 Permit 插件批准
for {
status := f.checkPermitStatus(pod)
if status.Code() != Wait {
return status
}
// 等待一段时间后重试
time.Sleep(100 * time.Millisecond)
}
}
|
2. PreBind(预绑定阶段)
源码位置:pkg/scheduler/framework/processing.go
PreBind 阶段在绑定前执行准备工作:
1
2
3
4
5
6
7
8
9
10
11
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status {
for _, pl := range f.preBindPlugins {
status := pl.PreBind(ctx, state, pod, nodeName)
if !status.IsSuccess() {
return status
}
}
return NewStatus(Success)
}
|
内置 PreBind 插件:
- VolumeBinding:绑定 PVC 到 PV
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
// pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) PreBind(ctx context.Context, cs *CycleState, pod *v1.Pod, nodeName string) *Status {
s, err := getStateData(cs)
if err != nil {
return AsStatus(err)
}
if s.allBound {
// 所有卷都已绑定,无需操作
return nil
}
podVolumes, ok := s.podVolumesByNode[nodeName]
if !ok {
return AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName))
}
// 绑定 Pod 的卷
err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes)
if err != nil {
return AsStatus(err)
}
return nil
}
|
3. Bind(绑定阶段)
源码位置:pkg/scheduler/framework/processing.go
Bind 阶段实际绑定 Pod 到节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status {
for _, pl := range f.bindPlugins {
status := pl.Bind(ctx, state, pod, nodeName)
if status.Code() == Skip {
// 插件跳过,尝试下一个插件
continue
}
return status
}
// 所有插件都跳过,返回 Skip
return NewStatus(Skip)
}
|
内置 Bind 插件:
- DefaultBinder:默认绑定实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// pkg/scheduler/framework/plugins/defaultbinder/default_binder.go
func (b *DefaultBinder) Bind(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status {
// 创建 Pod 的副本
podCopy := pod.DeepCopy()
// 更新 Pod 的 nodeName 字段
podCopy.Spec.NodeName = nodeName
// 调用 API Server 更新 Pod
_, err := b.handle.ClientSet().CoreV1().Pods(pod.Namespace).Update(ctx, podCopy, metav1.UpdateOptions{})
if err != nil {
return NewStatus(Error, err.Error())
}
return NewStatus(Success)
}
|
4. PostBind(后绑定阶段)
源码位置:pkg/scheduler/framework/processing.go
PostBind 阶段在绑定成功后执行清理工作:
1
2
3
4
5
6
|
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
for _, pl := range f.postBindPlugins {
pl.PostBind(ctx, state, pod, nodeName)
}
}
|
关键点:
- PostBind 是异步的,不会阻塞调度
- 失败不影响调度结果
- 用于清理临时资源和记录日志
调度流程总结
sequenceDiagram
participant Q as Scheduling Queue
participant S as Scheduler
participant F as Framework
participant P as Plugins
participant A as API Server
Q->>S: Pod 入队
S->>F: 开始调度循环
Note over F: Scheduling Cycle
F->>P: PreEnqueue
F->>P: PreFilter
F->>P: Filter(并行)
alt 所有节点被过滤
F->>P: PostFilter
F->>Q: Pod 重新入队
end
F->>P: PreScore
F->>P: Score(并行)
F->>P: NormalizeScore
F->>P: Reserve
F->>P: Permit
Note over F: Binding Cycle
F->>P: WaitOnPermit
F->>P: PreBind
F->>A: Bind(更新 Pod.nodeName)
F->>P: PostBind
S->>Q: 调度完成
关键设计原则
- 并行处理:Filter 和 Score 阶段并行处理所有节点
- 短路求值:任何阶段失败都会立即终止调度
- 状态管理:CycleState 在调度过程中传递状态
- 插件化设计:每个阶段都可以插入自定义插件
- 异步绑定:PostBind 异步执行,不阻塞调度
内置插件特殊处理详解
Kubernetes 内置了一些重要的插件,它们在特定阶段有特殊的处理方式。以下是所有内置插件的完整列表和特殊处理说明。
1. VolumeBinding 插件:跨多个阶段的复杂处理
VolumeBinding 是一个跨 PreFilter、Filter、Reserve、PreBind 四个阶段的复杂插件,用于处理 PVC 和 PV 的绑定。
flowchart TD
A[PreFilter] --> B[检查 PVC 是否已绑定]
B --> C{有未绑定的 PVC?}
C -->|是| D[返回 UnschedulableAndUnresolvable]
C -->|否| E[Filter]
E --> F[为每个节点查找匹配的 PV]
F --> G[检查 PV 的节点亲和性]
G --> H[Reserve]
H --> I[AssumePodVolumes - 假设绑定]
I --> J[PreBind]
J --> K[BindPodVolumes - 实际绑定]
K --> L[等待 PV 控制器完成绑定]
PreFilter 阶段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) PreFilter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
// 检查 Pod 是否有 PVC
if hasPVC, err := pl.podHasPVCs(pod); err != nil {
return nil, fwk.NewStatus(fwk.UnschedulableAndUnresolvable, err.Error())
} else if !hasPVC {
// 没有 PVC,跳过
state.Write(stateKey, &stateData{})
return nil, fwk.NewStatus(fwk.Skip)
}
// 获取 Pod 的所有 PVC
podVolumeClaims, err := pl.Binder.GetPodVolumeClaims(logger, pod)
if err != nil {
return nil, fwk.AsStatus(err)
}
// 关键:检查是否有未绑定的 immediate PVC
if len(podVolumeClaims.unboundClaimsImmediate) > 0 {
// 返回 UnschedulableAndUnresolvable,Pod 不会重新入队
status := fwk.NewStatus(fwk.UnschedulableAndUnresolvable)
status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
return nil, status
}
// 将 PVC 信息存储到 CycleState
state.Write(stateKey, &stateData{
podVolumesByNode: make(map[string]*PodVolumes),
podVolumeClaims: &PodVolumeClaims{
boundClaims: podVolumeClaims.boundClaims,
unboundClaimsDelayBinding: podVolumeClaims.unboundClaimsDelayBinding,
unboundVolumesDelayBinding: podVolumeClaims.unboundVolumesDelayBinding,
},
})
return nil, nil
}
|
Filter 阶段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) Filter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
// 为当前节点查找匹配的 PV
podVolumes, status := pl.findPodVolumes(ctx, state, pod, nodeInfo)
if !status.IsSuccess() {
return status
}
// 将查找结果缓存到 CycleState
s, _ := getStateData(state)
s.podVolumesByNode[nodeInfo.Node().Name] = podVolumes
return nil
}
|
Reserve 阶段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) Reserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
state, err := getStateData(cs)
if err != nil {
return fwk.AsStatus(err)
}
podVolumes, ok := state.podVolumesByNode[nodeName]
if ok {
// AssumePodVolumes:假设绑定,但还未实际绑定
allBound, err := pl.Binder.AssumePodVolumes(klog.FromContext(ctx), pod, nodeName, podVolumes)
if err != nil {
return fwk.AsStatus(err)
}
state.allBound = allBound
}
return nil
}
|
PreBind 阶段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
// pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) PreBind(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
s, err := getStateData(cs)
if err != nil {
return fwk.AsStatus(err)
}
if s.allBound {
// 所有卷都已绑定,无需操作
return nil
}
podVolumes, ok := s.podVolumesByNode[nodeName]
if !ok {
return fwk.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName))
}
// BindPodVolumes:实际绑定 PVC 到 PV
err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes)
if err != nil {
return fwk.AsStatus(err)
}
return nil
}
|
关键点:
- PreFilter 检查 PVC 状态,如果未绑定则返回
UnschedulableAndUnresolvable
- Filter 为每个节点查找匹配的 PV,并缓存结果
- Reserve 阶段使用
AssumePodVolumes 假设绑定(乐观锁)
- PreBind 阶段才真正执行绑定操作
2. DefaultPreemption 插件:抢占机制
当所有节点都被过滤掉时,DefaultPreemption 插件会尝试抢占低优先级 Pod:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, m fwk.NodeToStatusReader) (*fwk.PostFilterResult, *fwk.Status) {
defer func() {
metrics.PreemptionAttempts.Inc() // 记录抢占尝试次数
}()
result, status := pl.Evaluator.Preempt(ctx, state, pod, m)
msg := status.Message()
if len(msg) > 0 {
return result, fwk.NewStatus(status.Code(), "preemption: "+msg)
}
return result, status
}
|
抢占流程:
- 检查是否有节点可以通过抢占低优先级 Pod 来调度当前 Pod
- 选择最优的抢占目标(最少抢占 Pod 数量)
- 删除抢占目标 Pod
- 当前 Pod 重新入队,等待下一次调度
3. BalancedAllocation 插件:资源平衡打分
BalancedAllocation 插件在 PreScore 阶段计算 Pod 的资源请求,在 Score 阶段为节点打分:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go
func (ba *BalancedAllocation) PreScore(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) *fwk.Status {
// 计算 Pod 的资源请求
podRequests := ba.calculatePodResourceRequestList(pod, ba.resources)
// 特殊处理:跳过 best-effort Pod(没有资源请求的 Pod)
if ba.isBestEffortPod(podRequests) {
return fwk.NewStatus(fwk.Skip)
}
// 将计算结果存储到 CycleState
state := &balancedAllocationPreScoreState{
podRequests: podRequests,
}
cycleState.Write(balancedAllocationPreScoreStateKey, state)
return nil
}
|
关键点:
- PreScore 阶段预先计算 Pod 的资源请求,避免在每个节点的 Score 阶段重复计算
- Best-effort Pod(没有资源请求)会跳过打分,防止大量 Pod 都调度到同一个节点
4. NodeUnschedulable 插件:节点不可调度检查
NodeUnschedulable 插件检查节点是否标记为不可调度,以及 Pod 是否容忍该污点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go
func (pl *NodeUnschedulable) Filter(ctx context.Context, _ fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
node := nodeInfo.Node()
if !node.Spec.Unschedulable {
return nil // 节点可调度
}
// 检查 Pod 是否容忍 unschedulable 污点
podToleratesUnschedulable := v1helper.TolerationsTolerateTaint(
klog.FromContext(ctx),
pod.Spec.Tolerations,
&v1.Taint{
Key: v1.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
},
utilfeature.DefaultFeatureGate.Enabled(features.TaintTolerationComparisonOperators),
)
if !podToleratesUnschedulable {
// 返回 UnschedulableAndUnresolvable,节点被永久过滤
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "node(s) were unschedulable")
}
return nil
}
|
关键点:
- 如果节点标记为
unschedulable=true,且 Pod 不容忍该污点,返回 UnschedulableAndUnresolvable
UnschedulableAndUnresolvable 表示节点被永久过滤,不会重新尝试
5. DefaultBinder 插件:Pod 绑定
DefaultBinder 是默认的绑定插件,负责将 Pod 绑定到节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
// pkg/scheduler/framework/plugins/defaultbinder/default_binder.go
func (b DefaultBinder) Bind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
logger := klog.FromContext(ctx)
// 创建 Binding 对象
binding := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{
Namespace: p.Namespace,
Name: p.Name,
UID: p.UID,
},
Target: v1.ObjectReference{
Kind: "Node",
Name: nodeName,
},
}
if b.handle.APICacher() != nil {
// 当 API Cacher 可用时,使用 Cacher 进行绑定(性能优化)
onFinish, err := b.handle.APICacher().BindPod(binding)
if err != nil {
return fwk.AsStatus(err)
}
// 等待绑定完成
err = b.handle.APICacher().WaitOnFinish(ctx, onFinish)
if err != nil {
return fwk.AsStatus(err)
}
return nil
}
// 直接调用 API Server 进行绑定
logger.V(3).Info("Attempting to bind pod to node", "pod", klog.KObj(p), "node", klog.KRef("", nodeName))
err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
if err != nil {
return fwk.AsStatus(err)
}
return nil
}
|
关键点:
- 使用
Binding 对象进行绑定,而不是直接更新 Pod
- 当 API Cacher 可用时,使用 Cacher 进行绑定,提高性能
- 绑定成功后,kubelet 会监听到 Pod 并创建容器
6. Filter 插件完整列表
TaintToleration 插件:污点容忍度检查
检查 Pod 是否容忍节点的污点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
// pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go
func (pl *TaintToleration) Filter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
node := nodeInfo.Node()
// 查找节点上 Pod 不容忍的污点
taint, isUntolerated := v1helper.FindMatchingUntoleratedTaint(
klog.FromContext(ctx),
node.Spec.Taints,
pod.Spec.Tolerations,
helper.DoNotScheduleTaintsFilterFunc(),
pl.enableTaintTolerationComparisonOperators,
)
if isUntolerated {
// 返回 UnschedulableAndUnresolvable
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("node(s) had taint {%s}, that the pod didn't tolerate", taint.ToString()))
}
return nil
}
|
NodeAffinity 插件:节点亲和性检查
检查 Pod 的节点亲和性要求:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go
func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
affinity := pod.Spec.Affinity
noNodeAffinity := (affinity == nil ||
affinity.NodeAffinity == nil ||
affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil)
if noNodeAffinity && pl.addedNodeSelector == nil && pod.Spec.NodeSelector == nil {
// 没有节点亲和性要求,跳过
return nil, fwk.NewStatus(fwk.Skip)
}
// 提取必需的节点选择器
state := &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
cycleState.Write(stateKey, state)
return nil, nil
}
func (pl *NodeAffinity) Filter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
// 检查节点是否满足 Pod 的节点亲和性要求
// ...
}
|
NodeName 插件:指定节点检查
检查 Pod 是否指定了节点名称:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// pkg/scheduler/framework/plugins/nodename/node_name.go
func (pl *NodeName) Filter(ctx context.Context, _ fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
if !Fits(pod, nodeInfo) {
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, ErrReason)
}
return nil
}
// Fits 检查 Pod 是否适合节点
func Fits(pod *v1.Pod, nodeInfo fwk.NodeInfo) bool {
// 如果 Pod 没有指定 NodeName,或者 NodeName 匹配当前节点,则适合
return len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == nodeInfo.Node().Name
}
|
NodePorts 插件:节点端口检查
检查节点端口是否冲突:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
// pkg/scheduler/framework/plugins/nodeports/node_ports.go
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
// 获取 Pod 的主机端口
hostPorts := getHostPorts(pod)
if len(hostPorts) == 0 {
return nil, fwk.NewStatus(fwk.Skip)
}
// 将端口信息存储到 CycleState
cycleState.Write(stateKey, &preFilterState{
hostPorts: hostPorts,
})
return nil, nil
}
func (pl *NodePorts) Filter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
// 检查节点端口是否冲突
// ...
}
|
CSILimits 插件:CSI 驱动限制检查
检查节点上的 CSI 驱动是否达到限制:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// pkg/scheduler/framework/plugins/nodevolumelimits/csi.go
func (pl *CSILimits) PreFilter(ctx context.Context, _ fwk.CycleState, pod *v1.Pod, _ []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
volumes := pod.Spec.Volumes
for i := range volumes {
vol := &volumes[i]
if vol.PersistentVolumeClaim != nil || vol.Ephemeral != nil || pl.translator.IsInlineMigratable(vol) {
return nil, nil
}
}
// 没有卷,跳过
return nil, fwk.NewStatus(fwk.Skip)
}
func (pl *CSILimits) Filter(ctx context.Context, _ fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
// 检查 CSI 驱动限制
// ...
}
|
VolumeZone 插件:卷可用区检查
检查卷的可用区是否与节点匹配:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
// pkg/scheduler/framework/plugins/volumezone/volume_zone.go
func (pl *VolumeZone) PreFilter(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
// 获取 Pod 的 PV 拓扑信息
podPVTopologies, status := pl.getPVbyPod(logger, pod)
if !status.IsSuccess() {
return nil, status
}
if len(podPVTopologies) == 0 {
return nil, fwk.NewStatus(fwk.Skip)
}
cs.Write(preFilterStateKey, &stateData{podPVTopologies: podPVTopologies})
return nil, nil
}
func (pl *VolumeZone) Filter(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
// 检查卷的可用区是否与节点匹配
// ...
}
|
VolumeRestrictions 插件:卷限制检查
检查卷的使用限制(如 ReadWriteOnce):
1
2
3
4
5
|
// pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
// 检查卷限制
// ...
}
|
PodTopologySpread 插件:Pod 拓扑分布检查
检查 Pod 的拓扑分布约束:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
// pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
// 计算 Pod 的拓扑分布约束
s, err := pl.calPreFilterState(ctx, pod, nodes)
if err != nil {
return nil, fwk.AsStatus(err)
}
if s != nil && len(s.Constraints) == 0 {
return nil, fwk.NewStatus(fwk.Skip)
}
cycleState.Write(stateKey, s)
return nil, nil
}
func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
// 检查拓扑分布约束
// ...
}
|
SchedulingGates 插件:调度门检查
检查 Pod 是否有未解除的调度门:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go
func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *fwk.Status {
if len(p.Spec.SchedulingGates) == 0 {
return nil
}
gates := make([]string, 0, len(p.Spec.SchedulingGates))
for _, gate := range p.Spec.SchedulingGates {
gates = append(gates, gate.Name)
}
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("waiting for scheduling gates: %v", gates))
}
|
7. Score 插件完整列表
ImageLocality 插件:镜像本地性打分
优先选择已有容器镜像的节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// pkg/scheduler/framework/plugins/imagelocality/image_locality.go
func (pl *ImageLocality) Score(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) (int64, *fwk.Status) {
nodeInfos, err := pl.handle.SnapshotSharedLister().NodeInfos().List()
if err != nil {
return 0, fwk.AsStatus(err)
}
totalNumNodes := len(nodeInfos)
// 计算镜像分数
imageScores := sumImageScores(nodeInfo, pod, totalNumNodes)
score := calculatePriority(imageScores, len(pod.Spec.InitContainers)+len(pod.Spec.Containers))
return score, nil
}
|
InterPodAffinity 插件:Pod 亲和性打分
根据 Pod 亲和性要求为节点打分:
1
2
3
4
5
6
7
8
9
10
|
// pkg/scheduler/framework/plugins/interpodaffinity/scoring.go
func (pl *InterPodAffinity) PreScore(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) *fwk.Status {
// 计算 Pod 亲和性相关的状态
// ...
}
func (pl *InterPodAffinity) Score(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) (int64, *fwk.Status) {
// 根据 Pod 亲和性为节点打分
// ...
}
|
8. 其他重要插件
PrioritySort 插件:队列排序
在 Sort 阶段对 Pod 进行排序:
1
2
3
4
5
|
// pkg/scheduler/framework/plugins/queuesort/priority_sort.go
func (pl *PrioritySort) Less(pod1, pod2 *v1.Pod) bool {
// 高优先级的 Pod 排在前面
return GetPodPriority(pod1) > GetPodPriority(pod2)
}
|
GangScheduling 插件:成组调度
实现 Pod 组的协同调度:
1
2
3
4
5
6
7
8
9
10
11
|
// pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go
func (pl *GangScheduling) Reserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
// 假设 Pod 属于一个组
podGroupInfo.AssumePod(pod.UID)
return nil
}
func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
// 等待组内所有 Pod 都准备好
// ...
}
|
DynamicResources 插件:动态资源处理
处理动态资源(DRA)的调度:
1
2
3
4
5
|
// pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
func (pl *DynamicResources) Reserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
// 处理动态资源预留
// ...
}
|
参考资料