Channel 可以说是 Go 语言最具特色的设计了,我们经常会看到一些老鸟这样教育菜鸟:

Do not communicate by sharing memory; instead, share memory by communicating.

那么熟练使用 Golang 就离不开 channel,有必要了解一下 channel 是怎么实现的。

channel 的源代码在 Golang 的 src/runtime/chan.go 目录下,先看结构体:

type hchan struct {
    qcount   uint           // 循环列表元素个数
    dataqsiz uint           // 循环队列的大小
    buf      unsafe.Pointer // 循环队列的指针
    elemsize uint16         // channel 中元素的大小
    closed   uint32         // 是否已close
    elemtype *_type         // channel 中元素类型
    sendx    uint           // send 在buffer中的索引
    recvx    uint           // recv 在buffer中的索引
    recvq    waitq          // receiver 的等待队列
    sendq    waitq          // sender 的等待队列 

    lock mutex
}

type waitq struct {
    first *sudog
    last  *sudog
}

其中

  • qcount 代表 channel 已经接收但还没被取走的元素的个数
  • recvq 和 sendq 表示等待接收数据 和等待发送数据的 goroutine 队列 (用 sudog 结构表示)

举例来说,假设用make(chan int, 100) 创建了带100个缓冲区的 channel,qcount 代表缓冲区中元素个数,dataqsiz = 100, recvq 代表等待读数据的 goroutine,而当缓冲区满了以后,sendq 代表阻塞的 goroutine

创建 channel

先看一个最简单的使用 channel 的例子:

func main() {
    c := make(chan int, 123)

    go func() {
        c <- 1 // send to channel
    }()

    x := <-c // recv from channel

    fmt.Println(x)
}

第一行创建channel 的语句实际执行的是 runtime.makechan(),运行 dlv 并加断点,可以看到传入的 size 的确是 123。

=>  71: func makechan(t *chantype, size int) *hchan {
    72:         elem := t.elem
    73:
(dlv) p size
123

makechan 函数的整体思想比较简单,主要是根据创建 chan 的类型和大小选择如何分配内存,

    var c *hchan
    switch {
    case mem == 0:
        // chan的size或者元素的size是0,不必创建buf
        c = (*hchan)(mallocgc(hchanSize, nil, true))

    case elem.ptrdata == 0:
        // 元素不是指针,分配一块连续的内存给hchan数据结构和buf
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))

        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 包含指针,那么由运行时负责单独分配
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

发送数据

向 channel 中发送数据最终调用的是 chansend,而它的调用者是 runtime.chansend1()

那么怎么知道是 runtime 中的这个函数的呢? 一种方法是用 go tool compile -N -l -S test.go 把源代码编译成汇编代码形式,这样就能找到 runtime 开头的函数了

非阻塞 channel

chansend() 函数的整体逻辑还是很好理解的,但是其中参数 block bool 怎么理解呢?

我花了很多时间才明白:这里的 block 并不是指缓冲区满了以后 goroutine 被 block,而是根据用户把 select 和 channel 组合起来使用时,在 select 中表现出来的 block,换句话说,go 编译器决定了这个 block 参数时 true 还是 false

本文开头的例子里是 channel 的阻塞调用,那么非阻塞是什么样子的呢? 答案是: 配合 select default 的时候

ch := make(chan int)
select {
case ch <- 1:
    // ...
default:
    fmt.Println("do default")
}

比如上面这个语句,当执行到它的时候,如果此时无法向 channel 里发送数据,那么放弃第一个 case,执行默认的 default。

Golang 的编译器会自动把上面的代码编译成下面的模样

if selectnbsend(ch, 1) {
    // ...
} else {
    fmt.Println("do default")
}

select 和 channe 的阻塞还有更多复杂的例子,这篇文章中更加详细的讲述了 select 和 channel 的阻塞调用关系。

说完了前言,现在开始正式的分析 chansend 函数,以下是注释,我把其中不重要的部分都省略了。

// block 表示发送操作是否阻塞
// ep 是发送数据的地址
// 返回 true 表示发送成功

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        // 往 nil 的 channel 变量发送数据会永远阻塞,抛出错误
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    // 这里源码有一大堆注释,我没看明白。。。。
    if !block && c.closed == 0 && full(c) {
        return false
    }

    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    if sg := c.recvq.dequeue(); sg != nil {
        // 如果正好有 goroutine 在等待,那么直接把数据拷贝过去
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    if c.qcount < c.dataqsiz {
        // 如果缓冲区还有空间,那么把数据暂时放到缓冲区
        qp := chanbuf(c, c.sendx)
    
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // 执行到这里说明缓冲区没有空闲了,并且该操作是非阻塞的,立即返回
    if !block {
        unlock(&c.lock)
        return false
    }

    // 以下代码就是把 goroutine 挂起,
    // 等待合适的时候被唤醒,具体细节暂不在本文讨论范围
    gp := getg()

    gp.waiting = mysg

    c.sendq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

    releaseSudog(mysg)
    return true
}

接受数据

看完前面发送数据部分以后,看接受数据的代码会觉得很类似

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 这一段源码中有大量的注释,大概的意思是对于非阻塞的情况,并且没有关闭的情况
    // 如果是无缓冲chan或者是chan中没有数据,那么直接返回 (false,false)
    // 如果要深究其中的 corner case,还要看源码注释
    if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }

    lock(&c.lock)

    // 如果已经关闭,并且chan中没有数据,返回 (true,false)
    if c.closed != 0 && c.qcount == 0 {
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 如果没有发送者在等待,并且缓冲区中有数据
    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            // 从缓冲区复制数据到 ep
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
    ..... 
    // someone woke us up
    
    releaseSudog(mysg)
    return true, !closed
}

其中注意 sg := c.sendq.dequeue() 这一段代码,逻辑是如果有发送者在队列等待,那么先从缓冲区里面读数据,然后把发送者的数据加到缓冲区里面。 因为在这个时刻,如果有发送者等待,并且有缓冲区,那么缓冲区一定是满的

关闭 channel

关闭 channel 部分的代码也很简单,在这里就不全部贴出来了,只列举几个有意思的部分

关闭一个已经关闭的 channel 会发生 panic 在源码中得到证实。

if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

closechan() 中把 recvq 和 sendq 中所有的 goroutine 都拿出来放到 glist 当中,让调度器重新调度 goroutine 运行。

当挂起的 goroutine 重新运行时,所有的 sender 都会 panic(因为向一个 close 的 chan 发送数据)

结束语

关于 select 和 channel 组合起来还有很多有意思和需要注意的地方,这部分不单单只是 go 的 runtime,还牵涉到 go 编译器如何编译代码,比如这篇文章中提到的 runtime.selectgo 的实现。

这部分内容还有很多,以后再单独的文章中再分析。

参考资料