本文主要关注以下几个问题:

  1. Golang runtime 是怎么调用 epoll 的系统调用的 ?
  2. Golang net 库如何封装 epoll,使得开发者几乎不用直接操作 epoll ?

C 如何调用 epoll

首先回顾一下用 C 语言怎么使用 epoll

int s = socket(AF_INET, SOCK_STREAM, 0);   
bind(s...)
listen(s...)

int epfd = epoll_create(128); //创建eventpoll对象

ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
epoll_ctl(epfd, EPOLL_CTL_ADD, s, &ev);//注册事件

//轮询就绪事件
while(true){
    //返回值n为就绪的事件数,events为事件列表
    int n = epoll_wait(epfd, &events[0], len(events), 1000)
    for( i := 0; i < n; i++ ) {
        ev := &events[i]
        //处理事件
    }
}

C 语言中调用 epoll 的方式比较底层,总的来说分下面三个步骤

  1. epoll_create() 创建epoll对象
  2. epoll_ctl() 注册套接字的事件
  3. epoll_wait() 轮询是否有事件发生,并通过events参数返回就绪(触发)的事件列表

Go 如何调用 epoll

那么在 Golang 的 runtime 最终也是调用了这么底层的 glibc 库函数吗? Golang 是如何封装 epoll 的呢?

搜索之后在 runtime/sys_linux_amd64.s 文件下面找到了 epollctl()

// func epollctl(epfd, op, fd int32, ev *epollEvent) int
TEXT runtime·epollctl(SB),NOSPLIT,$0
    MOVL    epfd+0(FP), DI
    MOVL    op+4(FP), SI
    MOVL    fd+8(FP), DX
    MOVQ    ev+16(FP), R10
    MOVL    $SYS_epoll_ctl, AX
    SYSCALL
    MOVL    AX, ret+24(FP)
    RET

可见,Golang runtime 最后是直接用汇编调用 系统调用 epollctl,类似的,在同一个文件下也能找到 epoll create 和 wait 的汇编代码。

知道了这些之后,我们再来看看 Golang 的网络库 net 是怎么封装 epoll 的。

Go 如何封装 epoll

Golang net 库封装 epoll 核心文件在系统文件 src/runtime/netpoll.go, 这个文件对不同平台的 API 进行封装,Linux 平台相关的部分在 src/runtime/netpoll_epoll.go

自顶向下,先看看开发者如何调用 Go 封装好的接口的

func main() {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        panic(err)
    }
    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }
        go handleConnection(conn)
    }
}

func handleConnection(conn net.Conn) {
    var body [4]byte
    addr := conn.RemoteAddr()
    for {
        conn.Read(body[:])
        conn.Write(body[:])
    }
}

以上是一个最简单的 Golang TCP server 的例子,由此可见,开发者仅仅需要调用 Listen(), Accept(), Read(), Write() 等函数即可,完全不用管理 epoll

Listen 做了什么

从 Listen 函数开始,整体的调用链如下

  1. lc.Listen()
  2. sl.listenTCP()
  3. internetSocket()
  4. socket() 调用 sysSocket() 返回一个 int 的文件描述符,newFD() 创建一个 netFD 结构,并且进入 listenStream()
  5. listenStream() 中调用 fd.init()
func (fd *netFD) init() error {
    // pfd mean poll FD
    return fd.pfd.Init(fd.net, true)
}
  1. fd.pfd.Init()
func (fd *FD) Init(net string, pollable bool) error {
    // pd means pollDesc
    err := fd.pd.init(fd)
    
    return err
}
  1. fd.pd.init() 中的 pd 是 pollDesc 的缩写,在 poll/fd_unix.go 文件中,有关于 FD 结构体的定义
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
    // Lock sysfd and serialize access to Read and Write methods.
    fdmu fdMutex
    // System file descriptor. Immutable until Close.
    Sysfd int
    // I/O poller.
    pd pollDesc
}

可见,每一个文件描述符都对应了一个 IO poller,Golang runtime 就是把这些 poller 连成链表进行管理的,具体的在下面小节再介绍。 我们先看看 pollDesc 的 init 函数做了些什么

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit) // init epoll 只需要做一次,每个进程只做一次
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

runtime_pollServerInit() 对应 Golang Runtime 中的 poll_runtime_pollServerInit() 函数。

func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        lock(&netpollInitLock)
        if netpollInited == 0 {
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

查看 netpollinit() 的函数,不难发现就是在这里调用了 epollcreate1() 和 epollctl()

同样的,runtime_pollOpen函数对应 Golang 运行时的 poll_runtime_pollOpen() 函数。

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()

    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

最后在 netpollopen() 函数中,把这个 fd 注册 epoll 的可读,可写,挂断,边沿触发等事件。

到此,从 socket() 系统调用返回的 fd,到把这个 fd 注册 epoll 事件的过程已经完成,这么长的过程, 其实只完成了开头 C 语言前 8 行的工作,对应 Go 的 server 代码,也仅仅是完成了 Listen 函数

Accept 做了什么

自顶向下,Accept 的主要函数调用链如下

  1. TCPListener accept()
  2. netFD 的 accept(),在这里创建一个新的 netFD 并初始化
  3. FD 的 Accept() 会重置pollDesc中的rg, 并调用原始套接字的accept接口返回一个 fd。等待连接到来,如果返回 EGAIN,则当前g 被gorpark
func (fd *netFD) accept() (netfd *netFD, err error) {
    // 获取文件描述符  d 
    d, rsa, errcall, err := fd.pfd.Accept()
    // 创建新 netFD
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    // 与 Listen 一样的初始化,把 fd 注册到 epoll
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    
    return netfd, nil
}

上面代码的 fd.pfd.Accept() 具体过程如下

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    defer fd.readUnlock()

    // 重置pollDesc中的 rg
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
    for {
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            continue
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        return -1, nil, errcall, err
    }
}

Read/Write 做了什么

调用关系是 conn.Read -> netFD.Read -> FD.Read,其中重点是 FD.Read

func (fd *FD) Read(p []byte) (int, error) {
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    if len(p) == 0 {
        return 0, nil
    }
    // 重置 pollDesc.rg 为 0
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return 0, err
    }
    if fd.IsStream && len(p) > maxRW {
        p = p[:maxRW]
    }
    for {
        n, err := ignoringEINTR(func() (int, error) { return syscall.Read(fd.Sysfd, p) })
        if err != nil {
            n = 0
            if err == syscall.EAGAIN && fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

从上面的 fd.pd.waitRead() 继续跟踪,发现最后是调用了 poll_runtime_pollWait(),最后在 netpollblock() 中调用 gopark 把当前 goroutine 给 park 住。

由此可以看出,Read 和 Write 函数并不是靠 epoll_wait() 进行阻塞和等待的,而是被 Go Runtime park 住了,所以让开发者感觉像是 Read() 被阻塞了。

实际上,Golang 中所有文件描述符都被设置成非阻塞的,某个 goroutine 进行网络io操作,读或者写文件描述符,如果此刻网络io还没准备好, 则这个 goroutine 会被放到系统的等待队列中,这个 goroutine 失去了运行权,但并不是真正的整个系统“阻塞”于系统调用,后台还有一个poller会不停地进行poll, 所有的文件描述符都被添加到了这个 poller 中的,当某个时刻一个文件描述符准备好了,poller 就会唤醒之前因它而阻塞的 goroutine,于是 goroutine 重新运行起来。

netpoll 轮询 fd

Go runtime 中专门的轮询器实现是在 netpoll_epoll.go 中的 netpoll() 函数,这里才是真正调用 epollwait() 的地方。

除了 netpoll 函数以外,还有其他几个辅助函数:

  • runtime.netpollinit(), 初始化网络轮询器,通过 sync.Once 和 netpollInited 变量保证函数只会调用一次
  • runtime.netpollopen(), 监听文件描述符上的边缘触发事件,创建事件并加入监听;
  • runtime.netpoll(), 轮询网络并返回一组已经准备就绪的 Goroutine
  • runtime.netpollBreak(), 唤醒网络轮询器

而调用 netpoll() 函数的则是 Go runtime 中的 findrunnable()sysmon()

pollDesc

上面提到,Golang 中所有的文件描述符都是非阻塞的,具体到网络 socket 的文件描述符, Golang 的网络操作都以 netFD 为中心实现。

netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时, 就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件, 才将此 goroutine 重新运行。

Golang Runtime 中有一个 runtime.pollCache 的全局变量,通过每个FD 中的 pollDesc, 把所有 FD 连成了一个链表,Go 语言网络轮询器会监听 runtime.pollDesc 结构体的状态。

总结

以上就是 Go 对 epoll 的封装,使得程序员在调用 Listen(), Accept() 的时候,完全不用对 epoll 的细节进行管理,提高了开发者的效率。

其中 gopark 和 pollDesc 的部分牵涉到 Go runtime 的设计与实现,在本文中没有详细展开。

最后用两张网上的截图回顾一下 server 端和 client 端函数调用链:

TCP Server

TCP Client

参考资料