ᕕ( ᐛ )ᕗ Jimyag's Blog

Kubernetes 自定义调度器:基于 Scheduler Framework 实现

· 2272 字 · 约 11 分钟

Kubernetes 默认的调度器能满足大部分场景,但某些场景需要自定义调度逻辑。例如:

  • 将特定应用的 Pod 调度到特定的节点组
  • 实现更复杂的资源匹配规则
  • 添加自定义的过滤或打分逻辑

Kubernetes 1.19 引入了 Scheduler Framework,让开发者可以通过插件化的方式扩展调度器,无需修改 Kubernetes 源码。

本文介绍如何基于 Scheduler Framework 实现一个自定义调度器,通过标签过滤节点,实现 Pod 的精准调度。

Scheduler Framework 扩展点

Scheduler Framework 定义了一系列扩展点,开发者可以在这些扩展点注册自定义插件:

  graph TD
    A[PreEnqueue] --> B[Sort]
    B --> C[Scheduling Cycle]
    
    subgraph Scheduling_Cycle [Scheduling Cycle]
        C1[PreFilter]
        C2[Filter]
        C3[PostFilter]
        C4[PreScore]
        C5[Score]
        C6[NormalizeScore]
        C7[Reserve]
        C8[Permit]
    end
    
    C --> C1
    C1 --> C2
    C2 --> C3
    C3 --> C4
    C4 --> C5
    C5 --> C6
    C6 --> C7
    C7 --> C8
    
    C8 --> D[Binding Cycle]
    
    subgraph Binding_Cycle [Binding Cycle]
        D1[WaitOnPermit]
        D2[PreBind]
        D3[Bind]
        D4[PostBind]
    end
    
    D --> D1
    D1 --> D2
    D2 --> D3
    D3 --> D4
    
    style C1 fill:#e1f5ff
    style C2 fill:#e1f5ff
    style C3 fill:#e1f5ff
    style C4 fill:#e1f5ff
    style C5 fill:#e1f5ff
    style C6 fill:#e1f5ff
    style D2 fill:#fff4e1
    style D3 fill:#fff4e1
    style D4 fill:#f0f0f0

关键扩展点:

  • PreFilter:Pod 预处理和检查,不符合预期可以提前结束调度
  • Filter:过滤掉不满足要求的节点
  • PostFilter:如果 Filter 阶段所有节点都被过滤掉,执行此阶段
  • Score:为节点打分,分数越高越优先
  • PreBind/Bind/PostBind:绑定阶段的扩展点

扩展点详解

Scheduling Cycle(调度周期)

PreFilter

  • 作用:Pod 预处理和检查
  • 失败影响:调度提前终止,Pod 保持 Pending
  • 应用场景:检查 Pod 是否满足调度前提条件(如 PVC 是否存在、资源配额是否充足)

Filter

  • 作用:过滤掉不满足要求的节点
  • 失败影响:单个节点被过滤,不影响其他节点的调度
  • 应用场景:资源检查、标签匹配、污点容忍、端口冲突、卷挂载冲突等

PostFilter

  • 作用:当所有节点都被过滤掉时执行
  • 失败影响:不影响调度结果(已经是失败状态)
  • 应用场景:尝试抢占低优先级 Pod、记录详细错误信息、通知用户

PreScore

  • 作用:打分前的准备工作
  • 失败影响:调度失败,Pod 保持 Pending
  • 应用场景:准备打分所需的数据、计算权重

Score

  • 作用:为节点打分(0-100 分)
  • 失败影响:节点得分为 0,不影响其他节点
  • 应用场景:节点优先级计算、资源平衡、亲和性打分、拓扑分布等

NormalizeScore

  • 作用:标准化分数(修正到 0-100 范围)
  • 失败影响:调度失败,Pod 保持 Pending
  • 应用场景:修正分数范围、处理极端分数

Binding Cycle(绑定周期)

Reserve

  • 作用:预留资源(信息性,不影响调度)
  • 失败影响:不影响调度结果
  • 应用场景:缓存资源使用情况、预分配资源

具体例子:调度器在内存中维护节点资源缓存。当 Pod A 调度到 Node1 时,Reserve 阶段会在缓存中标记 Node1 的 CPU/内存已被占用。这样当 Pod B 尝试调度时,能看到最新的资源使用情况,避免资源超卖。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Reserve 示例:更新资源缓存
func (r *MyPlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    // 获取节点的资源使用情况
    nodeInfo, err := r.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return framework.NewStatus(framework.Error, err.Error())
    }
    
    // 在缓存中预留资源(不影响实际调度,只是更新缓存)
    r.resourceCache[nodeName] = nodeInfo.RequestedResource()
    
    return framework.NewStatus(framework.Success, "")
}

Permit

  • 作用:批准或拒绝绑定
  • 失败影响:可以阻止绑定,Pod 重新进入调度队列
  • 应用场景:等待其他 Pod 同时调度(批调度)、实现协同调度

具体例子:假设有一个分布式数据库集群,需要同时启动主节点和从节点。Permit 阶段可以等待所有 Pod 都调度成功后,再一起绑定,确保主从节点能同时启动。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Permit 示例:等待一组 Pod 同时调度
func (p *MyPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
    // 获取 Pod 所属的组
    groupKey := pod.Labels["app-group"]
    
    // 检查组内所有 Pod 是否都已调度
    pods, err := p.handle.ClientSet().CoreV1().Pods(pod.Namespace).List(ctx, metav1.ListOptions{
        LabelSelector: fmt.Sprintf("app-group=%s", groupKey),
    })
    if err != nil {
        return framework.NewStatus(framework.Error, err.Error()), 0
    }
    
    // 统计已调度的 Pod 数量
    scheduledCount := 0
    for _, p := range pods.Items {
        if p.Spec.NodeName != "" {
            scheduledCount++
        }
    }
    
    // 如果所有 Pod 都已调度,允许绑定
    if scheduledCount == len(pods.Items) {
        return framework.NewStatus(framework.Success, ""), 0
    }
    
    // 否则等待 30 秒
    return framework.NewStatus(framework.Wait, "waiting for other pods"), 30 * time.Second
}

PreBind

  • 作用:绑定前的准备工作
  • 失败影响:绑定失败,Pod 重新进入调度队列
  • 应用场景:设置 PVC、创建卷、配置网络、确保存储已就绪

具体例子:Pod 使用了 PVC,PreBind 阶段确保 PVC 已经绑定到 PV,并且 PV 已经挂载到目标节点。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// PreBind 示例:确保 PVC 已绑定并可用
func (p *MyPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    // 遍历 Pod 的所有 PVC
    for _, volume := range pod.Spec.Volumes {
        if volume.PersistentVolumeClaim == nil {
            continue
        }
        
        pvcName := volume.PersistentVolumeClaim.ClaimName
        
        // 检查 PVC 是否存在
        pvc, err := p.handle.ClientSet().CoreV1().PersistentVolumeClaims(pod.Namespace).Get(ctx, pvcName, metav1.GetOptions{})
        if err != nil {
            return framework.NewStatus(framework.Error, fmt.Sprintf("PVC %s not found", pvcName))
        }
        
        // 确保 PVC 已经绑定到 PV
        if pvc.Spec.VolumeName == "" {
            return framework.NewStatus(framework.Error, fmt.Sprintf("PVC %s not bound", pvcName))
        }
        
        // 确保 PV 已经挂载到目标节点(如果是 ReadWriteOnce)
        pv, err := p.handle.ClientSet().CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
        if err != nil {
            return framework.NewStatus(framework.Error, fmt.Sprintf("PV %s not found", pvc.Spec.VolumeName))
        }
        
        if pv.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
            // 如果 PV 是 Retain 策略,确保之前的 Pod 已经删除
            klog.Infof("PV %s is Retain policy, checking if it's safe to bind", pv.Name)
        }
    }
    
    return framework.NewStatus(framework.Success, "")
}

PVC 绑定流程

根据 Kubernetes 源码分析(pkg/scheduler/framework/plugins/volumebinding/volume_binding.gopkg/controller/volume/persistentvolume/pv_controller.go),Pod 和 PVC 的绑定流程如下:

  sequenceDiagram
    participant U as 用户
    participant P as PersistentVolumeController
    participant S as 调度器
    participant K as kubelet

    U->>S: 创建 Pod(使用 PVC)
    S->>S: PreFilter:检查 PVC 是否已绑定
    alt PVC 未绑定
        S-->>S: 返回 UnschedulableAndUnresolvable
        Note over S: Pod 进入不可调度队列
        U->>P: 创建 PVC
        P->>P: findBestMatchForClaim() 查找匹配 PV
        P->>P: bindClaimToVolume() 绑定 PVC 到 PV
        Note over P: PVC 状态变为 Bound
        S->>S: 重新调度 Pod
    end
    S->>S: Filter:为 PVC 选择合适的节点
    S->>S: PreBind:BindPodVolumes()
    S->>S: Bind:Pod 绑定到节点
    K->>K: Watch 到 Pod 绑定到本节点
    K->>K: 挂载 PV 到容器
    K->>K: 启动容器

关键点(基于源码)

  1. Pod 先创建:用户创建 Pod 时,Pod 进入调度器
  2. PreFilter 检查 PVCVolumeBinding 插件的 PreFilter 方法检查 Pod 的 PVC 是否已绑定(代码:volume_binding.go 第 14-20 行)
    • 如果 PVC 未绑定,返回 UnschedulableAndUnresolvable 错误
    • Pod 被移到不可调度队列,等待 PVC 绑定
  3. PersistentVolumeController 绑定 PVC:独立于调度器运行(代码:pv_controller.go
    • syncUnboundClaim() 函数查找匹配的 PV
    • bindClaimToVolume() 函数绑定 PVC 到 PV
  4. 调度器重新调度 Pod:PVC 绑定后,调度器重新尝试调度 Pod
  5. PreBind 阶段绑定卷BindPodVolumes() 函数在 PreBind 阶段执行实际的卷绑定操作(代码:binder.go

注意:如果 PVC 不存在或未绑定到 PV,Pod 会一直处于 Pending 状态,直到 PVC 被绑定。

Bind

  • 作用:实际的绑定操作(更新 Pod 的 nodeName)
  • 失败影响:绑定失败,Pod 重新进入调度队列
  • 应用场景:自定义绑定逻辑、多集群调度

具体例子:实现跨集群调度,将 Pod 绑定到其他集群的节点。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Bind 示例:多集群调度
func (b *MyPlugin) Bind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    // 解析节点名称,格式为 "cluster-name:node-name"
    parts := strings.Split(nodeName, ":")
    if len(parts) != 2 {
        return framework.NewStatus(framework.Error, "invalid node name format")
    }
    
    clusterName := parts[0]
    targetNodeName := parts[1]
    
    // 如果目标集群不是当前集群,需要特殊处理
    if clusterName != b.currentCluster {
        // 获取目标集群的客户端
        targetClient, err := b.getClusterClient(clusterName)
        if err != nil {
            return framework.NewStatus(framework.Error, fmt.Sprintf("failed to get client for cluster %s: %v", clusterName, err))
        }
        
        // 在目标集群创建 Pod
        podCopy := pod.DeepCopy()
        podCopy.Spec.NodeName = targetNodeName
        podCopy.ResourceVersion = "" // 清除 resource version
        
        _, err = targetClient.CoreV1().Pods(pod.Namespace).Create(ctx, podCopy, metav1.CreateOptions{})
        if err != nil {
            return framework.NewStatus(framework.Error, err.Error())
        }
        
        // 在当前集群删除 Pod(或者标记为已完成)
        err = b.handle.ClientSet().CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
        if err != nil {
            klog.Warningf("failed to delete pod from current cluster: %v", err)
        }
        
        return framework.NewStatus(framework.Success, "")
    }
    
    // 当前集群,使用默认绑定逻辑
    podCopy := pod.DeepCopy()
    podCopy.Spec.NodeName = targetNodeName
    _, err := b.handle.ClientSet().CoreV1().Pods(pod.Namespace).Update(ctx, podCopy, metav1.UpdateOptions{})
    if err != nil {
        return framework.NewStatus(framework.Error, err.Error())
    }
    
    return framework.NewStatus(framework.Success, "")
}

PostBind

  • 作用:绑定后的清理工作(信息性)
  • 失败影响:不影响调度结果
  • 应用场景:清理临时资源、记录日志、通知外部系统

具体例子:绑定成功后,清理调度过程中创建的临时 ConfigMap 或 Secret,并向监控系统发送通知。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// PostBind 示例:清理临时资源和通知
func (p *MyPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
    // 清理临时 ConfigMap(假设在 PreFilter 阶段创建)
    tempConfigMapName := fmt.Sprintf("temp-%s-%s", pod.Namespace, pod.Name)
    err := p.handle.ClientSet().CoreV1().ConfigMaps(pod.Namespace).Delete(ctx, tempConfigMapName, metav1.DeleteOptions{})
    if err != nil {
        klog.Warningf("failed to delete temp configmap %s: %v", tempConfigMapName, err)
    } else {
        klog.Infof("cleaned up temp configmap %s", tempConfigMapName)
    }
    
    // 记录调度日志
    logEntry := fmt.Sprintf("Pod %s/%s scheduled to node %s at %s", 
        pod.Namespace, pod.Name, nodeName, time.Now().Format(time.RFC3339))
    if err := p.logToFile(logEntry); err != nil {
        klog.Warningf("failed to write log: %v", err)
    }
    
    // 通知外部系统(如监控系统)
    if p.webhookURL != "" {
        payload := map[string]interface{}{
            "event":     "pod_scheduled",
            "pod":       fmt.Sprintf("%s/%s", pod.Namespace, pod.Name),
            "node":      nodeName,
            "timestamp": time.Now().Unix(),
        }
        
        jsonPayload, _ := json.Marshal(payload)
        resp, err := http.Post(p.webhookURL, "application/json", bytes.NewBuffer(jsonPayload))
        if err != nil {
            klog.Warningf("failed to notify webhook: %v", err)
        } else {
            resp.Body.Close()
        }
    }
}

为什么有 PreBind/Bind?kubelet 什么时候介入?

这是一个常见的误解。Bind 不是 kubelet 的操作,而是调度器的操作。

调度器和 kubelet 的职责划分

调度器

  • 职责:选择合适的节点
  • 操作:
    1. Filter 过滤节点
    2. Score 打分
    3. Bind 绑定(更新 Pod 的 nodeName 字段)

kubelet

  • 职责:在节点上运行 Pod
  • 操作:
    1. 监听 API Server,发现有 Pod 绑定到自己的节点
    2. 创建容器
    3. 启动容器
    4. 汇报状态

绑定流程详解

  sequenceDiagram
    participant S as 调度器
    participant A as API Server
    participant K as kubelet

    Note over S,A: Scheduling Cycle
    S->>A: Filter(过滤节点)
    S->>A: Score(为节点打分)
    
    Note over S,A: Binding Cycle
    S->>A: PreBind(准备 PVC、卷等资源)
    S->>A: Bind(更新 Pod.nodeName = "node1")
    
    A->>K: Watch(发现 Pod 绑定到 node1)
    K->>K: 创建容器
    K->>K: 启动容器
    K->>A: 汇报状态
    
    S->>A: PostBind(清理临时资源)

PreBind 的典型应用

PreBind 阶段主要用于准备绑定所需的资源

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 示例:PreBind 确保 PVC 已绑定
func (p *MyPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    // 检查 PVC 是否存在
    pvc, err := p.handle.ClientSet().CoreV1().PersistentVolumeClaims(pod.Namespace).Get(ctx, pvcName, metav1.GetOptions{})
    if err != nil {
        return framework.NewStatus(framework.Error, "PVC not found")
    }
    
    // 确保卷已经绑定到节点
    if pvc.Spec.VolumeName == "" {
        return framework.NewStatus(framework.Error, "PVC not bound")
    }
    
    return framework.NewStatus(framework.Success, "")
}

典型场景:

  • 确保 PVC 已经绑定到 PV
  • 确保存储卷已经准备好
  • 设置网络配置
  • 预分配 IP 地址

Bind 的默认实现

Bind 阶段用于实际绑定(更新 API Server 中的 Pod 对象):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 默认的 Bind 实现(简化版)
func (b *DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    // 创建 Pod 的副本
    podCopy := pod.DeepCopy()
    
    // 更新 Pod 的 nodeName 字段
    podCopy.Spec.NodeName = nodeName
    
    // 调用 API Server 更新 Pod
    _, err := b.handle.ClientSet().CoreV1().Pods(pod.Namespace).Update(ctx, podCopy, metav1.UpdateOptions{})
    if err != nil {
        return framework.NewStatus(framework.Error, err.Error())
    }
    
    return framework.NewStatus(framework.Success, "")
}

为什么需要自定义 Bind?

虽然大多数情况下使用默认的 Bind 即可,但在某些场景下需要自定义:

  1. 多集群调度:将 Pod 绑定到其他集群的节点
  2. 自定义调度结果存储:不使用 nodeName 字段,而是使用 annotation
  3. 延迟绑定:等待外部系统确认后再绑定

实现原理

本示例实现一个简单的调度器:

  • Pod 包含标签 scheduler.k8sdev.jimyag.com/filter: <value>
  • Node 包含相同的标签
  • Pod 只会被调度到标签值匹配的 Node 上

项目结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
scheduler/
├── main.go                 # 入口文件,注册插件
├── internal/
│   └── custom.go           # 插件实现
├── deploy/                 # 部署配置
│   ├── configmap.yaml
│   ├── deployment.yaml
│   ├── serviceaccount.yaml
│   └── rolebinding.yaml
└── test/                   # 测试用例
    ├── jimyag-scheduler.yaml
    ├── normal.yaml
    └── node-label.sh

插件注册

main.go 中注册自定义插件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
	"os"

	"k8s.io/component-base/cli"
	_ "k8s.io/component-base/metrics/prometheus/clientgo"
	_ "k8s.io/component-base/metrics/prometheus/version"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"
	_ "sigs.k8s.io/scheduler-plugins/apis/config/scheme"

	"github.com/jimyag/k8sdev/scheduler/internal"
)

func main() {
	// Register custom plugins to the scheduler framework.
	command := app.NewSchedulerCommand(
		app.WithPlugin(internal.Name, internal.New),
	)

	code := cli.Run(command)
	os.Exit(code)
}

关键点:

  • 使用 app.NewSchedulerCommand 创建调度器命令
  • 通过 app.WithPlugin 注册插件
  • 插件名称和初始化函数在 internal 包中定义

插件实现

internal/custom.go 实现了 Filter 和 PostFilter 扩展点:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package internal

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
	// 插件名称
	Name = "JimyagCustom"
	// 插件状态存储的 key
	FilterLabel = "scheduler.k8sdev.jimyag.com/filter"
)

var (
	_ framework.Plugin           = &JimyagCustom{}
	_ framework.FilterPlugin     = &JimyagCustom{}
	_ framework.PostFilterPlugin = &JimyagCustom{}
)

// New 初始化一个插件并返回
func New(rawArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
	klog.Infof("New Scheduling plugin: %s", Name)
	c := JimyagCustom{
		handle: h,
	}
	klog.Infof("%s plugin initialized", Name)
	return &c, nil
}

// JimyagCustom 是一个实现了扩展点的示例插件
type JimyagCustom struct {
	handle framework.Handle
}

// Name 返回插件名称
func (c *JimyagCustom) Name() string {
	return Name
}

插件需要实现 framework.Plugin 接口,只需要一个 Name() 方法。

Filter 实现

Filter 扩展点用于过滤节点:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Filter 过滤掉那些不满足要求的 node
// 如果 node 没有自定义的 label,则忽略
// 如果 node 有,必须和 pod 的 label 匹配
func (c *JimyagCustom) Filter(ctx context.Context, state framework.CycleState, pod *v1.Pod, nodeInfo framework.NodeInfo) *framework.Status {
	node := nodeInfo.Node()
	klog.Infof("Filter %s/%s/%s: start scheduler plugin", pod.Namespace, pod.Name, node.Name)
	
	// 检查 Pod 是否有标签
	podValue, found := pod.GetLabels()[FilterLabel]
	if !found {
		// 如果 Pod 不包含自定义的 filter 标签,则可以任意分配
		klog.Infof("Filter %s/%s/%s: pod doesn't have '%s' label; can be scheduled",
			pod.Namespace, pod.Name, node.Name, FilterLabel)
		return framework.NewStatus(framework.Success, "")
	}
	
	// 检查 Node 是否有标签
	nodeValue, found := node.GetLabels()[FilterLabel]
	if !found {
		// 如果 Node 没有自定义的标签,则不能被分配
		klog.Infof("Filter %s/%s/%s: node doesn't have '%s' label; pod cannot be scheduled here",
			pod.Namespace, pod.Name, node.Name, FilterLabel)
		return framework.NewStatus(framework.Unschedulable, "node does not have required label")
	}
	
	// 检查标签值是否匹配
	if nodeValue != podValue {
		klog.Infof("Filter %s/%s/%s: node '%s' label value %s does not match pod label value (%s)",
			pod.Namespace, pod.Name, node.Name, FilterLabel, nodeValue, podValue)
		return framework.NewStatus(framework.Unschedulable, "nodeGroup of pod and node don't match")
	}

	klog.Infof("Filter %s/%s/%s: finished, node '%s' : values match %s",
		pod.Namespace, pod.Name, node.Name, FilterLabel, nodeValue)
	return framework.NewStatus(framework.Success, "")
}

Filter 逻辑:

  1. 如果 Pod 没有指定标签,则可以被调度到任意节点
  2. 如果 Pod 有标签,但 Node 没有标签,则不能调度
  3. 如果都有标签,则值必须匹配

返回值:

  • framework.Success:节点通过过滤
  • framework.Unschedulable:节点不满足条件

PostFilter 实现

PostFilter 在所有节点都被过滤掉时执行:

1
2
3
4
5
6
// PostFilter 如果 Filter 阶段之后,所有 nodes 都被筛掉了,才会执行这个阶段
func (c *JimyagCustom) PostFilter(ctx context.Context, state framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
	klog.Infof("PostFilter %s/%s: start", pod.Namespace, pod.Name)
	klog.Infof("PostFilter %s/%s: finish", pod.Namespace, pod.Name)
	return nil, framework.NewStatus(framework.Success, "")
}

PostFilter 可以用于:

  • 记录日志
  • 尝试抢占低优先级的 Pod
  • 返回更详细的错误信息

部署调度器

配置调度器

configmap.yaml 定义调度器配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
apiVersion: v1
kind: ConfigMap
metadata:
  name: jimyag-scheduler-config
  namespace: kube-system
data:
  jimyag-scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    profiles:
      - schedulerName: jimyag-scheduler
        plugins:
          filter:
            enabled:
            - name: JimyagCustom
          postFilter:
            enabled:
            - name: JimyagCustom
    leaderElection:
      leaderElect: false

关键配置:

  • schedulerName: jimyag-scheduler:调度器名称,Pod 需要通过 schedulerName 指定
  • plugins.filter.enabled:启用 Filter 扩展点的插件
  • leaderElection.leaderElect: false:单实例部署,禁用选举

部署资源

需要的资源:

1
2
3
4
5
6
7
scheduler/deploy/
├── configmap.yaml                            # 调度器配置
├── deployment.yaml                           # 调度器 Deployment
├── serviceaccount.yaml                       # ServiceAccount
├── rolebinding.yaml                          # RoleBinding
├── kube-scheduler-clusterrolebinding.yaml    # ClusterRoleBinding
└── volume-scheduler-clusterrolebinding.yaml  # Volume ClusterRoleBinding

部署命令:

1
2
3
# 修改 deployment.yaml 中的镜像地址
# 然后在 deploy 目录执行
kubectl apply -f ./

使用调度器

为节点打标签

首先为节点打标签:

1
2
# scheduler/test/node-label.sh
kubectl label nodes k8s-worker2 scheduler.k8sdev.jimyag.com/filter=normal

创建使用自定义调度器的 Pod

创建 Deployment,指定 schedulerName 和标签:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jimyag-scheduler-normal
spec:
  replicas: 2
  selector:
    matchLabels:
      scheduler.k8sdev.jimyag.com/filter: normal
  template:
    metadata:
      labels:
        scheduler.k8sdev.jimyag.com/filter: normal
    spec:
      schedulerName: jimyag-scheduler  # 指定调度器
      containers:
        - image: nginx:1.7.9
          imagePullPolicy: IfNotPresent
          name: nginx
          resources:
            requests:
              cpu: "50m"
              memory: "64Mi"
            limits:
              cpu: "100m"
              memory: "128Mi"

关键点:

  • schedulerName: jimyag-scheduler:指定使用自定义调度器
  • Pod 标签 scheduler.k8sdev.jimyag.com/filter: normal:会被调度到有相同标签的节点

测试场景

  1. Pod 有标签,Node 有匹配标签

    • Pod 被调度到 k8s-worker2
  2. Pod 有标签,Node 没有匹配标签

    • Pod 一直处于 Pending 状态
  3. Pod 没有标签

    • Pod 被随机调度(使用默认调度器行为)

日志查看

查看调度器日志:

1
kubectl logs -n kube-system -l component=jimyag-scheduler

日志示例:

1
2
Filter default/jimyag-scheduler-normal-xxx/k8s-worker2: start scheduler plugin
Filter default/jimyag-scheduler-normal-xxx/k8s-worker2: finished, node 'scheduler.k8sdev.jimyag.com/filter' : values match normal

注意事项

  1. 调度器名称

    • schedulerName 必须唯一
    • Pod 必须通过 schedulerName 指定调度器
  2. 权限配置

    • 调度器需要 ClusterRole 访问 Node、Pod 等资源
    • 需要配置 ServiceAccount 和 RoleBinding
  3. 高可用

    • 生产环境建议启用 leaderElection
    • 部署多个副本,实现高可用
  4. 性能影响

    • Filter 插件会被频繁调用,注意性能
    • 避免在 Filter 中进行耗时的操作
  5. 调试

    • 使用 klog 记录日志
    • 通过日志理解调度过程

与原生调度器的关系

自定义调度器与默认调度器可以共存:

  • 默认调度器:schedulerName: default-scheduler(或不指定)
  • 自定义调度器:schedulerName: jimyag-scheduler

Pod 通过 schedulerName 选择使用哪个调度器。

总结

基于 Scheduler Framework 实现自定义调度器:

  • 优点

    • 无需修改 Kubernetes 源码
    • 插件化设计,灵活可扩展
    • 与默认调度器共存
  • 适用场景

    • 特定应用需要特定调度策略
    • 实现更复杂的资源匹配规则
    • 添加自定义的过滤或打分逻辑

本文示例实现了一个简单的标签过滤调度器,完整代码参考 GitHub: jimyag/k8sdev/scheduler

参考资料

#Kubernetes #Scheduler #Golang