type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}
  • writerSem 是写入操作的信号量
  • readerSem 是读操作的信号量
  • readerCount 是当前读操作的个数
  • readerWait 当前写入操作需要等待读操作解锁的个数

其中 semaphore 就是操作系统课程里面学到的信号量的概念。

读写锁的实现非常简单,源码在 /usr/local/go/src/sync/rwmutex.go 下,我们可以逐一分析它的各个函数

读者加读锁

首先是读锁,读者进入临界区之前,把 readerCount 加一,

  • 如果这个值小于 0,则调用runtime_SemacquireMutex 把自己所在的 goroutine 挂起。
  • 如果大于等于 0, 则加读锁成功
func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

runtime_SemacquireMutex() 是一个运行时函数,实际调用的是在go/src/runtime/sema.go 中的函数 sync_runtime_SemacquireMutex(), 这个函数的具体实现不在本文讨论范围,目前只要知道这个函数实现了信号量的 P 操作,goroutine 在这个函数中挂起等待。

写者加写锁

另一个问题是,readerCount 加 1 怎么会小于 0 呢? 那就要看写者加锁的函数了

func (rw *RWMutex) Lock() {
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

当写者要获取锁时,在第5行把 readerCount 变成一个非常大的负数,结果就是 readerCount变成负数后,表示写者开始申请锁,从这个时候开始,后来的读者 RLock() 都会被挂起

r 其实就是 readerCount的值,在第7行赋值给了 readerWait,表示写者要等待这么多个读者之后才能真正运行,第8行,写者把自己挂起。

读者释放读锁

接下来看读者释放锁。

func (rw *RWMutex) RUnlock() {
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

当之前的读者完成工作之后,调用RUnlock()释放读锁,如果这个时候 readerCount 小于0,表示有 写者在等待,所以在rUnlockSlow() 中, 把readerWait 的值减一,如果此时 readerWait 为0,表示写者要等待前面的 n 个读者都完成了,此时,调用runtime_Semrelease() 唤醒写者。

写者释放写锁

写者释放写锁的任务就是唤醒正在等待的读者,并且把 readerCount 的值恢复成整数。

func (rw *RWMutex) Unlock() {
	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
}

参考资料