ᕕ( ᐛ )ᕗ Jimyag's Blog

client-go Informer、Reflector、Indexer、Workqueue 介绍

· 525 字 · 约 3 分钟

在基于 Kubernetes client-go 开发控制器(Controller)或 Operator 时,Informer、Reflector、Indexer、Workqueue 是常用的核心组件,分别负责资源监听、本地缓存、事件排队与处理。下文基于 client-go cacheworkqueue 包说明各组件的定义、作用、数据流,并给出官方文档入口;文末附数据流 Mermaid 图。

一、核心组件定义与作用

1. Reflector(反射器)

位置:k8s.io/client-go/tools/cache

职责:对 API Server 执行 ListAndWatch——先全量 List 拉取资源,再通过 Watch 长连接监听增量事件(Add/Update/Delete);将事件包装为 Delta(对象 + 操作类型),写入 DeltaFIFO(Delta 先入先出队列)。同时处理 Watch 断开重连、以及可选的定时 Resync 全量同步。

作用:屏蔽与 API Server 的通信细节,为上层提供稳定的事件流,是 Informer 的“数据入口”。

2. Indexer(索引缓存)

位置:k8s.io/client-go/tools/cache

职责:线程安全的本地内存缓存,保存资源对象的当前状态;在通用 Store 的 Get/List 基础上,支持自定义索引(如按 Namespace、NodeName、Label 等),便于按索引键做 O(1) 级查询。缓存与 API Server/etcd 侧数据通过 ListAndWatch 保持同步,减少直接请求 API。

关键接口:Indexer 继承 Store,提供 Add/Update/Delete/List/Get,以及 IndexKeys、Index、AddIndexers 等索引方法。

3. Informer(通知器)

位置:k8s.io/client-go/tools/cache

职责:把 Reflector、DeltaFIFO、Indexer 和内部处理循环组合成完整链路。内部循环从 DeltaFIFO 中 Pop Delta,根据操作类型更新 Indexer,并调用已注册的 ResourceEventHandler(AddFunc/UpdateFunc/DeleteFunc)。SharedInformer 允许多个“逻辑控制器”共享同一套 ListAndWatch 与缓存,进一步降低对 API Server 的压力。

注意:日常说的“控制器”通常指业务侧的 Controller(例如 Reconcile 循环);而 cache 包里的 Controller 接口指的是驱动上述“从 DeltaFIFO 消费并更新 Indexer、触发回调”的内部控制循环,二者不同。

作用:封装 ListAndWatch、缓存更新与事件分发,开发者只需注册事件回调即可收到资源变更。

4. Workqueue(工作队列)

位置:k8s.io/client-go/util/workqueue

职责:接收由 Informer 事件回调产生的“资源 Key”(如 namespace/name),对 Key 做异步、限流、重试与去重。支持多种实现:普通队列、延迟队列、速率限制队列(RateLimitingQueue)。同一 Key 在同一时刻通常只由一个 Worker 处理,保证串行;处理失败时可重新入队并受速率限制,避免雪崩。

作用:把“事件通知”与“业务处理”解耦,是控制器实现“异步 Reconcile”的常用手段。

二、数据流与协作关系

整体顺序可对应图中 1~9 步:

  1. Reflector 对 apiserver 执行 List & Watch,拉取全量并监听增量。
  2. Reflector 将事件包装为 Delta,写入 Delta Fifo queue。
  3. Informer 从 Delta Fifo queue Pop 出 Delta。
  4. Informer 根据 Delta 更新 Indexer(写入/更新/删除缓存项)。
  5. Indexer 将对象与 Key 存入底层 Thread Safe Store(线程安全存储)。
  6. Informer 将事件分发给已注册的 Res Event Handlers(ResourceEventHandler 回调)。
  7. Res Event Handlers 将资源 Key(如 namespace/name)入队 Workqueue。
  8. Process Item 从 Workqueue 取 Key,交给 Handle Object。
  9. Handle Object 通过 Indexer reference 用 Key 从 Indexer 取回完整对象,执行业务逻辑(Reconcile);成功则 Done 出队,失败则视情况重新入队重试。

下面用 Mermaid 图表示上述数据流,与常见 client-go 架构图对齐:上为 Client-go(apiserver、Reflector、DeltaFIFO、Informer、Indexer、Thread Safe Store、Res Event Handlers reference),下为 Custom Controller(Res Event Handlers、Workqueue、Indexer reference、Process Item、Handle Object);虚线表示跨边界的引用或调用。

  flowchart TB
    subgraph ClientGo["Client-go"]
        API[apiserver]
        R[Reflector]
        D[Delta Fifo queue]
        I[Informer]
        IDX[Indexer]
        TSS[Thread Safe Store]
        REF_H[Res Event Handlers reference]
    end

    subgraph CustomCtrl["Custom Controller"]
        H[Res Event Handlers]
        W[Workqueue]
        REF_IDX[Indexer reference]
        PI[Process Item]
        HO[Handle Object]
    end

    R <-->|1. List & Watch| API
    R -->|2. Add Object| D
    D -->|3. Pop Object| I
    I -->|4. Add Object| IDX
    IDX -->|5. Store Object & Key| TSS
    I -->|6. Dispatch Event Handler functions| REF_H
    REF_H -.-> H
    H -->|7. Enqueue Object Key| W
    W -->|8. Get Key| PI
    PI --> HO
    REF_IDX -.-> IDX
    REF_IDX -->|9. Get Object for Key| HO

关于“Get 是否经过 Indexer”:控制器在 Handle Object 里按 Key 取对象时,通常通过 Lister(例如 podLister.Pods(namespace).Get(name))获取;Lister 背后就是 Informer 的 Indexer,因此这次 Get 走的是本地缓存,不请求 API Server。若改用 ClientSet 直接调 API(例如 clientset.CoreV1().Pods(namespace).Get(ctx, name, opts)),则不会经过 Indexer,而是直接请求 API Server。

三、关键接口速览

cache 包

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Reflector 将 ListAndWatch 的结果写入 DeltaFIFO
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector

// Indexer 在 Store 基础上增加按索引查询
type Indexer interface {
    Store
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexedValue string) ([]string, error)
    ListIndexFuncValues(indexName string) []string
    AddIndexers(newIndexers Indexers) error
}

// Controller 驱动 DeltaFIFO 消费与事件分发,Run 后阻塞直到 stopCh 关闭
type Controller interface {
    Run(stopCh <-chan struct{})
    HasSynced() bool
    LastSyncResourceVersion() string
}

workqueue 包

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 基础队列接口
type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShuttingDown() bool
}

// 速率限制队列(常用):在延迟队列基础上增加按 Key 限速与重试计数
type RateLimitingInterface interface {
    DelayingInterface
    AddRateLimited(item interface{})
    Forget(item interface{})
    NumRequeues(item interface{}) int
}

四、官方文档与示例入口

五、小结

组件 作用
Reflector 从 API Server ListAndWatch,将事件以 Delta 写入 DeltaFIFO。
Indexer 带索引的本地缓存,与 API 数据同步,支持按索引快速查询。
Informer 整合 Reflector、DeltaFIFO、Indexer 与事件分发,提供事件回调和缓存访问;SharedInformer 支持多控制器共享。
Workqueue 对事件产生的 Key 做异步、限流、重试与去重,支撑控制器的 Reconcile 循环。

参考链接:

#Kubernetes #Client-Go #Informer #Controller