有这样一个场景: 父 goroutine 创建了多个子 goroutine 来处理某个请求,当这些子 goroutine 中任何一个出错的时候,我们希望所有的 goroutine 都停止。 该如何实现呢?

熟悉 Go 语言的可能首先想到用 context,而 context 主要是依靠 channel 来实现以上功能。

看了一下具体的实现,主要思想是:

  1. 每种类型的 ctx 都实现了 context.Context 接口的 Done() 函数
  2. Done() <-chan struct{} 函数返回一个只读的 channel 而且没有地方向这个channel里写数据。所以直接调用这个只读channel会被阻塞。
  3. 一般通过搭配 select 来使用。一旦 channel 关闭,就会立即读出零值。
  4. 谁来关闭这个 channel 呢? 用户主动调用返回的 CancelFunc,或者 timeout 超时

另外,在使用上配合 select 语句阻塞处理 Done() 才能起到预期的效果。

下面举两个如何使用 context 的例子,第一个例子如下

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	go handle(ctx)

    // 等待3秒再结束(只是为了让 main 不提前 exit,与本文无关)
	select {
	case <- time.After(3*time.Second):
		fmt.Println("main", ctx.Err())
	}
}

func handle(ctx context.Context) {
	select {
	case <-ctx.Done():
		fmt.Println("goroutine ", ctx.Err())

    // 处理请求,在这里我们用 time.After() 表示处理了多久, 也可以写成这样
    // case ret, ok := <-req:

	case <-time.After(2*time.Second):
		fmt.Println("process request done")
	}
}

在这个例子中,ctx 会在1秒后超时,而 goroutine 处理某个 request 需要 2 秒(在代码中用 time.After 代替真正的处理时间)。

因此这个程序运行时,context 总是会先超时,context 内部实现会导致 Done 的 channel 被关闭,因此 select 语句中的 case <- ctx.Done()执行。

另外,既然 WithTimeout() 会自动定时取消了,为什么还要 defer cancel() 呢?

https://golang.org/src/context/example_test.go 有这样一个解释

// Even though ctx will be expired, it is good practice to call its
// cancellation function in any case. Failure to do so may keep the
// context and its parent alive longer than necessary.

第二个例子如下,展示如何在子 goroutine 中使用 ctx

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	go handle(ctx)

    // 等待3秒再结束(只是为了让 main 不提前 exit,与本文无关)
	select {
	case <- time.After(3*time.Second):
		fmt.Println("main", ctx.Err())
	}
}

func handle(ctx context.Context) {
	// 检查 context 是否已经超时
	select {
	case <-ctx.Done():
		fmt.Println("goroutine ", ctx.Err())
		return 

	default:
		fmt.Println("no timeout, continue")
	}

	// 继续执行下面的代码
	fmt.Println("goroutine finish")
}

第二个例子主要思想是在函数一开始用 select 语句判断 context 是否超时,其中关键是 default case,代表默认情况下程序的执行路径。

以上就是 context 常见使用方式,另外 context 也可以带上 value 传递给子 goroutine,这里暂不介绍。

接下来主要分析一下 context 包中几个重要的函数,看看 context 内部如何实现超时功能。

Context Interface

context package 中 emptyCtx,cancelCtx,timerCtx 都实现了该接口。

type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key interface{}) interface{}
}

emptyCtx

emptyCtx 是 context package 中最简单的 context 类型,等于什么也不做

我们通常用 context.Background() 或者 context.TODO(), 就是直接返回 emptyCtx。

值得注意的是,background 和 todo 是静态变量,也就是说 go 程序中只有一份。

cancelCtx

当我们调用 context.WithTimeout() 时返回的是一个 timerCtx 结构体,而 cancelCtx 则是实现 timerCtx 的基础,因此分析它的实现。

通常调用 context.WithCancel() 函数创建一个新的 context,并返回一个 cancel 函数,用户决定何时调用 cancel 函数

当用户调用 cancel() 时,所有的子 goroutine 都被关闭。

其结构体定义如下

type cancelCtx struct {
  Context
  mu       sync.Mutex

  // cancel 的时候,close 这个chan
  done     chan struct{}

  // 派生的context
  children map[canceler]struct{}
  err      error
}

当用户调用 context.WithCancel() 函数时,内部主要做了两件事:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)  // 新的上下文

	propagateCancel(parent, &c) // 构建父子上下文之间的关联

	return &c, func() { c.cancel(true, Canceled) }
}

其中 propagateCancel() 的作用是在 parent 和 child 之间同步取消和结束的信号,保证在 parent 被取消时,child 也会收到对应的信号,不会出现状态不一致的情况。

函数实现如下,注意: parent 是我们传入的 ctx,child 才是这个 ctx 自己

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
	done := parent.Done()
	if done == nil {
		return // parent is never canceled
	}

	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}

    //  判断 parent 是哪种 context 类型,cancelCtx or timerCtx
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		atomic.AddInt32(&goroutines, +1)

		// 启动goroutine,等待 parent/child Done
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

parentCancelCtx() 判断 parent 是哪种 context 类型,在新的 go 源码中该函数已经做了改动,变得比以前复杂。

贴一个早期的版本,更加易懂:

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    for {
        switch c := parent.(type) {
        case *cancelCtx:
            return c, true    // 找到最近支持cancel的parent,由parent进行取消操作的调用
        case *timerCtx:
            return &c.cancelCtx, true // 找到最近支持cancel的parent,由parent进行取消操作的调用
        case *valueCtx:
            parent = c.Context // 递归
        default:
            return nil, false
        }
    }
}

从 parentCancelCtx 函数开始,propagateCancel 的逻辑如下:

当 child 的类型属于某个可 cancel 的 ctx (cancelCtx,timerCtx,或 valueCtx ),会判断 parent 是否已经触发了取消信号;

  1. 如果已经被取消,child 会立刻被取消;
  2. 如果没有被取消,child 会被加入 parent 的 children 列表中,等待 parent 释放取消信号;

当父 ctx 是开发者自定义的类型(parentCancelCtx 返回 false)时;

  1. 运行一个新的 Goroutine 同时监听 parent.Done() 和 child.Done() 两个 Channel;
  2. 在 parent.Done() 关闭时调用 child.cancel 取消子上下文;

Done 函数的实现比较简单, 只是返回一个chan 。

可以看出 Done 操作是在调用时才会构造 <-chan,done 变量是延时初始化的。

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

最后就是 cancel() 函数,顾名思义,这个函数的实现也很简单,就不贴出源码了。

timerCtx

从本文开头的例子也能看出这个是我们最常用的 context 类型。

有了 cancelCtx 的基础之后,再来看 timerCtx,最大的区别就是添加了 timer,

context.WithTimeout() 函数内部是直接调用了 WithDeadline(),在理解了 cancelCtx 之后看这部分的源码会简单的多,不对每个函数逐一分析了,读者可以直接查看源码。

比较有意思的一段代码,是用了 time.AfterFunc() 构造一个定时器,当超时时执行自定义的 Func(这里就是 cancel 函数)

c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})

其他

Go语言语法

p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)

可以看出上面是把值转换成了 (*cancelCTX) 类型,通常我们只用一个返回值的方式,为什么这里可以有两个返回值呢?

这是由编译器特殊处理的,当返回两个值的时候,ok 代表能否转换成功。

参考资料