type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
- writerSem 是写入操作的信号量
- readerSem 是读操作的信号量
- readerCount 是当前读操作的个数
- readerWait 当前写入操作需要等待读操作解锁的个数
其中 semaphore 就是操作系统课程里面学到的信号量的概念。
读写锁的实现非常简单,源码在 /usr/local/go/src/sync/rwmutex.go
下,我们可以逐一分析它的各个函数
读者加读锁
首先是读锁,读者进入临界区之前,把 readerCount 加一,
- 如果这个值小于 0,则调用
runtime_SemacquireMutex
把自己所在的 goroutine 挂起。 - 如果大于等于 0, 则加读锁成功
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
runtime_SemacquireMutex()
是一个运行时函数,实际调用的是在go/src/runtime/sema.go
中的函数 sync_runtime_SemacquireMutex()
,
这个函数的具体实现不在本文讨论范围,目前只要知道这个函数实现了信号量的 P 操作
,goroutine 在这个函数中挂起等待。
写者加写锁
另一个问题是,readerCount
加 1 怎么会小于 0 呢? 那就要看写者加锁
的函数了
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
当写者要获取锁时,在第5行把 readerCount 变成一个非常大的负数,结果就是 readerCount变成负数后,表示写者开始申请锁,从这个时候开始,后来的读者 RLock() 都会被挂起
r 其实就是 readerCount的值,在第7行赋值给了 readerWait
,表示写者要等待这么多个读者之后才能真正运行,第8行,写者把自己挂起。
读者释放读锁
接下来看读者释放锁。
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
当之前的读者完成工作之后,调用RUnlock()
释放读锁,如果这个时候 readerCount
小于0,表示有 写者在等待,所以在rUnlockSlow()
中,
把readerWait
的值减一,如果此时 readerWait
为0,表示写者要等待前面的 n 个读者都完成了,此时,调用runtime_Semrelease()
唤醒写者。
写者释放写锁
写者释放写锁的任务就是唤醒正在等待的读者,并且把 readerCount
的值恢复成整数。
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}