首先看一个最简单的建立 client server 之间 gRPC 连接的代码,以这个代码为例,分析一下 TCP 是在何时建立的。
Server 端的代码相对来说很容易,一个最简单的 server 代码如下:
func main() {
lis, _ := net.Listen("tcp", fmt.Sprintf(":%d", 8080))
grpcServer := grpc.NewServer()
protobuf.RegisterTestServer(grpcServer, &server{})
grpcServer.Serve(lis)
}
在 grpc/server.go 中的 Serve() 函数调用了 lis.Accept()
并阻塞,当 client 端发来 TCP 请求时,Accept() 返回 Conn 结构,并开启 goroutine handleRawConn()
进行后续的处理。
就 TCP 来说,server 端的代码简单易懂,相比之下 client 端则不一样,一个基本的 Client 代码如下:
func main() {
conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure())
defer conn.Close()
cli := protobuf.NewTestClient(conn)
}
而要弄清楚 Client 端如何建立 TCP 却不容易,这是因为 grpc client 有 resolve DNS 以及做 load balancer 的功能,因此代码复杂很多。
从上面的代码不难看出肯定是在 Dial() 函数中建立的,它的具体实现是在 DialContext()
, 返回 ClientConn
结构体指针,但是却看不到在哪建立了 TCP 链接,这是因为 TCP 链接是在一个 Goroutine 中异步建立的。如果想要 DialContext()
等连接建立完再返回,可以指定grpc.WithBlock()传入Options来实现。
下面就来剖析一下整个过程。
先说结论,client 是在 clientconn.go
文件的 connect()
里面 go ac.resetTransport()
建立了到 server 的 TCP 链接的。
// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
if ac.state != connectivity.Idle {
ac.mu.Unlock()
return nil
}
// Update connectivity state within the lock to prevent subsequent or
// concurrent calls from resetting the transport more than once.
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
// Start a goroutine connecting to the server asynchronously.
go ac.resetTransport()
return nil
}
接着跟踪 ac.resetTransport()
, 调用顺序如下:tryAllAddrs()
> createTransport()
> NewClientTransport()
> newHTTP2Client()
, 最终,看到 dial(connectCtx, opts.Dialer, addr.Addr)
可以在 dial 函数出设置断点验证我们的猜测,当程序执行到断点后,可以通过 netstat 命令查看 tcp 链接。
# netstat
tcp 15 0 127.0.0.1:40972 127.0.0.1:8080 ESTABLISHED 9903/.
并使用 backtrace 验证调用栈
0 0x0000000000748c02 in google.golang.org/grpc/internal/transport.newHTTP2Client
at ./vendor/google.golang.org/grpc/internal/transport/http2_client.go:174
1 0x00000000007c21c8 in google.golang.org/grpc/internal/transport.NewClientTransport
at ./vendor/google.golang.org/grpc/internal/transport/transport.go:581
2 0x00000000007c21c8 in google.golang.org/grpc.(*addrConn).createTransport
at ./vendor/google.golang.org/grpc/clientconn.go:1300
3 0x00000000007c18ca in google.golang.org/grpc.(*addrConn).tryAllAddrs
at ./vendor/google.golang.org/grpc/clientconn.go:1223
4 0x00000000007c0e9f in google.golang.org/grpc.(*addrConn).resetTransport
at ./vendor/google.golang.org/grpc/clientconn.go:1133
那么是怎么一步一步从 Dial() 调用到位于 clientconn.go 中的 connect()
函数呢?
这要从 client 端的 resolve DNS 以及 load balancer 说起,先看一张 gRPC 客户端负载均衡的架构图
不难看出,client 端挺复杂的,左边是 Resolver 解析器,它主要完成下面这几个功能:
- 服务发现的实现
- 和注册中心(Etcd、CoreDNS、Consul 等)通信,实时获取服务器的列表(或者处理变更信息)
- 将上步获取的数据发送给 Balancer,以便更新 Connection Pool(内置 gRPC 长连接池)
结合上面的图,我们再来看一下 backtrace,结果符合我们的预期,可以看到,代码先执行到 Resolver,然后到 Balancer。
0 0x00000000007bf3d3 in google.golang.org/grpc.(*addrConn).connect
at ./vendor/google.golang.org/grpc/clientconn.go:799
1 0x00000000007ba408 in google.golang.org/grpc.(*acBalancerWrapper).Connect
at ./vendor/google.golang.org/grpc/balancer_conn_wrappers.go:264
2 0x00000000007c59d4 in google.golang.org/grpc.(*pickfirstBalancer).UpdateClientConnState
at ./vendor/google.golang.org/grpc/pickfirst.go:102
3 0x00000000007b93ab in google.golang.org/grpc.(*ccBalancerWrapper).updateClientConnState
at ./vendor/google.golang.org/grpc/balancer_conn_wrappers.go:127
4 0x00000000007bde7c in google.golang.org/grpc.(*ClientConn).updateResolverState
at ./vendor/google.golang.org/grpc/clientconn.go:653
5 0x00000000007c8134 in google.golang.org/grpc.(*ccResolverWrapper).UpdateState
at ./vendor/google.golang.org/grpc/resolver_conn_wrapper.go:177
6 0x000000000075efc2 in google.golang.org/grpc/internal/resolver/passthrough.(*passthroughResolver).start
at ./vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go:48
7 0x000000000075efc2 in google.golang.org/grpc/internal/resolver/passthrough.(*passthroughBuilder).Build
at ./vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go:34
8 0x00000000007c7a31 in google.golang.org/grpc.newCCResolverWrapper
at ./vendor/google.golang.org/grpc/resolver_conn_wrapper.go:103
9 0x00000000007bb506 in google.golang.org/grpc.DialContext
at ./vendor/google.golang.org/grpc/clientconn.go:301
10 0x00000000007de6ad in google.golang.org/grpc.Dial
at ./vendor/google.golang.org/grpc/clientconn.go:106
11 0x00000000007de6ad in main.main
在 bt 7, 我们看到了函数 Build(), 查找它的实现发现是一个接口,
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
Scheme() string
}
按照注释的意思大概得知,向 gRPC 注册(解析器)服务发现时,实际上注册的是 Builder,一般在 Build 中会开启单独的 groutine,进行 List-watcher 逻辑。
Build() 参数中的 ClientConn,提供了 Builder 和 clientconn.go
中的 ClientConn 结构体
交互的纽带,可以调用 cc.UpdateState(resolver.State{Addresses: addrList})
来向 ClientConn 即时发送服务器列表的更新。
注意,这里有两个名叫 ClientConn
的东西,
- 一个是
clientconn.go
中的ClientConn 结构体
,位于 package gprc - 另一个是
resolver.go
中的ClientConn interface
, 位于 package resolver
ClientConn interface
的实现在 resolver_conn_wrapper.go
中的 ccResolverWrapper
。
但是最终,调用了 clientconn.go 中的 ClientConn,这也就是为什么它们叫了一样的名字的原因吧。
所以说白了 ccResolverWrapper 把 internal 的 resolver
多包了一层,承担了与 clientconn.go 的桥梁。
说完这些,那么就不难理解 bt 3 中 ccBalancerWrapper
的作用了。
总结
看完上面这些,总结一下 grpc client 建立 TCP 链接的过程。
grpc client 提供了 Resolve 协议和 Load Balance 的功能,所以即便是一个最简单的 Dial()
函数,也要经历 Resolver 和 Balancer 的代码。
resolver 和 balancer 的核心代码在 internal 目录里,而在 grpc 主目录下,都提供了 Wrapper 函数,resolver_conn_wrapper.go
, balancer_conn_wrapper.go
和 pickfirst.go