Golang Channel 用法总结

之前的博客中已经粗略探究了一下 golang channel 的实现原理,本文总结一下使用 channel 的各种姿势。 先看一下对不同状态的 channel 的读,写,关闭操作的结果 1. 使用 for range 读取 channel 场景: 当需要不断从 channel 里读数据时 这是最常用的方式,又安全又便利,当channel 被关闭时,for 循环自动退出。 用法不再赘述。 2. 使用 _, ok 判断 channel 是否关闭 场景: 读 channel,但需要判断 channel 是否已关闭。 读 channel 的操作 <- chan 既可以返回一个值,也可以返回两个值,这里就是用的两个返回值的方式。 举例: if v, ok := <- ch; ok { // can read channel fmt.Println(v) } 读到数据,并且通道没有关闭时,ok 的值为 true。 通道关闭,无数据读到时,ok 的值为 false。 3. 与 select 搭配使用 场景: 需要对多个通道进行处理,或者设置超时 举例: func (h *Handler) handle(job *Job) { select { case h.jobCh<-job: return case <-h.stopCh: return case <- time.After(2): } } 这里有一个坑需要注意: 当 channel 为 nil,也就是没有初始化时,无论对应的 select case 是读还是写,都会立即返回。 而一般情况下,对 nil channel 的 写操作是要panic的。 ...

2021-05-14 · Me

如何设计一个连接池

事情的起因是我在 k8s 中部署了一个 redis,然后 service A 使用 go-redis 库连接 redis。 这个时候我想到: service Pod 和 redis Pod 启动的顺序是不一定的,可能是 service Pod 先启动,此时 redis pod 还没有启动;又或者 redis pod 中途 restart 了。 go-redis 库能正确的处理重连吗? 简单的用 kubectl 命令删除、 重启了 redis,发现 service Pod 能自动恢复连接,说明 go-redis 正确进行了处理,那么它是怎么做的呢 ? 在寻找答案之前,先来想想如果是我自己实现,需要哪些功能? 该怎么实现? conn pool 需要自动删除已经断开的、坏掉的连接。 (开一个 goroutine 定期检查即可) 能自动新建连接,补齐一定数量的 conn。 (也不难,goroutine 即可) 如何检测一个 conn 是不是出错了? 对外的接口是 Get 和 Put,除了正常的用 mutext 控制并发以外,还有什么特殊的操作吗? go-redis 源码位于 redis/v8/internal/pool/pool.go , 首先看 pool.Options 数据结构 type Options struct { PoolSize int // 连接池数量 MinIdleConns int // 最小空闲连接数 MaxConnAge time.Duration // 连接最大生存时间 PoolTimeout time.Duration // IdleTimeout time.Duration // 空闲超时时间 IdleCheckFrequency time.Duration } 然后看 pool.ConnPool 结构体 ...

2021-05-01 · Me

Goroutine 的 PMG 模型

稍微了解过 Go runtime 的人想必都听过 goroutine 的 PMG 模型,哪么它到底代表什么意思呢? Golang 源码中又是如何实现的? 前言 关于 PMG 的解释网上有很对,随便 copy 一个: M 代表 Machine,系统线程,它由操作系统管理的,goroutine就是跑在M之上的;M 是一个很大的结构,里面维护小对象内存cache(mcache)、当前执行的goroutine、随机数发生器等等非常多的信息。 P 是 Processor,处理器,它的主要用途就是用来执行goroutine的,它维护了一个goroutine队列,即runqueue。Processor是让我们从N:1调度到M:N调度的重要部分。 G 代表 goroutine 它包含了栈,指令指针,以及其他对调度goroutine很重要的信息,例如其阻塞的channel。 通常 go 程序中可以用 GOMAXPROCS 设置 Processor 的个数; 而 M 则是 clone系统调用创建的,或者用linux pthread 库创建出来的线程实体。 M 与 P 是一对一的关系。 基本结构体 打开 src/runtime/runtime2.go 文件,p,m,g 三个结构体的定义是按顺序在一起的,除此之外还有一个 schedt,与 goroutine 的调度相关。 g 结构体 G 就是 goroutine 的意思,每个 Goroutine 对应一个 g 结构体,它有自己的栈内存, G 存储 Goroutine 的运行堆栈、状态以及任务函数。 当一个 goroutine 退出时,g 会被放到一个空闲的对象池中以用于后续的 goroutine 的使用, 以减少内存分配开销。 ...

2021-04-19 · Me

Golang Channel 的实现

Channel 可以说是 Go 语言最具特色的设计了,我们经常会看到一些老鸟这样教育菜鸟: Do not communicate by sharing memory; instead, share memory by communicating. 那么熟练使用 Golang 就离不开 channel,有必要了解一下 channel 是怎么实现的。 channel 的源代码在 Golang 的 src/runtime/chan.go 目录下,先看结构体: type hchan struct { qcount uint // 循环列表元素个数 dataqsiz uint // 循环队列的大小 buf unsafe.Pointer // 循环队列的指针 elemsize uint16 // channel 中元素的大小 closed uint32 // 是否已close elemtype *_type // channel 中元素类型 sendx uint // send 在buffer中的索引 recvx uint // recv 在buffer中的索引 recvq waitq // receiver 的等待队列 sendq waitq // sender 的等待队列 lock mutex } type waitq struct { first *sudog last *sudog } 其中 qcount 代表 channel 已经接收但还没被取走的元素的个数 recvq 和 sendq 表示等待接收数据 和等待发送数据的 goroutine 队列 (用 sudog 结构表示) 举例来说,假设用make(chan int, 100) 创建了带100个缓冲区的 channel,qcount 代表缓冲区中元素个数,dataqsiz = 100, recvq 代表等待读数据的 goroutine,而当缓冲区满了以后,sendq 代表阻塞的 goroutine ...

2021-04-10 · Me

Golang WaitGroup 的实现

sync.WaitGroup 的作用就是让主函数等待所有 goroutine 都执行完毕,再退出。 一个最简单的例子如下,如果没有 wg,那么 main 会在 goroutine 执行之前就退出,从而不会看到任何 output。 func main() { wg := sync.WaitGroup{} for i := 0; i < 3; i++ { wg.Add(1) go func(i int) { fmt.Println(i) wg.Done() }(i) } wg.Wait() } 那么 WaitGroup 是如何实现的呢? 万变不离其宗,其底层还是基于 go runtime 提供的信号量机制,也就是 runtime_Semrelease() 和 runtime_Semacquire(), 在之前的文章 Golang RWMutex 的实现 和 netpoll 的实现 中都有它们的影子存在。 runtime_Semacquire(s *uint32) 此函数会阻塞直到信号量*s的值大于0,原子减这个值。 runtime_Semrelease(s *uint32, lifo bool, skipframes int) 此函数执行原子增信号量的值,然后通知被runtime_Semacquire阻塞的协程 说到底,就是用 信号量 和 gopark 来控制 goroutine 是运行还是挂起,wg.Add() 对应信号量的增减,wg.Wait() 对应线程/协程的挂起。 WaitGroup 的源码位于 src/sync/waitgroup.go 文件中,一共才 100 多行,下面就逐一分析下。 WaitGroup type WaitGroup struct { noCopy noCopy state1 [3]uint32 } 其中 noCopy 是干啥的呢? 搜索一番以后发现了这样的解释: ...

2021-04-05 · Me

Golang reflect 的使用

所谓反射 (refection) 是指程序在运行过程中获取变量的类型、属性。 在 Golang 中,有时我们会看到 reflect.ValueOf() 或者 reflect.TypeOf() 这两个函数,这就是反射出一个变量的值和类型。 gPRC 的实现中也大量运用了反射。 本文主要介绍如何使用 reflect 包,关于 Go 内部是如何实现的将在下一篇文章中介绍。 TypeOf 和 ValueOf 先看一个最简单的例子 type User struct { Name string Age int } func main() { u := User{"Dick", 18} t := reflect.TypeOf(u) v := reflect.ValueOf(u) fmt.Printf("u type = %T, %v\n", u, u) fmt.Printf("t type = %T, %v\n", t, t) fmt.Printf("v type = %T, %v\n", v, v) // 获取 v 的值 // v.Age , 错误,因为 v 是 reflect.Value 类型 // 正确方式如下 v1 := v.Interface().(User) } 以上代码运行结果如下: u type = main.User, {Dick 18} t type = *reflect.rtype, main.User v type = reflect.Value, {Dick 18} 可以看出,使用 reflect.ValueOf 返回的是一个 reflect.Value 类型,需要先通过 Interface() 函数返回成一个 interface 类型,再做强制类型转换。 ...

2021-04-03 · Me

Golang io 包的实现

Golang 的 io package 包含 3 个文件 io.go, multi.go, pipe.go, 其中最主要的时 io.go。 当我们打开 io.go 的源码后,发现这个文件里面定义了大量的接口,实际上,io 包的作用就是如此 - 定义基本的 Read / Write inteface,而把具体的实现交给其他 package,比如 strings package 中就专门实现了 reader/writer,在后面的文章中再分析 strings 包。 接下来就看看 io 包中到底包含了哪些东西。 io.go 首先时定义了 4 个基础操作,读,写,关闭,seek type Reader interface { Read(p []byte) (n int, err error) } type Writer interface { Write(p []byte) (n int, err error) } type Closer interface { Close() error } type Seeker interface { Seek(offset int64, whence int) (int64, error) } 基于这 4 个基础 interface,两两组合,有扩展了下面几个 interface ReadWriter ReadCloser WriteCloser ReadWriteCloser ReadSeeker WriteSeeker ReadWriteSeeker 这块内容直接看源码即可,不再赘述。 如果使用 Goland IDE 的话,打开 struct 视图就能很清楚的看到有哪些接口,那些 public 函数。 ...

2021-04-02 · Me

Golang Context 的实现

有这样一个场景: 父 goroutine 创建了多个子 goroutine 来处理某个请求,当这些子 goroutine 中任何一个出错的时候,我们希望所有的 goroutine 都停止。 该如何实现呢? 熟悉 Go 语言的可能首先想到用 context,而 context 主要是依靠 channel 来实现以上功能。 看了一下具体的实现,主要思想是: 每种类型的 ctx 都实现了 context.Context 接口的 Done() 函数 Done() <-chan struct{} 函数返回一个只读的 channel 而且没有地方向这个channel里写数据。所以直接调用这个只读channel会被阻塞。 一般通过搭配 select 来使用。一旦 channel 关闭,就会立即读出零值。 谁来关闭这个 channel 呢? 用户主动调用返回的 CancelFunc,或者 timeout 超时 另外,在使用上配合 select 语句阻塞处理 Done() 才能起到预期的效果。 下面举两个如何使用 context 的例子,第一个例子如下 func main() { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() go handle(ctx) // 等待3秒再结束(只是为了让 main 不提前 exit,与本文无关) select { case <- time.After(3*time.Second): fmt.Println("main", ctx.Err()) } } func handle(ctx context.Context) { select { case <-ctx.Done(): fmt.Println("goroutine ", ctx.Err()) // 处理请求,在这里我们用 time.After() 表示处理了多久, 也可以写成这样 // case ret, ok := <-req: case <-time.After(2*time.Second): fmt.Println("process request done") } } 在这个例子中,ctx 会在1秒后超时,而 goroutine 处理某个 request 需要 2 秒(在代码中用 time.After 代替真正的处理时间)。 ...

2021-03-28 · Me

Golang 读写锁的实现

type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers readerSem uint32 // semaphore for readers to wait for completing writers readerCount int32 // number of pending readers readerWait int32 // number of departing readers } writerSem 是写入操作的信号量 readerSem 是读操作的信号量 readerCount 是当前读操作的个数 readerWait 当前写入操作需要等待读操作解锁的个数 其中 semaphore 就是操作系统课程里面学到的信号量的概念。 读写锁的实现非常简单,源码在 /usr/local/go/src/sync/rwmutex.go 下,我们可以逐一分析它的各个函数 读者加读锁 首先是读锁,读者进入临界区之前,把 readerCount 加一, 如果这个值小于 0,则调用runtime_SemacquireMutex 把自己所在的 goroutine 挂起。 如果大于等于 0, 则加读锁成功 func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } } runtime_SemacquireMutex() 是一个运行时函数,实际调用的是在go/src/runtime/sema.go 中的函数 sync_runtime_SemacquireMutex(), 这个函数的具体实现不在本文讨论范围,目前只要知道这个函数实现了信号量的 P 操作,goroutine 在这个函数中挂起等待。 ...

2021-02-18 · Me

epoll 在 Golang net 库的使用

本文主要关注以下几个问题: Golang runtime 是怎么调用 epoll 的系统调用的 ? Golang net 库如何封装 epoll,使得开发者几乎不用直接操作 epoll ? C 如何调用 epoll 首先回顾一下用 C 语言怎么使用 epoll int s = socket(AF_INET, SOCK_STREAM, 0); bind(s...) listen(s...) int epfd = epoll_create(128); //创建eventpoll对象 ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET epoll_ctl(epfd, EPOLL_CTL_ADD, s, &ev);//注册事件 //轮询就绪事件 while(true){ //返回值n为就绪的事件数,events为事件列表 int n = epoll_wait(epfd, &events[0], len(events), 1000) for( i := 0; i < n; i++ ) { ev := &events[i] //处理事件 } } C 语言中调用 epoll 的方式比较底层,总的来说分下面三个步骤 epoll_create() 创建epoll对象 epoll_ctl() 注册套接字的事件 epoll_wait() 轮询是否有事件发生,并通过events参数返回就绪(触发)的事件列表 Go 如何调用 epoll 那么在 Golang 的 runtime 最终也是调用了这么底层的 glibc 库函数吗? Golang 是如何封装 epoll 的呢? 搜索之后在 runtime/sys_linux_amd64.s 文件下面找到了 epollctl() // func epollctl(epfd, op, fd int32, ev *epollEvent) int TEXT runtime·epollctl(SB),NOSPLIT,$0 MOVL epfd+0(FP), DI MOVL op+4(FP), SI MOVL fd+8(FP), DX MOVQ ev+16(FP), R10 MOVL $SYS_epoll_ctl, AX SYSCALL MOVL AX, ret+24(FP) RET 可见,Golang runtime 最后是直接用汇编调用 系统调用 epollctl,类似的,在同一个文件下也能找到 epoll create 和 wait 的汇编代码。 ...

2021-01-31 · Me