Channel 可以说是 Go 语言最具特色的设计了,我们经常会看到一些老鸟这样教育菜鸟:
Do not communicate by sharing memory; instead, share memory by communicating.
那么熟练使用 Golang 就离不开 channel,有必要了解一下 channel 是怎么实现的。
channel 的源代码在 Golang 的 src/runtime/chan.go
目录下,先看结构体:
type hchan struct {
qcount uint // 循环列表元素个数
dataqsiz uint // 循环队列的大小
buf unsafe.Pointer // 循环队列的指针
elemsize uint16 // channel 中元素的大小
closed uint32 // 是否已close
elemtype *_type // channel 中元素类型
sendx uint // send 在buffer中的索引
recvx uint // recv 在buffer中的索引
recvq waitq // receiver 的等待队列
sendq waitq // sender 的等待队列
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
其中
- qcount 代表 channel 已经接收但还没被取走的元素的个数
- recvq 和 sendq 表示等待接收数据 和等待发送数据的 goroutine 队列 (用 sudog 结构表示)
举例来说,假设用make(chan int, 100)
创建了带100个缓冲区的 channel,qcount
代表缓冲区中元素个数,dataqsiz
= 100, recvq 代表等待读数据的 goroutine,而当缓冲区满了以后,sendq 代表阻塞的 goroutine
创建 channel
先看一个最简单的使用 channel 的例子:
func main() {
c := make(chan int, 123)
go func() {
c <- 1 // send to channel
}()
x := <-c // recv from channel
fmt.Println(x)
}
第一行创建channel 的语句实际执行的是 runtime.makechan()
,运行 dlv 并加断点,可以看到传入的 size 的确是 123。
=> 71: func makechan(t *chantype, size int) *hchan {
72: elem := t.elem
73:
(dlv) p size
123
makechan 函数的整体思想比较简单,主要是根据创建 chan 的类型和大小选择如何分配内存,
var c *hchan
switch {
case mem == 0:
// chan的size或者元素的size是0,不必创建buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
case elem.ptrdata == 0:
// 元素不是指针,分配一块连续的内存给hchan数据结构和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 包含指针,那么由运行时负责单独分配
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
发送数据
向 channel 中发送数据最终调用的是 chansend
,而它的调用者是 runtime.chansend1()
。
那么怎么知道是 runtime 中的这个函数的呢? 一种方法是用 go tool compile -N -l -S test.go
把源代码编译成汇编代码形式,这样就能找到 runtime 开头的函数了
非阻塞 channel
chansend() 函数的整体逻辑还是很好理解的,但是其中参数 block bool
怎么理解呢?
我花了很多时间才明白:这里的 block 并不是指缓冲区满了以后 goroutine 被 block,而是根据用户把 select 和 channel 组合起来使用时,在 select 中表现出来的 block,换句话说,go 编译器决定了这个 block 参数时 true 还是 false
本文开头的例子里是 channel 的阻塞调用,那么非阻塞是什么样子的呢? 答案是: 配合 select default 的时候
ch := make(chan int)
select {
case ch <- 1:
// ...
default:
fmt.Println("do default")
}
比如上面这个语句,当执行到它的时候,如果此时无法向 channel 里发送数据,那么放弃第一个 case,执行默认的 default。
Golang 的编译器会自动把上面的代码编译成下面的模样
if selectnbsend(ch, 1) {
// ...
} else {
fmt.Println("do default")
}
select 和 channe 的阻塞还有更多复杂的例子,这篇文章中更加详细的讲述了 select 和 channel 的阻塞调用关系。
说完了前言,现在开始正式的分析 chansend 函数,以下是注释,我把其中不重要的部分都省略了。
// block 表示发送操作是否阻塞
// ep 是发送数据的地址
// 返回 true 表示发送成功
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
// 往 nil 的 channel 变量发送数据会永远阻塞,抛出错误
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 这里源码有一大堆注释,我没看明白。。。。
if !block && c.closed == 0 && full(c) {
return false
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// 如果正好有 goroutine 在等待,那么直接把数据拷贝过去
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// 如果缓冲区还有空间,那么把数据暂时放到缓冲区
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 执行到这里说明缓冲区没有空闲了,并且该操作是非阻塞的,立即返回
if !block {
unlock(&c.lock)
return false
}
// 以下代码就是把 goroutine 挂起,
// 等待合适的时候被唤醒,具体细节暂不在本文讨论范围
gp := getg()
gp.waiting = mysg
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
releaseSudog(mysg)
return true
}
接受数据
看完前面发送数据部分以后,看接受数据的代码会觉得很类似
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 这一段源码中有大量的注释,大概的意思是对于非阻塞的情况,并且没有关闭的情况
// 如果是无缓冲chan或者是chan中没有数据,那么直接返回 (false,false)
// 如果要深究其中的 corner case,还要看源码注释
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
lock(&c.lock)
// 如果已经关闭,并且chan中没有数据,返回 (true,false)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 如果没有发送者在等待,并且缓冲区中有数据
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
// 从缓冲区复制数据到 ep
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
.....
// someone woke us up
releaseSudog(mysg)
return true, !closed
}
其中注意 sg := c.sendq.dequeue()
这一段代码,逻辑是如果有发送者在队列等待,那么先从缓冲区里面读数据,然后把发送者的数据加到缓冲区里面。 因为在这个时刻,如果有发送者等待,并且有缓冲区,那么缓冲区一定是满的。
关闭 channel
关闭 channel 部分的代码也很简单,在这里就不全部贴出来了,只列举几个有意思的部分
关闭一个已经关闭的 channel 会发生 panic 在源码中得到证实。
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
closechan() 中把 recvq 和 sendq 中所有的 goroutine 都拿出来放到 glist 当中,让调度器重新调度 goroutine 运行。
当挂起的 goroutine 重新运行时,所有的 sender 都会 panic(因为向一个 close 的 chan 发送数据)
结束语
关于 select 和 channel 组合起来还有很多有意思和需要注意的地方,这部分不单单只是 go 的 runtime,还牵涉到 go 编译器如何编译代码,比如这篇文章中提到的 runtime.selectgo
的实现。
这部分内容还有很多,以后再单独的文章中再分析。