读写锁 - sync.RWMutex #
RWMutex是Go语言中内置的一个reader/writer锁,用来解决读者-写者问题(Readers–writers problem)。在任意一时刻,一个RWMutex只能由任意数量的reader持有,或者只能由一个writer持有。
读者-写者问题 #
读者-写者问题(Readers–writers problem)描述了计算机并发处理读写数据遇到的问题,如何保证数据完整性、一致性。解决读者-写者问题需保证对于一份资源操作满足以下下条件:
- 读写互斥
- 写写互斥
- 允许多个读者同时读取
解决读者-写者问题,可以采用读者优先(readers-preference)
方案或者写者优先(writers-preference)
方案。
-
读者优先(readers-preference):读者优先是读操作优先于写操作,即使写操作提出申请资源,但只要还有读者在读取操作,就还允许其他读者继续读取操作,直到所有读者结束读取,才开始写。读优先可以提供很高的并发处理性能,但是在频繁读取的系统中,会长时间写阻塞,导致写饥饿。
-
写者优先(writers-preference):写者优先是写操作优先于读操作,如果有写者提出申请资源,在申请之前已经开始读取操作的可以继续执行读取,但是如果再有读者申请读取操作,则不能够读取,只有在所有的写者写完之后才可以读取。写者优先解决了读者优先造成写饥饿的问题。但是若在频繁写入的系统中,会长时间读阻塞,导致读饥饿。
RWMutex设计采用写者优先方法,保证写操作优先处理。
源码分析 #
下面分析的源码进行精简处理,去掉了race检查功能的代码。
RWMutex的定义 #
type RWMutex struct {
w Mutex // 互斥锁
writerSem uint32 // writers信号量
readerSem uint32 // readers信号量
readerCount int32 // reader数量
readerWait int32 // writer申请锁时候,已经申请到锁的reader的数量
}
const rwmutexMaxReaders = 1 << 30 // 最大reader数,用于反转readerCount
RLock/RUnlock的实现 #
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 { // 如果rw.readerCount为负数,说明此时已有一个writer持有锁或者正在申请锁。
runtime_SemacquireMutex(&rw.readerSem, false, 0) // 此时reader休眠阻塞在readerSem信号上,等待唤醒
}
}
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // r小于0说明此时有等待请求锁的writer
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders { // RLock之前已经进行了RUnlock操作
throw("sync: RUnlock of unlocked RWMutex")
}
if atomic.AddInt32(&rw.readerWait, -1) == 0 { // 此时是最后一个获取到锁的reader进行RUnlock操作,那么释放writerSem信号,唤醒等待的writer来获取锁。
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
Lock/Unlock的实现 #
func (rw *RWMutex) Lock() {
rw.w.Lock() // 加互斥锁,阻塞其他writer进行Lock操作,保证写-写互斥。
// 将rw.readerCount 更改为rw.readerCount - rwmutexMaxReaders,
// 此时rw.readerCount由一个正数转变成一个负数,这种方式既能保持记录reader数量,又能表明有writer正在请求锁
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { // r!=0表明此时有reader持有锁,则当前writer只能阻塞等待,但为了保证写优先,需要readerWait记录当前已获取到锁的读者数量
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders { // Lock之前先进行了Unlock操作
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ { // 释放信号,唤醒阻塞的reader们
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock() // 是否互锁锁,允许其他writer进行获取锁操作了
}
对于读者优先(readers-preference)
的读写锁,只需要一个readerCount
记录所有读者,就可以轻易实现。Go中的RWMutex实现的是写者优先(writers-preference)
的读写锁,那就需要用到readerWait
来记录写者申请锁时候,已经获取到锁的读者数量。
这样当后续有其他读者继续申请锁时候,可以读取readerWait
是否大于0,大于0则说明有写者已经申请锁了,按照写者优先(writers-preference)
原则,该读者需要排到写者之后,但是我们还需要记录这些排在写者后面读者的数量呀,毕竟写着将来释放锁的时候,还得唤醒一个个这些读者。这种情况下既要读取readerWait
,又要更新排队的读者数量,这是两个操作,无法原子化。RWMutex在实现时候,通过将readerCount转换成负数,一方面表明有写者申请了锁,另一方面readerCount还可以继续记录排队的读者数量,解决刚描述的无法原子化的问题,真是巧妙!
对于读者优先(readers-preference)
的读写锁,我们可以借助Mutex实现。示例代码如下:
type rwlock struct {
reader_cnt int
reader_lock sync.Mutex
writer_lock sync.Mutex
}
func NewRWLock() *rwlock {
return &rwlock{}
}
func (l *rwlock) RLock() {
l.reader_lock.Lock()
defer l.reader_lock.Unlock()
l.reader_cnt++
if l.reader_cnt == 1 { // first reader
l.writer_lock.Lock()
}
}
func (l *rwlock) RUnlock() {
l.reader_lock.Lock()
defer l.reader_lock.Unlock()
l.reader_cnt--
if l.reader_cnt == 0 { // latest reader
l.writer_lock.Unlock()
}
}
func (l *rwlock) Lock() {
l.writer_lock.Lock()
}
func (l *rwlock) Unlock() {
l.writer_lock.Unlock()
}
上面示例代码中,尽管读者操作的实现上用到互斥锁,但由于它是用完立马就是释放掉,性能不会差太多。
三大错误使用场景 #
RLock/RUnlock、Lock/Unlock未成对出现 #
同互斥锁一样,sync.RWMutex的RLock/RUnlock,以及Lock/Unlock总是成对出现的。Lock或RLock多余调用会导致锁没有释放,可能出现死锁,Unlock或RUnlock多余的调用会大导致panic.
func main() {
var l sync.RWMutex
l.Lock()
l.Unlock()
l.Unlock() // fatal error: sync: Unlock of unlocked RWMutex
}
对于Lock/Unlock未成对出现所有可能情况如下:
-
如果只有Lock情况
如果有一个 goroutine 只执行 Lock 操作而不执行 Unlock 操作,那么其他的 goroutine 就会一直被阻塞(拿不到锁),随着越来越多的阻塞的 goroutine 越来越多,整个系统最终会崩溃。
-
如果只有Unlock情况
- 如果其他 goroutine 持有锁,锁将被释放。
- 如果锁处于空闲状态(unoccupied state),它会panic。
复制sync.RWMutex作为函数值传递 #
同Mutex一样,RWMutex也是不能复制使用的,考虑下面场景代码:
func main() {
var l sync.RWMutex
l.Lock()
foo(l)
l.Lock()
l.Unlock()
}
func foo(l sync.RWMutex) {
l.Unlock()
}
上面场景代码中本意先使用l.Lock()进行上锁操作,然后调用foo(l)释放该锁,最后再次上锁和释放锁。但这种操作是错误的,会导致死锁。foo()函数接收的参数是变量l的一个副本,该副本把之前l变量的锁状态(锁状态指的是writerSem,readerCount等字段信息)也复制了一遍,此时副本的锁状态是上锁状态的,所以foo函数中是可以进行释放锁操作的,但释放的并不是最开始的那个锁。
我们可以使用go vet
命令检测复制锁情况:
vagrant@vagrant:~$ go vet main.go
# command-line-arguments
./main.go:8:6: call of foo copies lock value: sync.RWMutex
./main.go:13:12: foo passes lock by value: sync.RWMutex
解决上面问题可以使用指针传递:
func foo(l *sync.RWMutex) {
l.Unlock()
}
不可重入导致死锁 #
可重入锁(ReentrantLock)指的一个线程中可以多次获取同一把锁,换到Go语言场景就是一个Goroutine中,Mutex和RWMutex可以连续Lock操作,而不会导致死锁。同互斥体Mutex一样,RWMutex也是不可重入锁,不支持重入。
func main() {
var l sync.RWMutex
l.Lock()
foo(&l) // foo中尝试重入锁,会导致死锁
l.Unlock()
}
func foo(l *sync.RWMutex) {
l.Lock()
l.Unlock()
}
下面是读锁和写锁重入时候导致的死锁:
func main() {
var l sync.RWMutex
l.RLock()
foo(&l)
l.RUnlock()
}
func foo(l *sync.RWMutex) {
l.Lock()
l.Unlock()
}
上面代码中写锁重入时候,需要读锁先释放,而读锁释放又依赖写锁,这样就形成了死循环,导致死锁。