在上一篇文章中,介绍了 grpc 建立 TCP 连接的过程,侧重点在 Client 端,而关于 Server 端建立 TCP 的过程相对是比较简单的。
Server端 listen on 本地端口,并且接收来自 client 的连接请求,一旦建立 TCP 连接后,接下来的步骤是什么呢? 建立 HTTP2 server,并收发数据。
本文尝试回答一下几个问题:
- Server 怎么利用 http2 的 stream 传输数据?
- 从 stream 里读的数据存放在哪?
- Stream 读到的数据如何传给用户
- Server 要发送的数据又是从哪发送的?
创建 http2Server
首先从用户的代码入手,用户的代码最后会调用 grpcServer.Serve(lis)
, 稍微追踪几个函数就能发现调用链是 handleRawConn()
到 serveStreams()
。
从 handleRawConn()
中我们发现 newHTTP2Transport
会创建一个新的 http2Server
。
serveStreams()
中的 HandleStreams()
是 type ServerTransport interface
的一个函数,而 type http2Server struct
实现了这个接口。
值得注意的是,有两个结构体实现了 ServerTransport,分别是
- transport/handler_server.go 的 serverHandlerTransport
- transport/http2_server.go 的 http2Server
一般我们在 main 函数中调用 grpcServer.Serve(lis)
的,最后都是 http2Server。 如果是 ServerHTTP()
则是第一个 serverHandlerTransport,所以这一个很少用,代码不用看。
最后,位于 http2_server.go
的 func (t *http2Server) HandleStreams()
函数负责处理 client 发来的每一个 HTTP2 frame,而它的入口参数 handler
是位于 server.go
的 func (s *Server) handleStream()
。
综上所述,http2 在一条 tcp 连接中有多个 stream,而每个 stream 都是在 HandleStreams()
的无限 for 循环中完成的。
在这个循环中,server 不断读取 frame,如果是一个 Data Frame,那么读出数据存到 buffer中,如果读到 Header Frame,说明有新的 stream需要建立,
在 operateHeaders()
中能看到会新建 stream,在函数最后调用 handler(),对应 Server.handleStream()
Stream 读到的数据放哪
先看 stream 的数据结构
type Stream struct {
id uint32
st ServerTransport // nil for client side Stream
...
buf *recvBuffer
trReader io.Reader
}
type recvBuffer struct {
c chan recvMsg
mu sync.Mutex
backlog []recvMsg
err error
}
从 HandleStreams()
的 for 循环中读到的 Data Frame 数据最后都放入了 buf *recvBuffer
中。
if len(f.Data()) > 0 {
buffer := t.bufferPool.get()
buffer.Reset()
buffer.Write(f.Data())
s.write(recvMsg{buffer: buffer})
}
顺便看到上面的结构中还有一个 trReader
, 它是 transportReader
类型的变量,最终使用它的是 Stream 的 Read 函数。
func (s *Stream) Read(p []byte) (n int, err error) {
// Don't request a read if there was an error earlier
if er := s.trReader.(*transportReader).er; er != nil {
return 0, er
}
s.requestRead(len(p))
return io.ReadFull(s.trReader, p)
}
也就是说,读取 Stream 传来的数据都是从这里读到的,但是我们前面知道数据是存在 recvBuffer 的,可是这个结构体中只有一个 io.Reader
啊,怎么读到 recvBuffer
的东西呢?
在 operateHeaders()
中,(前面提到,for 循环中不断处理 http2 的 frame,遇到新的 header frame 调用 operateHeaders)在创建新的 Stream 时,
把 recvBufferReader
赋值给了 io.Reader 类型的 reader。
s.trReader = &transportReader{
reader: &recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
freeBuffer: t.bufferPool.put,
},
}
Stream 读到的数据如何传给用户
到这里,我们知道了从 client 发来的数据在哪里,那么用户程序是怎么读到的呢? 先从一段 backtrace 说起
(dlv) bt
0 0x0000000001403f73 in google.golang.org/grpc/internal/transport.(*Stream).Read
at ./vendor/google.golang.org/grpc/internal/transport/transport.go:457
1 0x000000000146fb43 in google.golang.org/grpc.(*parser).recvMsg
at ./vendor/google.golang.org/grpc/rpc_util.go:508
2 0x000000000147085d in google.golang.org/grpc.recvAndDecompress
at ./vendor/google.golang.org/grpc/rpc_util.go:639
3 0x00000000014711bb in google.golang.org/grpc.recv
at ./vendor/google.golang.org/grpc/rpc_util.go:707
4 0x0000000001484786 in google.golang.org/grpc.(*serverStream).RecvMsg
at ./vendor/google.golang.org/grpc/stream.go:1474
5 0x000000000149cf22 in learn-grpc/proto.(*testSumServer).Recv
at ./proto/00.pb.go:655
6 0x000000000149e94d in main.(*server).Sum
at ./server3/main.go:34
7 0x000000000149ce0d in learn-grpc/proto._Test_Sum_Handler
at ./proto/00.pb.go:636
8 0x00000000014784c3 in google.golang.org/grpc.(*Server).processStreamingRPC
at ./vendor/google.golang.org/grpc/server.go:1237
9 0x0000000001479cf8 in google.golang.org/grpc.(*Server).handleStream
at ./vendor/google.golang.org/grpc/server.go:1317
10 0x0000000001486901 in google.golang.org/grpc.(*Server).serveStreams.func1.1
at ./vendor/google.golang.org/grpc/server.go:722
11 0x00000000010630c1 in runtime.goexit
at /usr/local/go/src/runtime/asm_amd64.s:1373
注意其中的 5,6,7 backtrace,这是我们的用户代码,通常我们是在 for 循环中不断的调用 Recv() 直到 EOF.
for {
reqObj, err = req.Recv()
if err == io.EOF {
return
}
....// process data
}
由此可见是 processStreamingRPC() 调用对应 protobuf 中指明的 RPC call 函数,并且通过 Recv() 不断地读 Stream buf 中的内容,从而达到 用户代码能不断地读取 Stream 中 client 发来的数据。
重点就是下面的这个单独的 goroutine s.handleStream()
, 它被作为 handler 参数传进了 st.HandleStreams(), 对应的实现是 http2Server.HandleStreams()。
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
},)
wg.Wait()
}
前面已经提到,http2Server.HandleStreams()
就是在不断的处理 frame,读到 Header Frame 就会建新的 Stream,而这个 s.handleStream()
的 handler 就是在这个 operateHeaders 函数的最后被调用的。
Server 数据从哪发送
Server 读取 client 的数据部分已经很清楚了,那么 Server 的数据是从哪发送的呢 ?
发送数据的过程与之前的读数据稍微有一些不同,主要区别是它是两个独立的过程,用户程序写入数据到 controlBuffer
之后就不管了,
另外有一个goroutine LoopyWriter
不断地从 controlBuffer
中读取数据并写入到 net.Conn 中。
Server 发送数据给 client 的 backtrace 如下,
0 0x00000000013fcbc3 in google.golang.org/grpc/internal/transport.(*http2Server).Write
at ./vendor/google.golang.org/grpc/internal/transport/http2_server.go:892
1 0x00000000014840ac in google.golang.org/grpc.(*serverStream).SendMsg
at ./vendor/google.golang.org/grpc/stream.go:1421
2 0x000000000149ce99 in learn-grpc/proto.(*testSumServer).SendAndClose
at ./proto/00.pb.go:650
3 0x000000000149ea5a in main.(*server).Sum
at ./server3/main.go:37
4 0x000000000149ce0d in learn-grpc/proto._Test_Sum_Handler
at ./proto/00.pb.go:636
5 0x00000000014784c3 in google.golang.org/grpc.(*Server).processStreamingRPC
at ./vendor/google.golang.org/grpc/server.go:1237
6 0x0000000001479cf8 in google.golang.org/grpc.(*Server).handleStream
at ./vendor/google.golang.org/grpc/server.go:1317
7 0x0000000001486901 in google.golang.org/grpc.(*Server).serveStreams.func1.1
at ./vendor/google.golang.org/grpc/server.go:722
看到这里也许你会问,问什么也是从 processStreamingRPC()
开始,最后调用SendAndClose() 的呢?
这是因为在我的用户程序里 recv() 和 send() 都是在 Sum() 函数中的。
http2Server.Write()
在最后把数据写到 t.controlBuf.put(df)
就算结束了。
在另一端,一开始创建的 newHTTP2Server() 中会启用单独的 goroutine newLoopyWriter()
从 controlBuf 中get 数据,并通过 net.Conn 写入。
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
if err := t.loopy.run(); err != nil {
errorf("transport: loopyWriter.run returning. Err: %v", err)
}
t.conn.Close()
close(t.writerDone)
}()