ᕕ( ᐛ )ᕗ Jimyag's Blog

Kubernetes 原生调度器:Scheduling Cycle 和 Binding Cycle 详解

· 3492 字 · 约 17 分钟

Kubernetes 调度器是集群的核心组件,负责将 Pod 调度到合适的节点上。调度过程分为两个主要阶段:Scheduling Cycle(调度周期)和 Binding Cycle(绑定周期)。

本文将从 Kubernetes 源码角度,详细分析原生调度器如何处理每一个阶段。

调度器架构概览

Kubernetes 调度器的核心代码位于 pkg/scheduler/ 目录,主要组件包括:

  • Scheduler:调度器主结构体,协调整个调度过程
  • Schedule:调度循环,处理 Pod 的调度
  • Framework:调度框架,定义扩展点和插件接口
  • Plugins:各种内置插件,实现具体的调度和绑定逻辑
  graph TD
    A[API Server] -->|Watch Pod| B[Scheduler]
    B --> C[Scheduling Queue]
    C --> D[Schedule Loop]
    D --> E[Scheduling Cycle]
    E --> F[Binding Cycle]
    F --> G[API Server]
    G -->|Pod.Spec.NodeName| H[kubelet]
    H --> I[创建容器]

Scheduling Cycle(调度周期)

Scheduling Cycle 的目标是为 Pod 选择一个最合适的节点。这个阶段是纯计算过程,不会修改 API Server 中的 Pod 对象。

1. PreEnqueue(预入队阶段)

源码位置pkg/scheduler/schedule_one.go

PreEnqueue 是调度器处理 Pod 的第一个阶段,在 Pod 进入调度队列之前执行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// pkg/scheduler/schedule_one.go
func (sched *Scheduler) enqueuePod(pod *v1.Pod, oldPod *v1.Pod, reason string) error {
    // 检查 Pod 是否应该被调度
    if !sched.podShouldBeScheduled(pod) {
        return nil
    }

    // 运行 PreEnqueue 插件
    status := sched.framework.RunPreEnqueuePlugins(pod)
    if !status.IsSuccess() {
        return status.Error()
    }

    // 将 Pod 添加到调度队列
    return sched.ScheduleQueue.Add(pod)
}

关键点

  • 检查 Pod 是否已经调度(pod.Spec.NodeName 是否为空)
  • 检查 Pod 是否处于终端状态(Succeeded/Failed)
  • 运行 PreEnqueue 插件,允许插件决定是否允许 Pod 入队
  • 将 Pod 添加到调度队列

2. Sort(排序阶段)

源码位置pkg/scheduler/schedule_one.go

调度队列会根据 Pod 的优先级和创建时间进行排序:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// pkg/scheduler/schedule_one.go
func (sched *Scheduler) scheduleNextPod() {
    // 从队列中获取下一个要调度的 Pod
    podInfo := sched.ScheduleQueue.Pop()
    if podInfo == nil {
        return
    }

    pod := podInfo.Pod
    // 检查 Pod 是否仍然有效
    if !sched.podExistsAndIsUnscheduled(pod) {
        return
    }

    // 开始调度循环
    sched.schedulePod(podInfo)
}

关键点

  • 高优先级的 Pod 优先调度
  • 相同优先级的 Pod 按创建时间排序
  • 支持抢占式调度

3. PreFilter(预过滤阶段)

源码位置pkg/scheduler/framework/processing.go

PreFilter 阶段在调度循环开始时执行,用于预处理和检查:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status, sets.Set[string]) {
    var result *PreFilterResult
    var status *Status
    var rejectedBySets sets.Set[string]

    for _, pl := range f.preFilterPlugins {
        result, status = pl.PreFilter(ctx, state, pod)
        if !status.IsSuccess() {
            // 如果 PreFilter 失败,返回 UnschedulableAndUnresolvable
            return nil, status, nil
        }
    }

    return result, status, rejectedBySets
}

关键点

  • 所有 PreFilter 插件都会执行
  • 如果任何插件返回失败,整个调度失败
  • 返回 UnschedulableAndUnresolvable 时,Pod 不会重新入队

内置 PreFilter 插件示例

  1. NodePorts:检查节点的端口冲突
  2. CSILimits:检查 CSI 驱动限制
  3. VolumeBinding:检查 PVC 是否已绑定
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// pkg/scheduler/framework/plugins/nodeports/node_ports.go
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *CycleState, pod *v1.Pod) (*PreFilterResult, *Status) {
    // 获取 Pod 的主机端口
    hostPorts := getHostPorts(pod)
    if len(hostPorts) == 0 {
        return nil, NewStatus(Success)
    }

    // 将端口信息存储到 CycleState 中,供 Filter 阶段使用
    cycleState.Write(stateKey, &preFilterState{
        hostPorts: hostPorts,
    })

    return nil, NewStatus(Success)
}

4. Filter(过滤阶段)

源码位置pkg/scheduler/framework/processing.go

Filter 阶段过滤掉不满足条件的节点:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// pkg/scheduler/framework/processing.go
func (f *Framework) RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*NodeInfo) (NodeToStatus, error) {
    // 为每个节点运行 Filter 插件
    nodeToStatus := make(NodeToStatus)
    for _, node := range nodes {
        status := f.RunFilterPluginsForNode(ctx, state, pod, node)
        if !status.IsSuccess() {
            nodeToStatus.Set(node.Name(), status)
        }
    }

    return nodeToStatus, nil
}

func (f *Framework) RunFilterPluginsForNode(ctx context.Context, state *CycleState, pod *v1.Pod, node *NodeInfo) *Status {
    // 并行运行所有 Filter 插件
    var errs []error
    var statuses []*Status

    for _, pl := range f.filterPlugins {
        status := pl.Filter(ctx, state, pod, node)
        statuses = append(statuses, status)
    }

    // 合并所有状态
    return MergeStatuses(statuses)
}

关键点

  • 为每个节点并行运行所有 Filter 插件
  • 如果任何插件返回 Unschedulable,该节点被过滤
  • 如果任何插件返回 UnschedulableAndUnresolvable,该节点被永久过滤

内置 Filter 插件示例

  1. NodeUnschedulable:过滤 spec.unschedulable=true 的节点
  2. NodeName:检查 spec.nodeName 是否匹配
  3. TaintToleration:检查污点和容忍度
  4. NodeAffinity:检查节点亲和性
  5. VolumeRestrictions:检查卷限制
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go
func (pl *NodeUnschedulable) Filter(ctx context.Context, _ *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status {
    node := nodeInfo.Node()

    // 如果节点标记为不可调度,且 Pod 不容忍该污点,则过滤
    if node.Spec.Unschedulable {
        if !tolerationsTolerateTaint(pod.Spec.Tolerations, &unschedulableTaint) {
            return NewStatus(Unschedulable, "node(s) were unschedulable")
        }
    }

    return NewStatus(Success)
}

5. PostFilter(后过滤阶段)

源码位置pkg/scheduler/framework/processing.go

PostFilter 在所有节点都被过滤掉时执行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, m NodeToStatusReader) (*PostFilterResult, *Status) {
    // 按顺序运行 PostFilter 插件
    for _, pl := range f.postFilterPlugins {
        result, status := pl.PostFilter(ctx, state, pod, m)
        if status.Code() == Unschedulable {
            // 如果插件返回 Unschedulable,继续运行其他插件
            continue
        }
        if !status.IsSuccess() {
            return result, status
        }
        // 如果插件成功,返回结果
        return result, status
    }

    return nil, NewStatus(Unschedulable, "no nodes available")
}

关键点

  • 只有当所有节点都被过滤时才执行
  • 插件可以尝试抢占低优先级 Pod
  • 返回 Unschedulable 时,Pod 会重新入队

内置 PostFilter 插件

  1. DefaultPreemption:尝试抢占低优先级 Pod
1
2
3
4
5
6
7
8
9
// pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, m NodeToStatusReader) (*PostFilterResult, *Status) {
    defer func() {
        metrics.PreemptionAttempts.Inc()
    }()

    result, status := pl.Evaluator.Preempt(ctx, state, pod, m)
    return result, status
}

6. PreScore(预打分阶段)

源码位置pkg/scheduler/framework/processing.go

PreScore 在打分前执行,用于准备打分所需的数据:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPreScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*NodeInfo) *Status {
    for _, pl := range f.preScorePlugins {
        status := pl.PreScore(ctx, state, pod, nodes)
        if !status.IsSuccess() {
            return status
        }
    }

    return NewStatus(Success)
}

7. Score(打分阶段)

源码位置pkg/scheduler/framework/processing.go

Score 阶段为每个节点打分:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// pkg/scheduler/framework/processing.go
func (f *Framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*NodeInfo) (map[string][]int64, error) {
    scores := make(map[string][]int64)

    for _, pl := range f.scorePlugins {
        pluginScores, status := pl.Score(ctx, state, pod, nodes)
        if !status.IsSuccess() {
            return nil, status.Error()
        }

        // 合并分数
        for nodeName, score := range pluginScores {
            scores[nodeName] = append(scores[nodeName], score)
        }
    }

    return scores, nil
}

内置 Score 插件示例

  1. NodeResourcesBalancedAllocation:资源平衡打分
  2. ImageLocality:镜像本地性打分
  3. InterPodAffinity:Pod 亲和性打分
  4. NodeAffinity:节点亲和性打分
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go
func (pl *BalancedAllocation) Score(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*NodeInfo) (map[string]int64, error) {
    scores := make(map[string]int64)

    for _, node := range nodes {
        // 计算节点的 CPU 和内存使用率
        cpuFraction := fractionOfCapacity(node, corev1.ResourceCPU, pod)
        memoryFraction := fractionOfCapacity(node, corev1.ResourceMemory, pod)

        // 计算平衡分数(越平衡分数越高)
        score := calculateBalanceScore(cpuFraction, memoryFraction)
        scores[node.Name()] = score
    }

    return scores, nil
}

8. NormalizeScore(标准化分数阶段)

源码位置pkg/scheduler/framework/processing.go

NormalizeScore 标准化所有插件的分数到 0-100 范围:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// pkg/scheduler/framework/processing.go
func (f *Framework) RunNormalizeScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*NodeInfo, scores map[string][]int64) error {
    for _, pl := range f.normalizeScorePlugins {
        status := pl.NormalizeScore(ctx, state, pod, scores)
        if !status.IsSuccess() {
            return status.Error()
        }
    }

    return nil
}

9. Reserve(预留阶段)

源码位置pkg/scheduler/framework/processing.go

Reserve 阶段预留资源(信息性,不影响调度结果):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// pkg/scheduler/framework/processing.go
func (f *Framework) RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status {
    for _, pl := range f.reservePlugins {
        status := pl.Reserve(ctx, state, pod, nodeName)
        if !status.IsSuccess() {
            return status
        }
    }

    return NewStatus(Success)
}

10. Permit(许可阶段)

源码位置pkg/scheduler/framework/processing.go

Permit 阶段可以批准或拒绝绑定:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status {
    for _, pl := range f.permitPlugins {
        status, _ := pl.Permit(ctx, state, pod, nodeName)
        if status.Code() == Wait {
            // Pod 进入等待状态
            return status
        }
        if !status.IsSuccess() {
            return status
        }
    }

    return NewStatus(Success)
}

Binding Cycle(绑定周期)

Binding Cycle 将 Pod 实际绑定到节点,会修改 API Server 中的 Pod 对象。

1. WaitOnPermit(等待许可阶段)

源码位置pkg/scheduler/framework/processing.go

如果 Permit 阶段返回 Wait,Pod 会进入等待状态:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// pkg/scheduler/framework/processing.go
func (f *Framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status {
    // 等待 Permit 插件批准
    for {
        status := f.checkPermitStatus(pod)
        if status.Code() != Wait {
            return status
        }
        // 等待一段时间后重试
        time.Sleep(100 * time.Millisecond)
    }
}

2. PreBind(预绑定阶段)

源码位置pkg/scheduler/framework/processing.go

PreBind 阶段在绑定前执行准备工作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status {
    for _, pl := range f.preBindPlugins {
        status := pl.PreBind(ctx, state, pod, nodeName)
        if !status.IsSuccess() {
            return status
        }
    }

    return NewStatus(Success)
}

内置 PreBind 插件

  1. VolumeBinding:绑定 PVC 到 PV
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) PreBind(ctx context.Context, cs *CycleState, pod *v1.Pod, nodeName string) *Status {
    s, err := getStateData(cs)
    if err != nil {
        return AsStatus(err)
    }
    if s.allBound {
        // 所有卷都已绑定,无需操作
        return nil
    }

    podVolumes, ok := s.podVolumesByNode[nodeName]
    if !ok {
        return AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName))
    }

    // 绑定 Pod 的卷
    err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes)
    if err != nil {
        return AsStatus(err)
    }

    return nil
}

3. Bind(绑定阶段)

源码位置pkg/scheduler/framework/processing.go

Bind 阶段实际绑定 Pod 到节点:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// pkg/scheduler/framework/processing.go
func (f *Framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status {
    for _, pl := range f.bindPlugins {
        status := pl.Bind(ctx, state, pod, nodeName)
        if status.Code() == Skip {
            // 插件跳过,尝试下一个插件
            continue
        }
        return status
    }

    // 所有插件都跳过,返回 Skip
    return NewStatus(Skip)
}

内置 Bind 插件

  1. DefaultBinder:默认绑定实现
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// pkg/scheduler/framework/plugins/defaultbinder/default_binder.go
func (b *DefaultBinder) Bind(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status {
    // 创建 Pod 的副本
    podCopy := pod.DeepCopy()

    // 更新 Pod 的 nodeName 字段
    podCopy.Spec.NodeName = nodeName

    // 调用 API Server 更新 Pod
    _, err := b.handle.ClientSet().CoreV1().Pods(pod.Namespace).Update(ctx, podCopy, metav1.UpdateOptions{})
    if err != nil {
        return NewStatus(Error, err.Error())
    }

    return NewStatus(Success)
}

4. PostBind(后绑定阶段)

源码位置pkg/scheduler/framework/processing.go

PostBind 阶段在绑定成功后执行清理工作:

1
2
3
4
5
6
// pkg/scheduler/framework/processing.go
func (f *Framework) RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
    for _, pl := range f.postBindPlugins {
        pl.PostBind(ctx, state, pod, nodeName)
    }
}

关键点

  • PostBind 是异步的,不会阻塞调度
  • 失败不影响调度结果
  • 用于清理临时资源和记录日志

调度流程总结

  sequenceDiagram
    participant Q as Scheduling Queue
    participant S as Scheduler
    participant F as Framework
    participant P as Plugins
    participant A as API Server

    Q->>S: Pod 入队
    S->>F: 开始调度循环

    Note over F: Scheduling Cycle
    F->>P: PreEnqueue
    F->>P: PreFilter
    F->>P: Filter(并行)
    alt 所有节点被过滤
        F->>P: PostFilter
        F->>Q: Pod 重新入队
    end

    F->>P: PreScore
    F->>P: Score(并行)
    F->>P: NormalizeScore
    F->>P: Reserve
    F->>P: Permit

    Note over F: Binding Cycle
    F->>P: WaitOnPermit
    F->>P: PreBind
    F->>A: Bind(更新 Pod.nodeName)
    F->>P: PostBind

    S->>Q: 调度完成

关键设计原则

  1. 并行处理:Filter 和 Score 阶段并行处理所有节点
  2. 短路求值:任何阶段失败都会立即终止调度
  3. 状态管理:CycleState 在调度过程中传递状态
  4. 插件化设计:每个阶段都可以插入自定义插件
  5. 异步绑定:PostBind 异步执行,不阻塞调度

内置插件特殊处理详解

Kubernetes 内置了一些重要的插件,它们在特定阶段有特殊的处理方式。以下是所有内置插件的完整列表和特殊处理说明。

1. VolumeBinding 插件:跨多个阶段的复杂处理

VolumeBinding 是一个跨 PreFilter、Filter、Reserve、PreBind 四个阶段的复杂插件,用于处理 PVC 和 PV 的绑定。

  flowchart TD
    A[PreFilter] --> B[检查 PVC 是否已绑定]
    B --> C{有未绑定的 PVC?}
    C -->|是| D[返回 UnschedulableAndUnresolvable]
    C -->|否| E[Filter]

    E --> F[为每个节点查找匹配的 PV]
    F --> G[检查 PV 的节点亲和性]
    G --> H[Reserve]

    H --> I[AssumePodVolumes - 假设绑定]
    I --> J[PreBind]

    J --> K[BindPodVolumes - 实际绑定]
    K --> L[等待 PV 控制器完成绑定]

PreFilter 阶段

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) PreFilter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
    // 检查 Pod 是否有 PVC
    if hasPVC, err := pl.podHasPVCs(pod); err != nil {
        return nil, fwk.NewStatus(fwk.UnschedulableAndUnresolvable, err.Error())
    } else if !hasPVC {
        // 没有 PVC,跳过
        state.Write(stateKey, &stateData{})
        return nil, fwk.NewStatus(fwk.Skip)
    }

    // 获取 Pod 的所有 PVC
    podVolumeClaims, err := pl.Binder.GetPodVolumeClaims(logger, pod)
    if err != nil {
        return nil, fwk.AsStatus(err)
    }

    // 关键:检查是否有未绑定的 immediate PVC
    if len(podVolumeClaims.unboundClaimsImmediate) > 0 {
        // 返回 UnschedulableAndUnresolvable,Pod 不会重新入队
        status := fwk.NewStatus(fwk.UnschedulableAndUnresolvable)
        status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
        return nil, status
    }

    // 将 PVC 信息存储到 CycleState
    state.Write(stateKey, &stateData{
        podVolumesByNode: make(map[string]*PodVolumes),
        podVolumeClaims: &PodVolumeClaims{
            boundClaims:                podVolumeClaims.boundClaims,
            unboundClaimsDelayBinding:  podVolumeClaims.unboundClaimsDelayBinding,
            unboundVolumesDelayBinding: podVolumeClaims.unboundVolumesDelayBinding,
        },
    })
    return nil, nil
}

Filter 阶段

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) Filter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
    // 为当前节点查找匹配的 PV
    podVolumes, status := pl.findPodVolumes(ctx, state, pod, nodeInfo)
    if !status.IsSuccess() {
        return status
    }

    // 将查找结果缓存到 CycleState
    s, _ := getStateData(state)
    s.podVolumesByNode[nodeInfo.Node().Name] = podVolumes

    return nil
}

Reserve 阶段

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) Reserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
    state, err := getStateData(cs)
    if err != nil {
        return fwk.AsStatus(err)
    }

    podVolumes, ok := state.podVolumesByNode[nodeName]
    if ok {
        // AssumePodVolumes:假设绑定,但还未实际绑定
        allBound, err := pl.Binder.AssumePodVolumes(klog.FromContext(ctx), pod, nodeName, podVolumes)
        if err != nil {
            return fwk.AsStatus(err)
        }
        state.allBound = allBound
    }
    return nil
}

PreBind 阶段

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) PreBind(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
    s, err := getStateData(cs)
    if err != nil {
        return fwk.AsStatus(err)
    }
    if s.allBound {
        // 所有卷都已绑定,无需操作
        return nil
    }

    podVolumes, ok := s.podVolumesByNode[nodeName]
    if !ok {
        return fwk.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName))
    }

    // BindPodVolumes:实际绑定 PVC 到 PV
    err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes)
    if err != nil {
        return fwk.AsStatus(err)
    }

    return nil
}

关键点

  • PreFilter 检查 PVC 状态,如果未绑定则返回 UnschedulableAndUnresolvable
  • Filter 为每个节点查找匹配的 PV,并缓存结果
  • Reserve 阶段使用 AssumePodVolumes 假设绑定(乐观锁)
  • PreBind 阶段才真正执行绑定操作

2. DefaultPreemption 插件:抢占机制

当所有节点都被过滤掉时,DefaultPreemption 插件会尝试抢占低优先级 Pod:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, m fwk.NodeToStatusReader) (*fwk.PostFilterResult, *fwk.Status) {
    defer func() {
        metrics.PreemptionAttempts.Inc()  // 记录抢占尝试次数
    }()

    result, status := pl.Evaluator.Preempt(ctx, state, pod, m)
    msg := status.Message()
    if len(msg) > 0 {
        return result, fwk.NewStatus(status.Code(), "preemption: "+msg)
    }
    return result, status
}

抢占流程

  1. 检查是否有节点可以通过抢占低优先级 Pod 来调度当前 Pod
  2. 选择最优的抢占目标(最少抢占 Pod 数量)
  3. 删除抢占目标 Pod
  4. 当前 Pod 重新入队,等待下一次调度

3. BalancedAllocation 插件:资源平衡打分

BalancedAllocation 插件在 PreScore 阶段计算 Pod 的资源请求,在 Score 阶段为节点打分:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go
func (ba *BalancedAllocation) PreScore(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) *fwk.Status {
    // 计算 Pod 的资源请求
    podRequests := ba.calculatePodResourceRequestList(pod, ba.resources)

    // 特殊处理:跳过 best-effort Pod(没有资源请求的 Pod)
    if ba.isBestEffortPod(podRequests) {
        return fwk.NewStatus(fwk.Skip)
    }

    // 将计算结果存储到 CycleState
    state := &balancedAllocationPreScoreState{
        podRequests: podRequests,
    }
    cycleState.Write(balancedAllocationPreScoreStateKey, state)
    return nil
}

关键点

  • PreScore 阶段预先计算 Pod 的资源请求,避免在每个节点的 Score 阶段重复计算
  • Best-effort Pod(没有资源请求)会跳过打分,防止大量 Pod 都调度到同一个节点

4. NodeUnschedulable 插件:节点不可调度检查

NodeUnschedulable 插件检查节点是否标记为不可调度,以及 Pod 是否容忍该污点:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go
func (pl *NodeUnschedulable) Filter(ctx context.Context, _ fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
    node := nodeInfo.Node()

    if !node.Spec.Unschedulable {
        return nil  // 节点可调度
    }

    // 检查 Pod 是否容忍 unschedulable 污点
    podToleratesUnschedulable := v1helper.TolerationsTolerateTaint(
        klog.FromContext(ctx),
        pod.Spec.Tolerations,
        &v1.Taint{
            Key:    v1.TaintNodeUnschedulable,
            Effect: v1.TaintEffectNoSchedule,
        },
        utilfeature.DefaultFeatureGate.Enabled(features.TaintTolerationComparisonOperators),
    )

    if !podToleratesUnschedulable {
        // 返回 UnschedulableAndUnresolvable,节点被永久过滤
        return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "node(s) were unschedulable")
    }

    return nil
}

关键点

  • 如果节点标记为 unschedulable=true,且 Pod 不容忍该污点,返回 UnschedulableAndUnresolvable
  • UnschedulableAndUnresolvable 表示节点被永久过滤,不会重新尝试

5. DefaultBinder 插件:Pod 绑定

DefaultBinder 是默认的绑定插件,负责将 Pod 绑定到节点:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// pkg/scheduler/framework/plugins/defaultbinder/default_binder.go
func (b DefaultBinder) Bind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
    logger := klog.FromContext(ctx)

    // 创建 Binding 对象
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: p.Namespace,
            Name:      p.Name,
            UID:       p.UID,
        },
        Target: v1.ObjectReference{
            Kind: "Node",
            Name: nodeName,
        },
    }

    if b.handle.APICacher() != nil {
        // 当 API Cacher 可用时,使用 Cacher 进行绑定(性能优化)
        onFinish, err := b.handle.APICacher().BindPod(binding)
        if err != nil {
            return fwk.AsStatus(err)
        }
        // 等待绑定完成
        err = b.handle.APICacher().WaitOnFinish(ctx, onFinish)
        if err != nil {
            return fwk.AsStatus(err)
        }
        return nil
    }

    // 直接调用 API Server 进行绑定
    logger.V(3).Info("Attempting to bind pod to node", "pod", klog.KObj(p), "node", klog.KRef("", nodeName))
    err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
    if err != nil {
        return fwk.AsStatus(err)
    }

    return nil
}

关键点

  • 使用 Binding 对象进行绑定,而不是直接更新 Pod
  • 当 API Cacher 可用时,使用 Cacher 进行绑定,提高性能
  • 绑定成功后,kubelet 会监听到 Pod 并创建容器

6. Filter 插件完整列表

TaintToleration 插件:污点容忍度检查

检查 Pod 是否容忍节点的污点:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go
func (pl *TaintToleration) Filter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
    node := nodeInfo.Node()

    // 查找节点上 Pod 不容忍的污点
    taint, isUntolerated := v1helper.FindMatchingUntoleratedTaint(
        klog.FromContext(ctx),
        node.Spec.Taints,
        pod.Spec.Tolerations,
        helper.DoNotScheduleTaintsFilterFunc(),
        pl.enableTaintTolerationComparisonOperators,
    )

    if isUntolerated {
        // 返回 UnschedulableAndUnresolvable
        return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("node(s) had taint {%s}, that the pod didn't tolerate", taint.ToString()))
    }

    return nil
}

NodeAffinity 插件:节点亲和性检查

检查 Pod 的节点亲和性要求:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go
func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
    affinity := pod.Spec.Affinity
    noNodeAffinity := (affinity == nil ||
        affinity.NodeAffinity == nil ||
        affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil)

    if noNodeAffinity && pl.addedNodeSelector == nil && pod.Spec.NodeSelector == nil {
        // 没有节点亲和性要求,跳过
        return nil, fwk.NewStatus(fwk.Skip)
    }

    // 提取必需的节点选择器
    state := &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
    cycleState.Write(stateKey, state)
    return nil, nil
}

func (pl *NodeAffinity) Filter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
    // 检查节点是否满足 Pod 的节点亲和性要求
    // ...
}

NodeName 插件:指定节点检查

检查 Pod 是否指定了节点名称:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// pkg/scheduler/framework/plugins/nodename/node_name.go
func (pl *NodeName) Filter(ctx context.Context, _ fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
    if !Fits(pod, nodeInfo) {
        return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, ErrReason)
    }
    return nil
}

// Fits 检查 Pod 是否适合节点
func Fits(pod *v1.Pod, nodeInfo fwk.NodeInfo) bool {
    // 如果 Pod 没有指定 NodeName,或者 NodeName 匹配当前节点,则适合
    return len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == nodeInfo.Node().Name
}

NodePorts 插件:节点端口检查

检查节点端口是否冲突:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// pkg/scheduler/framework/plugins/nodeports/node_ports.go
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
    // 获取 Pod 的主机端口
    hostPorts := getHostPorts(pod)
    if len(hostPorts) == 0 {
        return nil, fwk.NewStatus(fwk.Skip)
    }

    // 将端口信息存储到 CycleState
    cycleState.Write(stateKey, &preFilterState{
        hostPorts: hostPorts,
    })
    return nil, nil
}

func (pl *NodePorts) Filter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
    // 检查节点端口是否冲突
    // ...
}

CSILimits 插件:CSI 驱动限制检查

检查节点上的 CSI 驱动是否达到限制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// pkg/scheduler/framework/plugins/nodevolumelimits/csi.go
func (pl *CSILimits) PreFilter(ctx context.Context, _ fwk.CycleState, pod *v1.Pod, _ []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
    volumes := pod.Spec.Volumes
    for i := range volumes {
        vol := &volumes[i]
        if vol.PersistentVolumeClaim != nil || vol.Ephemeral != nil || pl.translator.IsInlineMigratable(vol) {
            return nil, nil
        }
    }
    // 没有卷,跳过
    return nil, fwk.NewStatus(fwk.Skip)
}

func (pl *CSILimits) Filter(ctx context.Context, _ fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
    // 检查 CSI 驱动限制
    // ...
}

VolumeZone 插件:卷可用区检查

检查卷的可用区是否与节点匹配:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// pkg/scheduler/framework/plugins/volumezone/volume_zone.go
func (pl *VolumeZone) PreFilter(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
    // 获取 Pod 的 PV 拓扑信息
    podPVTopologies, status := pl.getPVbyPod(logger, pod)
    if !status.IsSuccess() {
        return nil, status
    }
    if len(podPVTopologies) == 0 {
        return nil, fwk.NewStatus(fwk.Skip)
    }

    cs.Write(preFilterStateKey, &stateData{podPVTopologies: podPVTopologies})
    return nil, nil
}

func (pl *VolumeZone) Filter(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
    // 检查卷的可用区是否与节点匹配
    // ...
}

VolumeRestrictions 插件:卷限制检查

检查卷的使用限制(如 ReadWriteOnce):

1
2
3
4
5
// pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
    // 检查卷限制
    // ...
}

PodTopologySpread 插件:Pod 拓扑分布检查

检查 Pod 的拓扑分布约束:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) {
    // 计算 Pod 的拓扑分布约束
    s, err := pl.calPreFilterState(ctx, pod, nodes)
    if err != nil {
        return nil, fwk.AsStatus(err)
    }
    if s != nil && len(s.Constraints) == 0 {
        return nil, fwk.NewStatus(fwk.Skip)
    }

    cycleState.Write(stateKey, s)
    return nil, nil
}

func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
    // 检查拓扑分布约束
    // ...
}

SchedulingGates 插件:调度门检查

检查 Pod 是否有未解除的调度门:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go
func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *fwk.Status {
    if len(p.Spec.SchedulingGates) == 0 {
        return nil
    }

    gates := make([]string, 0, len(p.Spec.SchedulingGates))
    for _, gate := range p.Spec.SchedulingGates {
        gates = append(gates, gate.Name)
    }

    return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("waiting for scheduling gates: %v", gates))
}

7. Score 插件完整列表

ImageLocality 插件:镜像本地性打分

优先选择已有容器镜像的节点:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// pkg/scheduler/framework/plugins/imagelocality/image_locality.go
func (pl *ImageLocality) Score(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) (int64, *fwk.Status) {
    nodeInfos, err := pl.handle.SnapshotSharedLister().NodeInfos().List()
    if err != nil {
        return 0, fwk.AsStatus(err)
    }
    totalNumNodes := len(nodeInfos)

    // 计算镜像分数
    imageScores := sumImageScores(nodeInfo, pod, totalNumNodes)
    score := calculatePriority(imageScores, len(pod.Spec.InitContainers)+len(pod.Spec.Containers))

    return score, nil
}

InterPodAffinity 插件:Pod 亲和性打分

根据 Pod 亲和性要求为节点打分:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// pkg/scheduler/framework/plugins/interpodaffinity/scoring.go
func (pl *InterPodAffinity) PreScore(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) *fwk.Status {
    // 计算 Pod 亲和性相关的状态
    // ...
}

func (pl *InterPodAffinity) Score(ctx context.Context, cycleState fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) (int64, *fwk.Status) {
    // 根据 Pod 亲和性为节点打分
    // ...
}

8. 其他重要插件

PrioritySort 插件:队列排序

在 Sort 阶段对 Pod 进行排序:

1
2
3
4
5
// pkg/scheduler/framework/plugins/queuesort/priority_sort.go
func (pl *PrioritySort) Less(pod1, pod2 *v1.Pod) bool {
    // 高优先级的 Pod 排在前面
    return GetPodPriority(pod1) > GetPodPriority(pod2)
}

GangScheduling 插件:成组调度

实现 Pod 组的协同调度:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go
func (pl *GangScheduling) Reserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
    // 假设 Pod 属于一个组
    podGroupInfo.AssumePod(pod.UID)
    return nil
}

func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
    // 等待组内所有 Pod 都准备好
    // ...
}

DynamicResources 插件:动态资源处理

处理动态资源(DRA)的调度:

1
2
3
4
5
// pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
func (pl *DynamicResources) Reserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
    // 处理动态资源预留
    // ...
}

参考资料

#Kubernetes #Scheduler #源码分析