事情的起因是我在 k8s 中部署了一个 redis,然后 service A 使用 go-redis
库连接 redis。
这个时候我想到: service Pod 和 redis Pod 启动的顺序是不一定的,可能是 service Pod 先启动,此时 redis pod 还没有启动;又或者 redis pod 中途 restart 了。 go-redis
库能正确的处理重连吗?
简单的用 kubectl 命令删除、 重启了 redis,发现 service Pod 能自动恢复连接,说明 go-redis
正确进行了处理,那么它是怎么做的呢 ?
在寻找答案之前,先来想想如果是我自己实现,需要哪些功能? 该怎么实现?
- conn pool 需要自动删除已经断开的、坏掉的连接。 (开一个 goroutine 定期检查即可)
- 能自动新建连接,补齐一定数量的 conn。 (也不难,goroutine 即可)
- 如何检测一个 conn 是不是出错了?
- 对外的接口是 Get 和 Put,除了正常的用 mutext 控制并发以外,还有什么特殊的操作吗?
go-redis
源码位于 redis/v8/internal/pool/pool.go
, 首先看 pool.Options
数据结构
type Options struct {
PoolSize int // 连接池数量
MinIdleConns int // 最小空闲连接数
MaxConnAge time.Duration // 连接最大生存时间
PoolTimeout time.Duration //
IdleTimeout time.Duration // 空闲超时时间
IdleCheckFrequency time.Duration
}
然后看 pool.ConnPool
结构体
type ConnPool struct {
opt *Options
queue chan struct{} // 工作连接队列
conns []*Conn // 连接队列
idleConns []*Conn // 空闲连接队列
poolSize int
idleConnsLen int // 空闲连接队列长度
closedCh chan struct{}
}
NewConnPool()
NewConnPool()
是连接池的入口函数,代码如下
func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{
opt: opt,
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
idleConns: make([]*Conn, 0, opt.PoolSize),
closedCh: make(chan struct{}),
}
p.connsMu.Lock()
p.checkMinIdleConns()
p.connsMu.Unlock()
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
return p
}
除了为创建slice 以外就是 checkMinIdleConns()
和开启一个 reaper()
的 goroutine
checkMinIdleConns()
该函数的作用是检查连接池的空闲连接数量是否满足最小空闲连接数量要求,若不满足,则创建足够的空闲连接。
func (p *ConnPool) checkMinIdleConns() {
if p.opt.MinIdleConns == 0 {
return
}
for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
p.poolSize++
p.idleConnsLen++
go func() {
err := p.addIdleConn()
if err != nil {
p.connsMu.Lock()
p.poolSize--
p.idleConnsLen--
p.connsMu.Unlock()
}
}()
}
}
reaper()
reaper()
函数的作用是收割过期的 conn,怎么判断一个 conn 是不是过期呢? 还记得 Options 结构吗,由 IdleTimeout 和 MaxConnAge
判断是否过期。
reaper() 函数的主要逻辑如下:
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
p.getTurn()
p.connsMu.Lock()
cn := p.reapStaleConn()
p.connsMu.Unlock()
p.freeTurn()
if cn != nil {
_ = p.closeConn(cn)
n++
} else {
break
}
}
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
return n, nil
}
func (p *ConnPool) reapStaleConn() *Conn {
if len(p.idleConns) == 0 {
return nil
}
cn := p.idleConns[0]
if !p.isStaleConn(cn) {
return nil
}
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
p.idleConnsLen--
p.removeConn(cn)
return cn
}
过程是获取 p.idleConns
数组的第一个元素,即空闲连接数组第一个元素,判断过期,如果它正好也过期了,那么就把它从p.idleConns
和 p.conns
中都删除。
Get() Put()
接下来就是一个 conn pool 最基本的两个操作了:
- 获取一个 conn
- 用完之后归还给 pool
Get() 函数的逻辑很简单: 从 idleConns
获取一个连接,如果它生存时间超时了,那么主动 close,并再次拿一个,如果拿不到,说明没用空闲的了,那么就新建一个。
以上逻辑结合代码,还是很容易理解的。 有趣的是其中的 Turn 函数,一开始没看明白是什么意思。
getTurn, waitTurn, freeTurn
这三个函数实际都是在操作 ConnPool queue
结构体成员,说白了这个 Turn 就是实现了一个令牌桶。 在创建 conn pool 的代码中,我们看到这样的代码:
queue: make(chan struct{}, opt.PoolSize),
所以,queue 是一个缓冲区大小等于 PoolSize 的 channel,每当 Get、Put、Remove 操作时都要 拿令牌或者归还令牌,当 channel 中没有令牌时,Get 操作就会阻塞。
目的是,当有多个并发时,每个调用者排队独占一个 net.Conn
。
另一种 pool 实现
接着上面最后一段的话题,我还见过一种使用 mutex 的实现方式,乍一看更加简洁:
type ConnPool struct {
conns []*grpc.ClientConn
cur int
size int
mux sync.mutex
}
func (pool *ConnPool) get() *grpc.ClientConn {
ret := pool.conns[pool.cur]
pool.mux.Lock()
pool.cur = (pool.cur + 1) % pool.size
pool.mux.Unlock()
return ret
}
这种方式与 go-redis
实现的区别在哪? 我觉得前者每个调用者不是独占 ClientConn 的,而后者的每个调用者独占一个 TCP 连接。
总结
go-redis
实现的 conn pool:
- conn 分为正在使用的和 idle 的,idle的连接有空闲超时,超过一定时间就会被关闭; 同时,连接有最大生存时间,如果一个 idle 连接的 MaxConnAge 超时了也要被关闭。 所以 idle 满足两者中的一个就会被清理。
reaper()
函数负责处理以上两种超时的 conn,ReapStaleConns()
这个名字 stale 就很形象了。checkMinIdleConns()
负责维护正常的 连接数 和 最小空闲连接数,缺少了就创建新的补上。- 没有看到它是如何检测坏掉的 tcp 连接。
到此为止,我们就可以解释本文开头观察到的 go-redis
总是能自动的建立连接的原因:
- 假设 service pod 比 redis pod 先启动,此时 pool 无法与 redis 建立连接,但是
checkMinIdleConns()
保证不断重试,知道连接到 redis 为止。 - 假设 redis pod 中途 crash,调用者发现 TCP 连接返回有错误时,可以主动调用
Remove()
把该连接从 pool 中删除。 - 等待 redis pod 重启后,建立新的连接,满足最小连接数要求。