ᕕ( ᐛ )ᕗ Jimyag's Blog

Go chan 底层实现原理全解析

· 3004 words · ~ 15 min read

Last modified:

Go 的 chan 是日常并发代码里最常见的同步原语。它表面上只有发送、接收、关闭三个动作,但底层同时涉及内存分配、环形队列、goroutine 阻塞唤醒、栈拷贝保护、race detector、select 公平性等一整套 runtime 机制。

本文基于 Go 1.26.3 的 runtime 源码,重点分析 runtime/chan.goruntime/runtime2.goruntime/select.go,从数据结构开始,完整梳理 make(chan)ch <- x<-chclose(ch)select 的执行路径,并总结实际开发中最容易踩坑的行为边界。


数据结构

Go 运行时里 channel 对应的是 runtime/chan.go 中的 hchan

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// runtime/chan.go
type hchan struct {
    qcount   uint           // 当前缓冲区里的元素数量
    dataqsiz uint           // 环形缓冲区容量
    buf      unsafe.Pointer // 指向 dataqsiz 个元素的数组
    elemsize uint16         // 元素大小
    closed   uint32         // 是否已关闭
    timer    *timer         // timer channel 使用
    elemtype *_type         // 元素类型
    sendx    uint           // 下一个发送写入位置
    recvx    uint           // 下一个接收读取位置
    recvq    waitq          // 等待接收的 goroutine 队列
    sendq    waitq          // 等待发送的 goroutine 队列
    bubble   *synctestBubble
    lock     mutex
}

chan 变量本身不是 hchan,而是指向 hchan 的指针。也就是说:

1
ch := make(chan int, 3)

可以理解为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
ch 变量(栈上/堆上)          hchan(堆上)
┌──────────────┐              ┌──────────────────────┐
│ *hchan ──────┼────────────► │ qcount = 0           │
└──────────────┘              │ dataqsiz = 3         │
                              │ buf ─────────────┐   │
                              │ sendx = 0        │   │
                              │ recvx = 0        │   │
                              │ recvq = nil      │   │
                              │ sendq = nil      │   │
                              │ lock             │   │
                              └──────────────────┼───┘
                                      环形缓冲区 [ _ ][ _ ][ _ ]

这点和 slice 很不一样。slice 变量是三字段描述符,拷贝 slice 会拷贝 array/len/cap;channel 变量是一个指针,拷贝 channel 只是拷贝同一个 hchan 指针。多个变量指向同一个 channel 时,底层队列、关闭状态、等待队列完全共享。

hchan 里的 timer *timer 只对 timer channel 有意义,例如 time.NewTimertime.Aftertime.NewTicker 背后的 channel。runtime 用它把 channel 和定时器系统连起来,这样 timer 到期时可以直接驱动 channel 变为可接收,而不需要每个 timer 额外绑一个 goroutine。普通业务代码里自己 make(chan T) 创建的 channel,这个字段始终是 nil

waitq 与 sudog

recvqsendq 的类型都是 waitq,本质是一个双向链表:

1
2
3
4
type waitq struct {
    first *sudog
    last  *sudog
}

链表节点不是 goroutine 本身,而是 sudogsudog 是 runtime 用来表达“某个 goroutine 正在某个同步对象上等待”的结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// runtime/runtime2.go
type sudog struct {
    g       *g
    next    *sudog
    prev    *sudog
    elem    maybeTraceablePtr // 发送值地址或接收目标地址,可能指向栈
    isSelect bool
    success  bool
    waitlink *sudog
    c        maybeTraceableChan
}

一个 goroutine 可能同时等待多个同步对象,例如一个包含多个 case 的 select。因此 runtime 没有直接把 g 挂到 channel 队列上,而是通过 sudog 建立多对多关系。

1
2
3
4
5
6
hchan.recvq
┌───────┐     ┌────────────┐     ┌────────────┐
│ first ┼────►│ sudog(g1)  ┼────►│ sudog(g2)  │
│ last  ┼──┐  │ elem=&dst1 │     │ elem=&dst2 │
└───────┘  │  └────────────┘     └────────────┘
           └──────────────────────────▲

核心不变量

chan.go 文件开头直接写了 channel 的关键不变量:

  • sendqrecvq 至少有一个为空。
  • 例外是无缓冲 channel 配合 select 时,同一个 goroutine 可能同时挂在发送和接收队列上。
  • 对有缓冲 channel:qcount > 0 表示 recvq 为空。
  • 对有缓冲 channel:qcount < dataqsiz 表示 sendq 为空。

换成人话:

  • 如果缓冲区里有数据,就不应该还有接收者在等。
  • 如果缓冲区还有空位,就不应该还有发送者在等。
  • 有缓冲 channel 的发送者只有在缓冲区满时才会阻塞。
  • 有缓冲 channel 的接收者只有在缓冲区空时才会阻塞。

这些不变量让 send/recv 可以快速判断走“直接交付”“读写缓冲区”还是“阻塞等待”。


方向类型

Go 语法里有三种 channel 类型:chan Tchan<- T<-chan T。它们在 runtime 里并不是三套不同的数据结构,底层都还是同一个 *hchan;方向限制完全由编译器和类型系统保证。

1
2
3
var both chan int
var sendOnly chan<- int = both
var recvOnly <-chan int = both

这也是为什么把双向 channel 传给不同函数后,所有方看到的仍然是同一个底层队列、同一个关闭状态和同一组等待队列。差别只在“这段代码被允许做什么”:chan<- T 只能发不能收,<-chan T 只能收不能发;反过来,不能把一个只收 channel 当成双向 channel 用。

方向类型的价值主要在 API 边界上表达意图。生产者返回 <-chan T,调用方一看就知道“只能读结果”;消费者函数参数写成 chan<- T,就能明确“这里只负责投递”。close 也只允许发生在发送侧:对 <-chan Tclose 会在编译期报错。


创建 channel

make

1
ch := make(chan int, 3)

编译器最终会调用 runtime 的 makechan

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func makechan(t *chantype, size int) *hchan {
    elem := t.Elem
    mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    switch {
    case mem == 0:
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
    case !elem.Pointers():
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.Size_)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)
    return c
}

创建过程可以拆成四步:

  flowchart TD
    A["make(chan T, n)"] --> B["检查元素大小、对齐、容量溢出"]
    B --> C{"n * sizeof(T) == 0?"}
    C -->|是| D["只分配 hchan\nbuf 指向 raceaddr"]
    C -->|否| E{"T 是否包含指针?"}
    E -->|不包含指针| F["hchan + buf 一次 mallocgc\nGC 不扫描缓冲区"]
    E -->|包含指针| G["hchan 和 buf 分开分配\nbuf 按元素类型让 GC 扫描"]
    D --> H["初始化 elemsize/elemtype/dataqsiz/lock"]
    F --> H
    G --> H

这里有一个性能细节:如果元素类型不包含指针,runtime 会把 hchan 和缓冲区放在同一块内存里,并且这块内存对 GC 来说“不包含需要扫描的指针”。如果元素类型包含指针,缓冲区必须带类型信息分配,让 GC 能正确扫描其中的指针。

无缓冲 channel

1
ch := make(chan int)

无缓冲 channel 的 dataqsiz = 0,没有真正的数据缓冲区。发送和接收必须配对完成:

1
2
3
sender goroutine                 receiver goroutine
ch <- x  ───────────────────────► v := <-ch
          直接从 sender 栈拷贝到 receiver 目标地址

无缓冲 channel 的核心语义是同步交接,而不是排队存储。

有缓冲 channel

1
ch := make(chan int, 3)

有缓冲 channel 的 buf 是一个环形队列:

1
2
3
4
5
buf:    [ A ][ B ][ _ ]
         ▲         ▲
       recvx=0   sendx=2
qcount=2
dataqsiz=3

发送写入 sendx,接收读取 recvx。索引到达 dataqsiz 后回到 0:

1
2
3
4
c.sendx++
if c.sendx == c.dataqsiz {
    c.sendx = 0
}

这就是固定容量环形缓冲区,channel 不会像 slice 一样扩容。容量在 make 时确定,之后不会变化。


发送操作

1
ch <- x

编译器会把普通发送编译到 runtime.chansend1,再进入 chansend

1
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

其中:

  • c 是 channel 对应的 hchan
  • ep 是要发送的元素地址。
  • block 表示能否阻塞;普通发送是 trueselect { case ch <- x: default: }false

整体流程:

  flowchart TD
    A["ch <- x"] --> B{"ch == nil?"}
    B -->|是,阻塞发送| C["gopark: 永久阻塞"]
    B -->|否| D{"非阻塞且 channel 满?"}
    D -->|是| E["返回 false"]
    D -->|否| F["加锁 c.lock"]
    F --> G{"closed != 0?"}
    G -->|是| H["panic: send on closed channel"]
    G -->|否| I{"recvq 有等待接收者?"}
    I -->|是| J["直接把值拷贝给接收者\n绕过缓冲区"]
    I -->|否| K{"缓冲区有空位?"}
    K -->|是| L["写入 buf[sendx]\nsendx 前进,qcount++"]
    K -->|否| M{"允许阻塞?"}
    M -->|否| N["解锁,返回 false"]
    M -->|是| O["创建 sudog\n挂入 sendq\ngopark 阻塞"]

发送到等待接收者

如果 recvq 里已经有接收者,发送方不会先写入缓冲区,而是直接把值交给接收方:

1
2
3
4
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

send 内部会调用 sendDirect

1
2
3
4
5
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem.get()
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
    memmove(dst, src, t.Size_)
}

这里的 src 是发送方的值地址,dst 是接收方提供的目标地址。对无缓冲 channel 来说,这就是一次 goroutine 到 goroutine 的直接数据拷贝。

需要注意:这不是传引用。channel 发送的是元素值拷贝。如果元素本身是指针、slice、map、interface,那么被拷贝的是这些值里的指针或描述符。

发送到缓冲区

如果没有等待接收者,并且缓冲区有空位,就写入环形队列:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
}

示例:

1
2
3
ch := make(chan int, 3)
ch <- 10
ch <- 20
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
初始:
buf    [ _ ][ _ ][ _ ]
sendx    ^
recvx    ^
qcount = 0

发送 10:
buf    [10 ][ _ ][ _ ]
recvx    ^
sendx         ^
qcount = 1

发送 20:
buf    [10 ][20 ][ _ ]
recvx    ^
sendx              ^
qcount = 2

发送阻塞

如果没有等待接收者,缓冲区也满了,普通发送会阻塞:

1
2
3
4
5
6
7
8
mysg := acquireSudog()
mysg.elem.set(ep)
mysg.g = gp
mysg.c.set(c)
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)

阻塞发送的关键点:

  • runtime 从 sudog 池拿一个节点。
  • mysg.elem 保存发送值地址。
  • mysg.g 指向当前 goroutine。
  • mysg.c 指向当前 channel。
  • 节点进入 sendq
  • 当前 goroutine 通过 gopark 让出执行权。

之后某个接收者会完成这次发送,并通过 goready 唤醒发送方。


接收操作

1
2
v := <-ch
v, ok := <-ch

编译器会分别进入:

1
2
func chanrecv1(c *hchan, elem unsafe.Pointer)
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool)

最终都调用:

1
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

其中:

  • selected 表示这次非阻塞接收是否选中。
  • received 对应 v, ok := <-ch 中的 ok
  • ep 是接收目标地址;如果只是 <-ch 丢弃结果,ep 可以是 nil。

整体流程:

  flowchart TD
    A["<-ch"] --> B{"ch == nil?"}
    B -->|是,阻塞接收| C["gopark: 永久阻塞"]
    B -->|否| D{"非阻塞且暂时不可接收?"}
    D -->|是| E["返回 selected=false"]
    D -->|否| F["加锁 c.lock"]
    F --> G{"closed 且 qcount == 0?"}
    G -->|是| H["写入零值\n返回 ok=false"]
    G -->|否| I{"sendq 有等待发送者?"}
    I -->|是| J["从发送者直接接收\n或在满缓冲区中交换"]
    I -->|否| K{"qcount > 0?"}
    K -->|是| L["读取 buf[recvx]\n清空槽位,recvx 前进,qcount--"]
    K -->|否| M{"允许阻塞?"}
    M -->|否| N["解锁,返回 false,false"]
    M -->|是| O["创建 sudog\n挂入 recvq\ngopark 阻塞"]

从缓冲区接收

缓冲区有数据时,接收方直接读取 buf[recvx]

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
if c.qcount > 0 {
    qp := chanbuf(c, c.recvx)
    if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
}

读完之后 runtime 会清空槽位。对包含指针的元素类型来说,这一步很重要:否则缓冲区槽位会继续持有旧指针,影响 GC 回收。

从等待发送者接收

如果 channel 没有关闭,并且 sendq 里有等待发送者,接收方会优先完成等待发送者:

1
2
3
4
if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
}

这里分两种情况。

无缓冲 channel:直接从发送方 sudog.elem 拷贝到接收方目标地址。

1
sender sudog.elem ─────► receiver ep

有缓冲 channel:能出现 sendq 有等待发送者,说明缓冲区一定是满的。接收方会取出 recvx 位置的旧值,同时把等待发送者的值放入同一个槽位:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
满缓冲区:
buf:   [ A ][ B ][ C ]
      recvx/sendx

接收者拿走 A,等待发送者补入 X:
buf:   [ X ][ B ][ C ]
           recvx/sendx
qcount 不变

源码里对应逻辑:

1
2
3
4
5
6
7
8
qp := chanbuf(c, c.recvx)
typedmemmove(c.elemtype, ep, qp)          // 队头给接收者
typedmemmove(c.elemtype, qp, sg.elem.get()) // 等待发送者补到队尾
c.recvx++
if c.recvx == c.dataqsiz {
    c.recvx = 0
}
c.sendx = c.recvx

因为缓冲区满时 sendx == recvx,所以同一个槽位既是队头,也是即将写入的队尾。这个设计避免了额外移动数据。

接收阻塞

如果没有等待发送者,缓冲区也没有数据,普通接收会阻塞:

1
2
3
4
5
6
7
mysg := acquireSudog()
mysg.elem.set(ep)
mysg.g = gp
mysg.c.set(c)
gp.waiting = mysg
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)

之后某个发送者会把值写入 mysg.elem 指向的地址,并唤醒接收方。


关闭 channel

1
close(ch)

关闭操作进入 closechan

1
func closechan(c *hchan)

流程:

  flowchart TD
    A["close(ch)"] --> B{"ch == nil?"}
    B -->|是| C["panic: close of nil channel"]
    B -->|否| D["加锁 c.lock"]
    D --> E{"closed != 0?"}
    E -->|是| F["panic: close of closed channel"]
    E -->|否| G["closed = 1"]
    G --> H["释放所有 recvq 等待者\n写入零值,ok=false"]
    H --> I["释放所有 sendq 等待者\n唤醒后 panic"]
    I --> J["解锁"]
    J --> K["goready 唤醒所有等待 goroutine"]

关键源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
c.closed = 1

// release all readers
for {
    sg := c.recvq.dequeue()
    if sg == nil {
        break
    }
    if sg.elem.get() != nil {
        typedmemclr(c.elemtype, sg.elem.get())
        sg.elem.set(nil)
    }
    sg.success = false
    glist.push(sg.g)
}

// release all writers (they will panic)
for {
    sg := c.sendq.dequeue()
    if sg == nil {
        break
    }
    sg.elem.set(nil)
    sg.success = false
    glist.push(sg.g)
}

关闭后的行为分几类:

操作 行为
close(nil) panic
重复 close(ch) panic
向已关闭 channel 发送 panic
从已关闭且已空的 channel 接收 返回元素零值,ok=false
从已关闭但仍有缓冲数据的 channel 接收 继续返回缓冲数据,ok=true
从 nil channel 发送/接收 永久阻塞

示例:

1
2
3
4
5
6
7
8
9
ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)

fmt.Println(<-ch)      // 1
fmt.Println(<-ch)      // 2
v, ok := <-ch
fmt.Println(v, ok)     // 0 false

close 不会清空缓冲区。它只是标记 channel 已关闭,并唤醒已经阻塞的接收者和发送者。缓冲区里的历史数据仍然可以被正常读完。


nil channel

nil channel 是没有 hchan 的 channel:

1
var ch chan int

它和关闭的 channel 完全不同:

操作 nil channel closed channel
发送 永久阻塞 panic
接收 永久阻塞 零值,ok=false,或先读完缓冲数据
关闭 panic 重复关闭 panic
len 0 剩余缓冲元素数量
cap 0 channel 容量

runtime 中的发送和接收开头都有 nil 判断:

1
2
3
4
5
6
7
if c == nil {
    if !block {
        return false
    }
    gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
    throw("unreachable")
}

这也是一个常用技巧:在 select 里把某个 channel 设置为 nil,可以临时禁用对应 case。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
var out chan<- int
if ready {
    out = ch
}

select {
case out <- v:
    // ready 时才可能发送
case <-ctx.Done():
    return
}

内存模型保证

channel 不只是队列,也是 Go 内存模型里最重要的同步原语之一。理解它的 happens-before 规则,很多“为什么这样写不会乱序”的问题就清楚了。

规则 含义
send -> 对应 receive 一次发送先于对应接收完成
unbuffered receive -> send 完成 无缓冲 channel 上,接收方真正接住数据后,发送方才算完成
close -> 返回零值的 receive close(ch) 先于之后因关闭而返回零值的接收
k 个 receive -> 第 k+C 个 send 容量为 C 的缓冲 channel 可当计数信号量理解

最常见的是第一条:发送前对共享数据做的写入,在对应接收完成后对接收方可见。无缓冲 channel 还更强一点,因为发送和接收是同步交接,发送方在接收方真正接住数据前不会继续往后跑。

close 规则也很实用。一个 goroutine 关闭 channel 后,其他 goroutine 观察到 ok == false,不仅表示“没有更多值了”,也意味着它同步观察到了关闭动作之前已经完成的写入。带缓冲 channel 的第 k 条规则则解释了为什么“容量为 N 的 channel 做信号量”在并发控制里是成立的。


select 的实现

select 的实现位于 runtime/select.go,核心函数是:

1
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

编译器会做一些提前优化。比如只有一个 channel case 加 default 的 select:

1
2
3
4
5
6
select {
case ch <- v:
    foo()
default:
    bar()
}

会被改写为非阻塞发送:

1
2
3
4
5
if selectnbsend(ch, v) {
    foo()
} else {
    bar()
}

而通用 selectselectgo,主要分三轮:

  flowchart TD
    A["进入 selectgo"] --> B["生成随机 pollorder\n避免固定 case 饥饿"]
    B --> C["按 hchan 地址排序 lockorder\n避免多 channel 加锁死锁"]
    C --> D["按 lockorder 给所有 channel 加锁"]
    D --> E["pass 1: 按 pollorder 查找立即可执行 case"]
    E --> F{"找到可执行 case?"}
    F -->|是| G["执行 send/recv/close/default 路径\n解锁返回"]
    F -->|否,且有 default| H["解锁,返回 default"]
    F -->|否,且需要阻塞| I["pass 2: 为每个 case 创建 sudog\n挂入对应 sendq/recvq"]
    I --> J["gopark 阻塞"]
    J --> K["被某个 case 唤醒"]
    K --> L["重新加锁"]
    L --> M["pass 3: 从未命中的队列移除 sudog"]
    M --> N["返回命中的 case"]

两个细节很重要。

第一,pollorder 是随机排列的。多个 case 同时可执行时,runtime 不按源码顺序固定选择,而是用伪随机顺序轮询,减少饥饿。

第二,lockorder 按 channel 地址排序。一个 select 可能涉及多个 channel,如果不同 goroutine 用不同顺序加锁,就可能死锁。runtime 统一按地址排序后再加锁,保证加锁顺序一致。

select 与 sudog 竞争

select 阻塞时,一个 goroutine 会为每个 channel case 都创建一个 sudog,分别挂到不同 channel 的等待队列上:

1
2
3
4
5
goroutine G 正在 select:

case <-ch1:  ch1.recvq -> sudog(G)
case ch2<-x: ch2.sendq -> sudog(G)
case <-ch3:  ch3.recvq -> sudog(G)

一旦其中一个 case 被唤醒,其它 channel 队列上的 sudog 都要移除。源码里称为:

1
// pass 3 - dequeue from unsuccessful chans

为了处理“多个 channel 几乎同时唤醒同一个 select”的竞争,sudog.isSelect 会配合 g.selectDone 做 CAS,确保只有一个 case 赢。


内存与调度细节

为什么 channel 需要锁

hchan.lock 保护这些内容:

  • qcount
  • sendx
  • recvx
  • closed
  • sendq
  • recvq
  • 阻塞在该 channel 上的部分 sudog 字段

channel 的 send/recv/close 都会修改共享状态,所以需要互斥锁。这里的锁是 runtime 自己的 mutex,不是 sync.Mutex

无竞争场景下,channel 操作仍然不等于零成本。即使缓冲区有空位,发送也要进入 runtime、加锁、拷贝元素、更新索引、解锁。channel 是同步原语,不是普通队列的无锁替代品。

阻塞为什么要关心栈拷贝

阻塞发送和阻塞接收时,sudog.elem 可能指向 goroutine 栈上的变量。例如:

1
2
v := 10
ch <- v

发送方阻塞时,sudog.elem 可能指向 v 的栈地址。Go 的 goroutine 栈可以增长和收缩,runtime 必须避免某个 sudog 持有的栈地址在栈拷贝过程中失效。

所以 channel 阻塞路径里有这类状态:

1
2
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)

chanparkcommit 会设置 gp.activeStackChans = true,告诉栈拷贝逻辑:这个 goroutine 有 channel sudog 指向栈,处理栈时要拿对应 channel 锁保护。

这也是 channel 实现比“一个队列加条件变量”更复杂的原因之一。

len 和 cap

len(ch)cap(ch) 对应 runtime 的 chanlenchancap

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func chanlen(c *hchan) int {
    if c == nil {
        return 0
    }
    return int(c.qcount)
}

func chancap(c *hchan) int {
    if c == nil {
        return 0
    }
    return int(c.dataqsiz)
}

对普通 channel:

  • len(ch) 是当前缓冲区中还没被接收的元素数量。
  • cap(ch) 是创建时指定的容量。
  • 无缓冲 channel 的 cap 是 0。

不要用 len(ch) 做并发正确性判断。它只是某一瞬间的观察,返回后状态可能立刻被其它 goroutine 改变。它适合监控、调试、粗略指标,不适合作为“接下来一定可以发送/接收”的依据。


常见陷阱

误以为 channel 发送的是引用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type User struct {
    Name string
}

ch := make(chan User, 1)
u := User{Name: "alice"}
ch <- u
u.Name = "bob"

fmt.Println((<-ch).Name) // alice

channel 发送的是值拷贝。这里缓冲区里保存的是发送时的 User 副本。

但如果发送的是指针,复制的就是指针值:

1
2
3
4
5
6
ch := make(chan *User, 1)
u := &User{Name: "alice"}
ch <- u
u.Name = "bob"

fmt.Println((<-ch).Name) // bob

这不是 channel 特殊行为,而是 Go 值语义的自然结果。

向已关闭 channel 发送

1
2
close(ch)
ch <- 1 // panic: send on closed channel

关闭 channel 应该由发送方负责,通常是“唯一发送方”或“协调者”关闭。多个发送方并发关闭或发送,很容易出现 panic。

常见模式:

1
2
3
4
5
6
func producer(out chan<- int) {
    defer close(out)
    for i := 0; i < 10; i++ {
        out <- i
    }
}

如果有多个 producer,通常用 sync.WaitGroup 等所有 producer 退出后,由单独的 goroutine 关闭输出 channel:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
out := make(chan int)
var wg sync.WaitGroup

for i := 0; i < n; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        produce(out)
    }()
}

go func() {
    wg.Wait()
    close(out)
}()

从关闭 channel 读取时忘记 ok

1
v := <-ch

如果 ch 已关闭且已空,v 是零值。对 int 来说是 0,对指针来说是 nil。这可能和真实业务值混淆。

需要区分数据和关闭信号时,使用双返回值:

1
2
3
4
v, ok := <-ch
if !ok {
    return
}

for range ch 内部也依赖同样语义:一直接收直到 ok=false

1
2
3
for v := range ch {
    handle(v)
}

nil channel 导致永久阻塞

1
2
var ch chan int
ch <- 1 // 永久阻塞

nil channel 在普通代码中通常是 bug,在 select 中可以是工具。区别在于是否有意为之。

如果某个 goroutine 卡住,栈里看到:

1
goroutine 18 [chan send (nil chan)]:

优先检查 channel 是否忘记初始化。

用 channel 当无界队列

channel 容量固定,不会自动扩容。把 channel 当队列时,要明确背压策略:

  • 容量太小,发送方容易阻塞。
  • 容量太大,内存占用上升,延迟问题被隐藏。
  • 消费者退出但生产者还在发送,会导致 goroutine 泄漏。

需要无界队列时,通常应该显式实现队列和退出协议,而不是盲目把 channel 容量调大。

使用 time.After 造成临时对象堆积

在循环里直接写:

1
2
3
4
5
6
7
8
for {
    select {
    case <-time.After(time.Second):
        flush()
    case v := <-in:
        handle(v)
    }
}

每轮都会创建新的 timer 和 channel。高频循环里更推荐复用 time.Timertime.Ticker

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
    select {
    case <-ticker.C:
        flush()
    case v := <-in:
        handle(v)
    }
}

这不是 hchan 本身的问题,但 timer channel 在 runtime 里有特殊处理,频繁创建仍然会带来额外成本。

认为 close 可以广播数据

close(ch) 广播的是“不会再有新值”,不是广播某个业务值。关闭后接收者拿到的是元素零值和 ok=false

如果需要广播取消,优先用 context.Context

1
2
3
4
5
6
select {
case <-ctx.Done():
    return ctx.Err()
case v := <-ch:
    handle(v)
}

如果只是内部同步,也可以用关闭 chan struct{} 表达一次性广播:

1
2
3
4
5
6
7
8
done := make(chan struct{})

go func() {
    defer close(done)
    work()
}()

<-done

这里接收者不关心值,只关心 channel 被关闭。


实践建议

容量表达语义,而不是拍脑袋调性能

1
2
jobs := make(chan Job)       // 同步交接,天然背压
events := make(chan Event, 64) // 允许短暂突发

无缓冲 channel 表达“发送方和接收方必须同步交接”。有缓冲 channel 表达“允许有限排队”。容量应该来自业务吞吐、突发窗口、内存预算,而不是随便写一个很大的数字。

close 只用于发送方向接收方声明结束

接收方通常不应该关闭输入 channel。因为接收方无法知道是否还有其它发送者准备发送。

推荐约定:

  • 谁创建生产流程,谁负责关闭输出。
  • 多发送方时,由协调者等待所有发送方结束后关闭。
  • 不用 close 表达“我不想再接收”,这种情况用 context 取消。

大对象避免直接按值传输

channel 每次发送都会拷贝元素。元素很大时,拷贝成本明显:

1
2
type Payload [4096]byte
ch := make(chan Payload, 1024)

这类代码会让每次发送/接收都移动较大的内存。更常见的做法是传指针或传小描述符:

1
ch := make(chan *Payload, 1024)

但传指针意味着共享可变对象,必须明确所有权:发送后生产者是否还能修改?消费者是否负责归还对象池?这些都要写清楚。

用 channel 管生命周期时必须有退出路径

容易泄漏的写法:

1
2
3
4
5
6
7
8
9
func start() chan int {
    out := make(chan int)
    go func() {
        for i := 0; ; i++ {
            out <- i
        }
    }()
    return out
}

如果调用方不再接收,goroutine 会永远阻塞在发送上。

更稳妥的写法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func start(ctx context.Context) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; ; i++ {
            select {
            case <-ctx.Done():
                return
            case out <- i:
            }
        }
    }()
    return out
}

不要依赖 len(ch) 做并发控制

错误示例:

1
2
3
if len(ch) < cap(ch) {
    ch <- v // 这里仍然可能阻塞
}

len(ch) 返回后,另一个 goroutine 可能已经填满 channel。需要非阻塞发送时,用 select default

1
2
3
4
5
6
select {
case ch <- v:
    return true
default:
    return false
}

需要限流时,用明确的 semaphore channel:

1
2
3
4
5
6
7
sem := make(chan struct{}, 16)

sem <- struct{}{}
go func() {
    defer func() { <-sem }()
    work()
}()

和 slice 的关键区别

维度 slice channel
变量本质 三字段描述符 指向 hchan 的指针
数据存储 底层数组 hchan.buf 环形队列
容量变化 append 可能扩容 make 后容量固定
并发安全 并发读写需外部同步 单次 send/recv/close 由 runtime 同步
复制变量 复制描述符,共享底层数组 复制指针,共享同一个 hchan
零值 nil slice 可 append nil channel 收发永久阻塞
关闭语义 无 close close 表示不再发送

理解 slice 的核心是“描述符 + 底层数组”。理解 channel 的核心是“hchan + 环形缓冲区 + 等待队列 + goroutine park/unpark”。


一张总图

  sequenceDiagram
    participant S as Sender
    participant C as hchan
    participant B as buf
    participant R as Receiver

    S->>C: chansend(c, &x, block=true)
    C->>C: lock(c.lock)
    alt recvq has waiter
        C->>R: sendDirect(&x, receiver.elem)
        C->>R: goready(receiver.g)
    else buffer has space
        C->>B: typedmemmove(buf[sendx], &x)
        C->>C: sendx++, qcount++
    else full
        C->>C: enqueue sudog into sendq
        S->>S: gopark()
    end
    C->>C: unlock(c.lock)
  sequenceDiagram
    participant R as Receiver
    participant C as hchan
    participant B as buf
    participant S as Sender

    R->>C: chanrecv(c, &v, block=true)
    C->>C: lock(c.lock)
    alt closed and empty
        C->>R: zero value, ok=false
    else sendq has waiter
        C->>S: recvDirect(sender.elem, &v)
        C->>S: goready(sender.g)
    else buffer has data
        C->>B: typedmemmove(&v, buf[recvx])
        C->>B: typedmemclr(buf[recvx])
        C->>C: recvx++, qcount--
    else empty
        C->>C: enqueue sudog into recvq
        R->>R: gopark()
    end
    C->>C: unlock(c.lock)

总结

chan 的底层不是魔法,而是一个由 runtime 严格保护的同步对象:

  • hchan 保存缓冲区、索引、关闭状态和等待队列。
  • 有缓冲 channel 用固定容量环形队列存放元素。
  • 无缓冲 channel 不存数据,发送和接收直接配对拷贝。
  • 阻塞 goroutine 会通过 sudog 挂到 sendqrecvq,再由 gopark 暂停。
  • 匹配的发送/接收会完成数据拷贝,并用 goready 唤醒对方。
  • close 会唤醒所有等待接收者和发送者,但只有接收者得到零值和 ok=false,发送者会 panic。
  • select 通过随机 poll 顺序和固定 lock 顺序兼顾公平性和死锁规避。

日常写 channel 代码时,最重要的是明确所有权和生命周期:谁发送,谁关闭,谁退出,缓冲区容量表达什么语义。只要这几个问题清楚,channel 代码通常就会简单很多。


参考资料

#Go #Channel #Chan #底层原理 #并发