Kubernetes 默认的调度器能满足大部分场景,但某些场景需要自定义调度逻辑。例如:
- 将特定应用的 Pod 调度到特定的节点组
- 实现更复杂的资源匹配规则
- 添加自定义的过滤或打分逻辑
Kubernetes 1.19 引入了 Scheduler Framework,让开发者可以通过插件化的方式扩展调度器,无需修改 Kubernetes 源码。
本文介绍如何基于 Scheduler Framework 实现一个自定义调度器,通过标签过滤节点,实现 Pod 的精准调度。
Scheduler Framework 扩展点
Scheduler Framework 定义了一系列扩展点,开发者可以在这些扩展点注册自定义插件:
graph TD
A[PreEnqueue] --> B[Sort]
B --> C[Scheduling Cycle]
subgraph Scheduling_Cycle [Scheduling Cycle]
C1[PreFilter]
C2[Filter]
C3[PostFilter]
C4[PreScore]
C5[Score]
C6[NormalizeScore]
C7[Reserve]
C8[Permit]
end
C --> C1
C1 --> C2
C2 --> C3
C3 --> C4
C4 --> C5
C5 --> C6
C6 --> C7
C7 --> C8
C8 --> D[Binding Cycle]
subgraph Binding_Cycle [Binding Cycle]
D1[WaitOnPermit]
D2[PreBind]
D3[Bind]
D4[PostBind]
end
D --> D1
D1 --> D2
D2 --> D3
D3 --> D4
style C1 fill:#e1f5ff
style C2 fill:#e1f5ff
style C3 fill:#e1f5ff
style C4 fill:#e1f5ff
style C5 fill:#e1f5ff
style C6 fill:#e1f5ff
style D2 fill:#fff4e1
style D3 fill:#fff4e1
style D4 fill:#f0f0f0
关键扩展点:
- PreFilter:Pod 预处理和检查,不符合预期可以提前结束调度
- Filter:过滤掉不满足要求的节点
- PostFilter:如果 Filter 阶段所有节点都被过滤掉,执行此阶段
- Score:为节点打分,分数越高越优先
- PreBind/Bind/PostBind:绑定阶段的扩展点
扩展点详解
Scheduling Cycle(调度周期)
PreFilter:
- 作用:Pod 预处理和检查
- 失败影响:调度提前终止,Pod 保持 Pending
- 应用场景:检查 Pod 是否满足调度前提条件(如 PVC 是否存在、资源配额是否充足)
Filter:
- 作用:过滤掉不满足要求的节点
- 失败影响:单个节点被过滤,不影响其他节点的调度
- 应用场景:资源检查、标签匹配、污点容忍、端口冲突、卷挂载冲突等
PostFilter:
- 作用:当所有节点都被过滤掉时执行
- 失败影响:不影响调度结果(已经是失败状态)
- 应用场景:尝试抢占低优先级 Pod、记录详细错误信息、通知用户
PreScore:
- 作用:打分前的准备工作
- 失败影响:调度失败,Pod 保持 Pending
- 应用场景:准备打分所需的数据、计算权重
Score:
- 作用:为节点打分(0-100 分)
- 失败影响:节点得分为 0,不影响其他节点
- 应用场景:节点优先级计算、资源平衡、亲和性打分、拓扑分布等
NormalizeScore:
- 作用:标准化分数(修正到 0-100 范围)
- 失败影响:调度失败,Pod 保持 Pending
- 应用场景:修正分数范围、处理极端分数
Binding Cycle(绑定周期)
Reserve:
- 作用:预留资源(信息性,不影响调度)
- 失败影响:不影响调度结果
- 应用场景:缓存资源使用情况、预分配资源
具体例子:调度器在内存中维护节点资源缓存。当 Pod A 调度到 Node1 时,Reserve 阶段会在缓存中标记 Node1 的 CPU/内存已被占用。这样当 Pod B 尝试调度时,能看到最新的资源使用情况,避免资源超卖。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Reserve 示例:更新资源缓存
func (r *MyPlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
// 获取节点的资源使用情况
nodeInfo, err := r.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
// 在缓存中预留资源(不影响实际调度,只是更新缓存)
r.resourceCache[nodeName] = nodeInfo.RequestedResource()
return framework.NewStatus(framework.Success, "")
}
|
Permit:
- 作用:批准或拒绝绑定
- 失败影响:可以阻止绑定,Pod 重新进入调度队列
- 应用场景:等待其他 Pod 同时调度(批调度)、实现协同调度
具体例子:假设有一个分布式数据库集群,需要同时启动主节点和从节点。Permit 阶段可以等待所有 Pod 都调度成功后,再一起绑定,确保主从节点能同时启动。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// Permit 示例:等待一组 Pod 同时调度
func (p *MyPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
// 获取 Pod 所属的组
groupKey := pod.Labels["app-group"]
// 检查组内所有 Pod 是否都已调度
pods, err := p.handle.ClientSet().CoreV1().Pods(pod.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("app-group=%s", groupKey),
})
if err != nil {
return framework.NewStatus(framework.Error, err.Error()), 0
}
// 统计已调度的 Pod 数量
scheduledCount := 0
for _, p := range pods.Items {
if p.Spec.NodeName != "" {
scheduledCount++
}
}
// 如果所有 Pod 都已调度,允许绑定
if scheduledCount == len(pods.Items) {
return framework.NewStatus(framework.Success, ""), 0
}
// 否则等待 30 秒
return framework.NewStatus(framework.Wait, "waiting for other pods"), 30 * time.Second
}
|
PreBind:
- 作用:绑定前的准备工作
- 失败影响:绑定失败,Pod 重新进入调度队列
- 应用场景:设置 PVC、创建卷、配置网络、确保存储已就绪
具体例子:Pod 使用了 PVC,PreBind 阶段确保 PVC 已经绑定到 PV,并且 PV 已经挂载到目标节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
// PreBind 示例:确保 PVC 已绑定并可用
func (p *MyPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
// 遍历 Pod 的所有 PVC
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}
pvcName := volume.PersistentVolumeClaim.ClaimName
// 检查 PVC 是否存在
pvc, err := p.handle.ClientSet().CoreV1().PersistentVolumeClaims(pod.Namespace).Get(ctx, pvcName, metav1.GetOptions{})
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("PVC %s not found", pvcName))
}
// 确保 PVC 已经绑定到 PV
if pvc.Spec.VolumeName == "" {
return framework.NewStatus(framework.Error, fmt.Sprintf("PVC %s not bound", pvcName))
}
// 确保 PV 已经挂载到目标节点(如果是 ReadWriteOnce)
pv, err := p.handle.ClientSet().CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("PV %s not found", pvc.Spec.VolumeName))
}
if pv.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
// 如果 PV 是 Retain 策略,确保之前的 Pod 已经删除
klog.Infof("PV %s is Retain policy, checking if it's safe to bind", pv.Name)
}
}
return framework.NewStatus(framework.Success, "")
}
|
PVC 绑定流程
根据 Kubernetes 源码分析(pkg/scheduler/framework/plugins/volumebinding/volume_binding.go 和 pkg/controller/volume/persistentvolume/pv_controller.go),Pod 和 PVC 的绑定流程如下:
sequenceDiagram
participant U as 用户
participant P as PersistentVolumeController
participant S as 调度器
participant K as kubelet
U->>S: 创建 Pod(使用 PVC)
S->>S: PreFilter:检查 PVC 是否已绑定
alt PVC 未绑定
S-->>S: 返回 UnschedulableAndUnresolvable
Note over S: Pod 进入不可调度队列
U->>P: 创建 PVC
P->>P: findBestMatchForClaim() 查找匹配 PV
P->>P: bindClaimToVolume() 绑定 PVC 到 PV
Note over P: PVC 状态变为 Bound
S->>S: 重新调度 Pod
end
S->>S: Filter:为 PVC 选择合适的节点
S->>S: PreBind:BindPodVolumes()
S->>S: Bind:Pod 绑定到节点
K->>K: Watch 到 Pod 绑定到本节点
K->>K: 挂载 PV 到容器
K->>K: 启动容器
关键点(基于源码):
- Pod 先创建:用户创建 Pod 时,Pod 进入调度器
- PreFilter 检查 PVC:
VolumeBinding 插件的 PreFilter 方法检查 Pod 的 PVC 是否已绑定(代码:volume_binding.go 第 14-20 行)
- 如果 PVC 未绑定,返回
UnschedulableAndUnresolvable 错误
- Pod 被移到不可调度队列,等待 PVC 绑定
- PersistentVolumeController 绑定 PVC:独立于调度器运行(代码:
pv_controller.go)
syncUnboundClaim() 函数查找匹配的 PV
bindClaimToVolume() 函数绑定 PVC 到 PV
- 调度器重新调度 Pod:PVC 绑定后,调度器重新尝试调度 Pod
- PreBind 阶段绑定卷:
BindPodVolumes() 函数在 PreBind 阶段执行实际的卷绑定操作(代码:binder.go)
注意:如果 PVC 不存在或未绑定到 PV,Pod 会一直处于 Pending 状态,直到 PVC 被绑定。
Bind:
- 作用:实际的绑定操作(更新 Pod 的 nodeName)
- 失败影响:绑定失败,Pod 重新进入调度队列
- 应用场景:自定义绑定逻辑、多集群调度
具体例子:实现跨集群调度,将 Pod 绑定到其他集群的节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
// Bind 示例:多集群调度
func (b *MyPlugin) Bind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
// 解析节点名称,格式为 "cluster-name:node-name"
parts := strings.Split(nodeName, ":")
if len(parts) != 2 {
return framework.NewStatus(framework.Error, "invalid node name format")
}
clusterName := parts[0]
targetNodeName := parts[1]
// 如果目标集群不是当前集群,需要特殊处理
if clusterName != b.currentCluster {
// 获取目标集群的客户端
targetClient, err := b.getClusterClient(clusterName)
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("failed to get client for cluster %s: %v", clusterName, err))
}
// 在目标集群创建 Pod
podCopy := pod.DeepCopy()
podCopy.Spec.NodeName = targetNodeName
podCopy.ResourceVersion = "" // 清除 resource version
_, err = targetClient.CoreV1().Pods(pod.Namespace).Create(ctx, podCopy, metav1.CreateOptions{})
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
// 在当前集群删除 Pod(或者标记为已完成)
err = b.handle.ClientSet().CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
klog.Warningf("failed to delete pod from current cluster: %v", err)
}
return framework.NewStatus(framework.Success, "")
}
// 当前集群,使用默认绑定逻辑
podCopy := pod.DeepCopy()
podCopy.Spec.NodeName = targetNodeName
_, err := b.handle.ClientSet().CoreV1().Pods(pod.Namespace).Update(ctx, podCopy, metav1.UpdateOptions{})
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
return framework.NewStatus(framework.Success, "")
}
|
PostBind:
- 作用:绑定后的清理工作(信息性)
- 失败影响:不影响调度结果
- 应用场景:清理临时资源、记录日志、通知外部系统
具体例子:绑定成功后,清理调度过程中创建的临时 ConfigMap 或 Secret,并向监控系统发送通知。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// PostBind 示例:清理临时资源和通知
func (p *MyPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
// 清理临时 ConfigMap(假设在 PreFilter 阶段创建)
tempConfigMapName := fmt.Sprintf("temp-%s-%s", pod.Namespace, pod.Name)
err := p.handle.ClientSet().CoreV1().ConfigMaps(pod.Namespace).Delete(ctx, tempConfigMapName, metav1.DeleteOptions{})
if err != nil {
klog.Warningf("failed to delete temp configmap %s: %v", tempConfigMapName, err)
} else {
klog.Infof("cleaned up temp configmap %s", tempConfigMapName)
}
// 记录调度日志
logEntry := fmt.Sprintf("Pod %s/%s scheduled to node %s at %s",
pod.Namespace, pod.Name, nodeName, time.Now().Format(time.RFC3339))
if err := p.logToFile(logEntry); err != nil {
klog.Warningf("failed to write log: %v", err)
}
// 通知外部系统(如监控系统)
if p.webhookURL != "" {
payload := map[string]interface{}{
"event": "pod_scheduled",
"pod": fmt.Sprintf("%s/%s", pod.Namespace, pod.Name),
"node": nodeName,
"timestamp": time.Now().Unix(),
}
jsonPayload, _ := json.Marshal(payload)
resp, err := http.Post(p.webhookURL, "application/json", bytes.NewBuffer(jsonPayload))
if err != nil {
klog.Warningf("failed to notify webhook: %v", err)
} else {
resp.Body.Close()
}
}
}
|
为什么有 PreBind/Bind?kubelet 什么时候介入?
这是一个常见的误解。Bind 不是 kubelet 的操作,而是调度器的操作。
调度器和 kubelet 的职责划分
调度器:
- 职责:选择合适的节点
- 操作:
- Filter 过滤节点
- Score 打分
- Bind 绑定(更新 Pod 的
nodeName 字段)
kubelet:
- 职责:在节点上运行 Pod
- 操作:
- 监听 API Server,发现有 Pod 绑定到自己的节点
- 创建容器
- 启动容器
- 汇报状态
绑定流程详解
sequenceDiagram
participant S as 调度器
participant A as API Server
participant K as kubelet
Note over S,A: Scheduling Cycle
S->>A: Filter(过滤节点)
S->>A: Score(为节点打分)
Note over S,A: Binding Cycle
S->>A: PreBind(准备 PVC、卷等资源)
S->>A: Bind(更新 Pod.nodeName = "node1")
A->>K: Watch(发现 Pod 绑定到 node1)
K->>K: 创建容器
K->>K: 启动容器
K->>A: 汇报状态
S->>A: PostBind(清理临时资源)
PreBind 的典型应用
PreBind 阶段主要用于准备绑定所需的资源:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// 示例:PreBind 确保 PVC 已绑定
func (p *MyPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
// 检查 PVC 是否存在
pvc, err := p.handle.ClientSet().CoreV1().PersistentVolumeClaims(pod.Namespace).Get(ctx, pvcName, metav1.GetOptions{})
if err != nil {
return framework.NewStatus(framework.Error, "PVC not found")
}
// 确保卷已经绑定到节点
if pvc.Spec.VolumeName == "" {
return framework.NewStatus(framework.Error, "PVC not bound")
}
return framework.NewStatus(framework.Success, "")
}
|
典型场景:
- 确保 PVC 已经绑定到 PV
- 确保存储卷已经准备好
- 设置网络配置
- 预分配 IP 地址
Bind 的默认实现
Bind 阶段用于实际绑定(更新 API Server 中的 Pod 对象):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// 默认的 Bind 实现(简化版)
func (b *DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
// 创建 Pod 的副本
podCopy := pod.DeepCopy()
// 更新 Pod 的 nodeName 字段
podCopy.Spec.NodeName = nodeName
// 调用 API Server 更新 Pod
_, err := b.handle.ClientSet().CoreV1().Pods(pod.Namespace).Update(ctx, podCopy, metav1.UpdateOptions{})
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
return framework.NewStatus(framework.Success, "")
}
|
为什么需要自定义 Bind?
虽然大多数情况下使用默认的 Bind 即可,但在某些场景下需要自定义:
- 多集群调度:将 Pod 绑定到其他集群的节点
- 自定义调度结果存储:不使用 nodeName 字段,而是使用 annotation
- 延迟绑定:等待外部系统确认后再绑定
实现原理
本示例实现一个简单的调度器:
- Pod 包含标签
scheduler.k8sdev.jimyag.com/filter: <value>
- Node 包含相同的标签
- Pod 只会被调度到标签值匹配的 Node 上
项目结构
1
2
3
4
5
6
7
8
9
10
11
12
13
|
scheduler/
├── main.go # 入口文件,注册插件
├── internal/
│ └── custom.go # 插件实现
├── deploy/ # 部署配置
│ ├── configmap.yaml
│ ├── deployment.yaml
│ ├── serviceaccount.yaml
│ └── rolebinding.yaml
└── test/ # 测试用例
├── jimyag-scheduler.yaml
├── normal.yaml
└── node-label.sh
|
插件注册
main.go 中注册自定义插件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package main
import (
"os"
"k8s.io/component-base/cli"
_ "k8s.io/component-base/metrics/prometheus/clientgo"
_ "k8s.io/component-base/metrics/prometheus/version"
"k8s.io/kubernetes/cmd/kube-scheduler/app"
_ "sigs.k8s.io/scheduler-plugins/apis/config/scheme"
"github.com/jimyag/k8sdev/scheduler/internal"
)
func main() {
// Register custom plugins to the scheduler framework.
command := app.NewSchedulerCommand(
app.WithPlugin(internal.Name, internal.New),
)
code := cli.Run(command)
os.Exit(code)
}
|
关键点:
- 使用
app.NewSchedulerCommand 创建调度器命令
- 通过
app.WithPlugin 注册插件
- 插件名称和初始化函数在
internal 包中定义
插件实现
internal/custom.go 实现了 Filter 和 PostFilter 扩展点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package internal
import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
const (
// 插件名称
Name = "JimyagCustom"
// 插件状态存储的 key
FilterLabel = "scheduler.k8sdev.jimyag.com/filter"
)
var (
_ framework.Plugin = &JimyagCustom{}
_ framework.FilterPlugin = &JimyagCustom{}
_ framework.PostFilterPlugin = &JimyagCustom{}
)
// New 初始化一个插件并返回
func New(rawArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
klog.Infof("New Scheduling plugin: %s", Name)
c := JimyagCustom{
handle: h,
}
klog.Infof("%s plugin initialized", Name)
return &c, nil
}
// JimyagCustom 是一个实现了扩展点的示例插件
type JimyagCustom struct {
handle framework.Handle
}
// Name 返回插件名称
func (c *JimyagCustom) Name() string {
return Name
}
|
插件需要实现 framework.Plugin 接口,只需要一个 Name() 方法。
Filter 实现
Filter 扩展点用于过滤节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// Filter 过滤掉那些不满足要求的 node
// 如果 node 没有自定义的 label,则忽略
// 如果 node 有,必须和 pod 的 label 匹配
func (c *JimyagCustom) Filter(ctx context.Context, state framework.CycleState, pod *v1.Pod, nodeInfo framework.NodeInfo) *framework.Status {
node := nodeInfo.Node()
klog.Infof("Filter %s/%s/%s: start scheduler plugin", pod.Namespace, pod.Name, node.Name)
// 检查 Pod 是否有标签
podValue, found := pod.GetLabels()[FilterLabel]
if !found {
// 如果 Pod 不包含自定义的 filter 标签,则可以任意分配
klog.Infof("Filter %s/%s/%s: pod doesn't have '%s' label; can be scheduled",
pod.Namespace, pod.Name, node.Name, FilterLabel)
return framework.NewStatus(framework.Success, "")
}
// 检查 Node 是否有标签
nodeValue, found := node.GetLabels()[FilterLabel]
if !found {
// 如果 Node 没有自定义的标签,则不能被分配
klog.Infof("Filter %s/%s/%s: node doesn't have '%s' label; pod cannot be scheduled here",
pod.Namespace, pod.Name, node.Name, FilterLabel)
return framework.NewStatus(framework.Unschedulable, "node does not have required label")
}
// 检查标签值是否匹配
if nodeValue != podValue {
klog.Infof("Filter %s/%s/%s: node '%s' label value %s does not match pod label value (%s)",
pod.Namespace, pod.Name, node.Name, FilterLabel, nodeValue, podValue)
return framework.NewStatus(framework.Unschedulable, "nodeGroup of pod and node don't match")
}
klog.Infof("Filter %s/%s/%s: finished, node '%s' : values match %s",
pod.Namespace, pod.Name, node.Name, FilterLabel, nodeValue)
return framework.NewStatus(framework.Success, "")
}
|
Filter 逻辑:
- 如果 Pod 没有指定标签,则可以被调度到任意节点
- 如果 Pod 有标签,但 Node 没有标签,则不能调度
- 如果都有标签,则值必须匹配
返回值:
framework.Success:节点通过过滤
framework.Unschedulable:节点不满足条件
PostFilter 实现
PostFilter 在所有节点都被过滤掉时执行:
1
2
3
4
5
6
|
// PostFilter 如果 Filter 阶段之后,所有 nodes 都被筛掉了,才会执行这个阶段
func (c *JimyagCustom) PostFilter(ctx context.Context, state framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
klog.Infof("PostFilter %s/%s: start", pod.Namespace, pod.Name)
klog.Infof("PostFilter %s/%s: finish", pod.Namespace, pod.Name)
return nil, framework.NewStatus(framework.Success, "")
}
|
PostFilter 可以用于:
- 记录日志
- 尝试抢占低优先级的 Pod
- 返回更详细的错误信息
部署调度器
配置调度器
configmap.yaml 定义调度器配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
apiVersion: v1
kind: ConfigMap
metadata:
name: jimyag-scheduler-config
namespace: kube-system
data:
jimyag-scheduler-config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: jimyag-scheduler
plugins:
filter:
enabled:
- name: JimyagCustom
postFilter:
enabled:
- name: JimyagCustom
leaderElection:
leaderElect: false
|
关键配置:
schedulerName: jimyag-scheduler:调度器名称,Pod 需要通过 schedulerName 指定
plugins.filter.enabled:启用 Filter 扩展点的插件
leaderElection.leaderElect: false:单实例部署,禁用选举
部署资源
需要的资源:
1
2
3
4
5
6
7
|
scheduler/deploy/
├── configmap.yaml # 调度器配置
├── deployment.yaml # 调度器 Deployment
├── serviceaccount.yaml # ServiceAccount
├── rolebinding.yaml # RoleBinding
├── kube-scheduler-clusterrolebinding.yaml # ClusterRoleBinding
└── volume-scheduler-clusterrolebinding.yaml # Volume ClusterRoleBinding
|
部署命令:
1
2
3
|
# 修改 deployment.yaml 中的镜像地址
# 然后在 deploy 目录执行
kubectl apply -f ./
|
使用调度器
为节点打标签
首先为节点打标签:
1
2
|
# scheduler/test/node-label.sh
kubectl label nodes k8s-worker2 scheduler.k8sdev.jimyag.com/filter=normal
|
创建使用自定义调度器的 Pod
创建 Deployment,指定 schedulerName 和标签:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
apiVersion: apps/v1
kind: Deployment
metadata:
name: jimyag-scheduler-normal
spec:
replicas: 2
selector:
matchLabels:
scheduler.k8sdev.jimyag.com/filter: normal
template:
metadata:
labels:
scheduler.k8sdev.jimyag.com/filter: normal
spec:
schedulerName: jimyag-scheduler # 指定调度器
containers:
- image: nginx:1.7.9
imagePullPolicy: IfNotPresent
name: nginx
resources:
requests:
cpu: "50m"
memory: "64Mi"
limits:
cpu: "100m"
memory: "128Mi"
|
关键点:
schedulerName: jimyag-scheduler:指定使用自定义调度器
- Pod 标签
scheduler.k8sdev.jimyag.com/filter: normal:会被调度到有相同标签的节点
测试场景
-
Pod 有标签,Node 有匹配标签:
-
Pod 有标签,Node 没有匹配标签:
-
Pod 没有标签:
日志查看
查看调度器日志:
1
|
kubectl logs -n kube-system -l component=jimyag-scheduler
|
日志示例:
1
2
|
Filter default/jimyag-scheduler-normal-xxx/k8s-worker2: start scheduler plugin
Filter default/jimyag-scheduler-normal-xxx/k8s-worker2: finished, node 'scheduler.k8sdev.jimyag.com/filter' : values match normal
|
注意事项
-
调度器名称:
schedulerName 必须唯一
- Pod 必须通过
schedulerName 指定调度器
-
权限配置:
- 调度器需要 ClusterRole 访问 Node、Pod 等资源
- 需要配置 ServiceAccount 和 RoleBinding
-
高可用:
- 生产环境建议启用
leaderElection
- 部署多个副本,实现高可用
-
性能影响:
- Filter 插件会被频繁调用,注意性能
- 避免在 Filter 中进行耗时的操作
-
调试:
与原生调度器的关系
自定义调度器与默认调度器可以共存:
- 默认调度器:
schedulerName: default-scheduler(或不指定)
- 自定义调度器:
schedulerName: jimyag-scheduler
Pod 通过 schedulerName 选择使用哪个调度器。
总结
基于 Scheduler Framework 实现自定义调度器:
-
优点:
- 无需修改 Kubernetes 源码
- 插件化设计,灵活可扩展
- 与默认调度器共存
-
适用场景:
- 特定应用需要特定调度策略
- 实现更复杂的资源匹配规则
- 添加自定义的过滤或打分逻辑
本文示例实现了一个简单的标签过滤调度器,完整代码参考 GitHub: jimyag/k8sdev/scheduler。
参考资料