在上一篇文章中,介绍了 grpc 建立 TCP 连接的过程,侧重点在 Client 端,而关于 Server 端建立 TCP 的过程相对是比较简单的。

Server端 listen on 本地端口,并且接收来自 client 的连接请求,一旦建立 TCP 连接后,接下来的步骤是什么呢? 建立 HTTP2 server,并收发数据。

本文尝试回答一下几个问题:

  1. Server 怎么利用 http2 的 stream 传输数据?
  2. 从 stream 里读的数据存放在哪?
  3. Stream 读到的数据如何传给用户
  4. Server 要发送的数据又是从哪发送的?

创建 http2Server

首先从用户的代码入手,用户的代码最后会调用 grpcServer.Serve(lis), 稍微追踪几个函数就能发现调用链是 handleRawConn()serveStreams()

handleRawConn() 中我们发现 newHTTP2Transport 会创建一个新的 http2Server

serveStreams() 中的 HandleStreams()type ServerTransport interface 的一个函数,而 type http2Server struct 实现了这个接口。

值得注意的是,有两个结构体实现了 ServerTransport,分别是

  1. transport/handler_server.go 的 serverHandlerTransport
  2. transport/http2_server.go 的 http2Server

一般我们在 main 函数中调用 grpcServer.Serve(lis) 的,最后都是 http2Server。 如果是 ServerHTTP() 则是第一个 serverHandlerTransport,所以这一个很少用,代码不用看。

最后,位于 http2_server.gofunc (t *http2Server) HandleStreams() 函数负责处理 client 发来的每一个 HTTP2 frame,而它的入口参数 handler 是位于 server.gofunc (s *Server) handleStream()

综上所述,http2 在一条 tcp 连接中有多个 stream,而每个 stream 都是在 HandleStreams() 的无限 for 循环中完成的。

在这个循环中,server 不断读取 frame,如果是一个 Data Frame,那么读出数据存到 buffer中,如果读到 Header Frame,说明有新的 stream需要建立, 在 operateHeaders() 中能看到会新建 stream,在函数最后调用 handler(),对应 Server.handleStream()

Stream 读到的数据放哪

先看 stream 的数据结构

type Stream struct {
	id           uint32
	st           ServerTransport    // nil for client side Stream
...
	buf          *recvBuffer
	trReader     io.Reader
}

type recvBuffer struct {
	c       chan recvMsg
	mu      sync.Mutex
	backlog []recvMsg
	err     error
}

HandleStreams() 的 for 循环中读到的 Data Frame 数据最后都放入了 buf *recvBuffer 中。

        if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}

顺便看到上面的结构中还有一个 trReader, 它是 transportReader 类型的变量,最终使用它的是 Stream 的 Read 函数。

func (s *Stream) Read(p []byte) (n int, err error) {
	// Don't request a read if there was an error earlier
	if er := s.trReader.(*transportReader).er; er != nil {
		return 0, er
	}
	s.requestRead(len(p))
	return io.ReadFull(s.trReader, p)
}

也就是说,读取 Stream 传来的数据都是从这里读到的,但是我们前面知道数据是存在 recvBuffer 的,可是这个结构体中只有一个 io.Reader 啊,怎么读到 recvBuffer 的东西呢?

operateHeaders() 中,(前面提到,for 循环中不断处理 http2 的 frame,遇到新的 header frame 调用 operateHeaders)在创建新的 Stream 时, 把 recvBufferReader 赋值给了 io.Reader 类型的 reader。

	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:        s.ctx,
			ctxDone:    s.ctxDone,
			recv:       s.buf,
			freeBuffer: t.bufferPool.put,
		},
    }

Stream 读到的数据如何传给用户

到这里,我们知道了从 client 发来的数据在哪里,那么用户程序是怎么读到的呢? 先从一段 backtrace 说起

(dlv) bt
 0  0x0000000001403f73 in google.golang.org/grpc/internal/transport.(*Stream).Read
    at ./vendor/google.golang.org/grpc/internal/transport/transport.go:457
 1  0x000000000146fb43 in google.golang.org/grpc.(*parser).recvMsg
    at ./vendor/google.golang.org/grpc/rpc_util.go:508
 2  0x000000000147085d in google.golang.org/grpc.recvAndDecompress
    at ./vendor/google.golang.org/grpc/rpc_util.go:639
 3  0x00000000014711bb in google.golang.org/grpc.recv
    at ./vendor/google.golang.org/grpc/rpc_util.go:707
 4  0x0000000001484786 in google.golang.org/grpc.(*serverStream).RecvMsg
    at ./vendor/google.golang.org/grpc/stream.go:1474
 5  0x000000000149cf22 in learn-grpc/proto.(*testSumServer).Recv
    at ./proto/00.pb.go:655
 6  0x000000000149e94d in main.(*server).Sum
    at ./server3/main.go:34
 7  0x000000000149ce0d in learn-grpc/proto._Test_Sum_Handler
    at ./proto/00.pb.go:636
 8  0x00000000014784c3 in google.golang.org/grpc.(*Server).processStreamingRPC
    at ./vendor/google.golang.org/grpc/server.go:1237
 9  0x0000000001479cf8 in google.golang.org/grpc.(*Server).handleStream
    at ./vendor/google.golang.org/grpc/server.go:1317
10  0x0000000001486901 in google.golang.org/grpc.(*Server).serveStreams.func1.1
    at ./vendor/google.golang.org/grpc/server.go:722
11  0x00000000010630c1 in runtime.goexit
    at /usr/local/go/src/runtime/asm_amd64.s:1373

注意其中的 5,6,7 backtrace,这是我们的用户代码,通常我们是在 for 循环中不断的调用 Recv() 直到 EOF.

for {
		reqObj, err = req.Recv()
		if err == io.EOF {
			return 
		}
        ....// process data
	}

由此可见是 processStreamingRPC() 调用对应 protobuf 中指明的 RPC call 函数,并且通过 Recv() 不断地读 Stream buf 中的内容,从而达到 用户代码能不断地读取 Stream 中 client 发来的数据。

重点就是下面的这个单独的 goroutine s.handleStream(), 它被作为 handler 参数传进了 st.HandleStreams(), 对应的实现是 http2Server.HandleStreams()。

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.handleStream(st, stream, s.traceInfo(st, stream))
		}()
	},)
	wg.Wait()
}

前面已经提到,http2Server.HandleStreams() 就是在不断的处理 frame,读到 Header Frame 就会建新的 Stream,而这个 s.handleStream() 的 handler 就是在这个 operateHeaders 函数的最后被调用的。

Server 数据从哪发送

Server 读取 client 的数据部分已经很清楚了,那么 Server 的数据是从哪发送的呢 ?

发送数据的过程与之前的读数据稍微有一些不同,主要区别是它是两个独立的过程,用户程序写入数据到 controlBuffer 之后就不管了, 另外有一个goroutine LoopyWriter 不断地从 controlBuffer 中读取数据并写入到 net.Conn 中。

Server 发送数据给 client 的 backtrace 如下,

0  0x00000000013fcbc3 in google.golang.org/grpc/internal/transport.(*http2Server).Write
   at ./vendor/google.golang.org/grpc/internal/transport/http2_server.go:892
1  0x00000000014840ac in google.golang.org/grpc.(*serverStream).SendMsg
   at ./vendor/google.golang.org/grpc/stream.go:1421
2  0x000000000149ce99 in learn-grpc/proto.(*testSumServer).SendAndClose
   at ./proto/00.pb.go:650
3  0x000000000149ea5a in main.(*server).Sum
   at ./server3/main.go:37
4  0x000000000149ce0d in learn-grpc/proto._Test_Sum_Handler
   at ./proto/00.pb.go:636
5  0x00000000014784c3 in google.golang.org/grpc.(*Server).processStreamingRPC
   at ./vendor/google.golang.org/grpc/server.go:1237
6  0x0000000001479cf8 in google.golang.org/grpc.(*Server).handleStream
   at ./vendor/google.golang.org/grpc/server.go:1317
7  0x0000000001486901 in google.golang.org/grpc.(*Server).serveStreams.func1.1
   at ./vendor/google.golang.org/grpc/server.go:722

看到这里也许你会问,问什么也是从 processStreamingRPC() 开始,最后调用SendAndClose() 的呢?

这是因为在我的用户程序里 recv() 和 send() 都是在 Sum() 函数中的。

http2Server.Write() 在最后把数据写到 t.controlBuf.put(df) 就算结束了。

在另一端,一开始创建的 newHTTP2Server() 中会启用单独的 goroutine newLoopyWriter() 从 controlBuf 中get 数据,并通过 net.Conn 写入。

go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		if err := t.loopy.run(); err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		t.conn.Close()
		close(t.writerDone)
	}()

参考资料