当处理并发时,锁是最常用的编程语言工具,对于大多数应用而言,仅在使用数据时加锁,使用后释放即可。然而,对于一些对性能要求比较高的并发实现,锁的使用方式需要作出对应的优化。例如锁的 粒度 经常用于确保线程在尽可能短的时间内独占。

此外,使用锁时一个常用优化方法是使用 读写锁 ,本文即探讨读写锁的原理并给出相关实现。所有的代码都可以通过 on GitHub访问。同时,文章的结尾给出了性能测试相关的方法。

本文的撰写目的是为了研究探讨读写锁,如果需要在代码中使用读写锁,Go语言本身已经内置了一个高性能的读写锁:sync.RWMutex。实际上,本文也将探索它的实现。

读写锁产生的动因

读写锁产生的动因是为了在无线程写数据的情况下,多个线程能够并发地共享该数据。普通锁并不会区分读线程或者写线程,因此当多个线程需要读取数据时,每个线程都必须先要对该数据上锁,浪费了大量资源。

读写锁相对于普通锁的单个 lock 方法,它有两个方法,一个用于读者、一个用于写者。只要写者没有持有锁,多个读者即可共享数据。

简单实现

下面实现一个简单的读写锁,使用counter记录读者数量。本文接下来的所有读写锁实现都会实现该接口:

1
2
3
4
5
6
type RWLocker interface {
RLock() // 读者获取锁
RUnlock() // 读者释放锁
WLock() // 写者获取锁
WUnlock() // 写者释放锁
}

对应锁的结构为:

1
2
3
4
type ReaderCountRWLock struct {
m sync.Mutex
readerCount int
}

counter用于记录持有锁的读者数量,读者获取锁、释放锁的逻辑比较简单:

1
2
3
4
5
6
7
8
9
10
11
func (l *ReaderCountRWLock) RLock() {
l.m.Lock()
l.readerCount++
l.m.Unlock()
}
func (l *ReaderCountRWLock) RUnlock() {
l.m.Lock()
l.readerCount--
l.m.Unlock()
}

写者获取锁与释放锁的实现则复杂一些,写者在获取锁时必须等待持有锁的读者全部释放锁,代码如下:

1
2
3
4
5
6
7
8
9
10
func (l *ReaderCountRWLock) WLock() {
for {
l.m.Lock()
if l.readerCount > 0 {
l.m.Unlock()
} else {
break
}
}
}

写者对mutex上锁,同时检查是否存在读者持有锁,如果存在,写者释放mutexx并再次尝试,这也叫 自旋 。如果没有持有锁的读者,WLock将会结束,同时不释放锁,从而读者将无法再持有锁,写者释放锁的逻辑则比较直观:

1
2
3
func (l *ReaderCountRWLock) WUnlock() {
l.m.Unlock()
}

这样的实现方法是我目前能够想到的最简单的方法,它的性能有待优化,例如:当读者持有锁时,写者获取锁的实现会持续自旋,不断的获取锁与释放锁这一过程对CPU的计算能力来说是一种额外的消耗。如果我们能够使写者更高效地等待,那读写锁的性能也将获得提升。

使用条件变量实现高效等待

条件变量的模式是“获取mutex,检查,如果未准备好则释放mutex”,而这正是我们需要的。因此改进后的读写锁实现是:

1
2
3
4
type ReaderCountCondRWLock struct {
readerCount int
c *sync.Cond
}

如果你在好奇读写锁中的mutex在哪里,不用担心,Go中,sync.Cond 内嵌一个mutex,这个结构体需要一个构造函数:

1
2
3
func NewReaderCountCondRWLock() *ReaderCountCondRWLock {
return &ReaderCountCondRWLock{0, sync.NewCond(new(sync.Mutex))}
}

下面是读者获取锁、释放锁的相关实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (l *ReaderCountCondRWLock) RLock() {
l.c.L.Lock()
l.readerCount++
l.c.L.Unlock()
}
func (l *ReaderCountCondRWLock) RUnlock() {
l.c.L.Lock()
l.readerCount--
if l.readerCount == 0 {
l.c.Signal()
}
l.c.L.Unlock()
}

与上一版读写锁实现的区别仅在于这里在最后一个读者释放锁的时候会在条件变量上发出信号。写者获取锁的实现方式为:

1
2
3
4
5
6
func (l *ReaderCountCondRWLock) WLock() {
l.c.L.Lock()
for l.readerCount > 0 {
l.c.Wait()
}
}

这里的Wait过程仍然处于循环中,因为极有可能在读者发出信号之后、写者获取锁之前,另一个读者先拿到锁。

写者释放锁的过程:

1
2
3
4
func (l *ReaderCountCondRWLock) WUnlock() {
l.c.Signal()
l.c.L.Unlock()
}

你能理解Signal为什么是需要的吗?缺少它,在特定情况下将会导致死锁:如果两个写者都在等待条件变量,在其中一个获取锁后,另一个将会继续等待,在缺少后续读者发送信号的情况下,这个写者将会永久等待,因此有必要在写着释放锁的时候发出信号。

这一实现比上一版的视线更为高效,因为避免了自旋的过程,尽管这里仍然又一个循环,但它会在Wait执行时阻塞,仅当竞争时才会再次执行循环。

需要注意的是,这里提到的“更高效”并没有相应的数据支撑,因为对上述代码执行性能测试过于复杂,且性能会随着测试样例的变化而变化。后文将会针对性能测试给出说明。

Counted 信号量

另一种实现读写锁的优雅方式时通过 counting 信号量 实现。在Go中,golang.org/x/sync/semaphore包提供了相关的实现。构造方法与读者获取锁、释放锁的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const maxWeight int64 = 1 << 30
type SemaRWLock struct {
s *semaphore.Weighted
}
// NewSemaRWLock creates a new SemaRWLock.
func NewSemaRWLock() *SemaRWLock {
return &SemaRWLock{semaphore.NewWeighted(maxWeight)}
}
func (l *SemaRWLock) RLock() {
l.s.Acquire(context.Background(), 1)
}
func (l *SemaRWLock) RUnlock() {
l.s.Release(1)
}

写者仅仅是获取整个maxWeight, 从而保证仅有一个写锁能够获取信号量。

1
2
3
4
5
6
7
func (l *SemaRWLock) WLock() {
l.s.Acquire(context.Background(), maxWeight)
}
func (l *SemaRWLock) WUnlock() {
l.s.Release(maxWeight)
}

这种实现方式非常简洁,在实际测试时,可能是因为 semaphore.Weighted 不适合特定的测试样例,这个方法的性能甚至还没有第一版实现的性能好。

读者优先 vs. 写者优先

上述的三种实现都存在一个问题:当读者数量很大时,可能会导致写者饥饿。例如,第一版实现中 readerCount 为0时,写者才能够获取锁,假设有两个活跃的读者以及一个等待的写者,在写者等待一个读者释放锁的过程中,另一个读者可能又会获取锁

这也被叫做 读者优先, 或者 写者饥饿 。下面我们将实现写者优先的相关逻辑。

简单的写者优先读写锁

实现源于 Wikipedia。首先,数据结构与构造函数如下:

1
2
3
4
5
6
7
8
9
type WritePreferRWLock struct {
readerCount int
hasWriter bool
c *sync.Cond
}
func NewWritePreferRWLock() *WritePreferRWLock {
return &WritePreferRWLock{0, false, sync.NewCond(new(sync.Mutex))}
}

这里的 readerCount 仍然指代持有锁的读者数量, 但我们添加了一个新的字段 - hasWriter; 当有写者等待获取锁的时候,它的值为true。读者获取锁与释放锁的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (l *WritePreferRWLock) RLock() {
l.c.L.Lock()
for l.hasWriter {
l.c.Wait()
}
l.readerCount++
l.c.L.Unlock()
}
func (l *WritePreferRWLock) RUnlock() {
l.c.L.Lock()
l.readerCount--
if l.readerCount == 0 {
l.c.Signal()
}
l.c.L.Unlock()
}

当读者尝试获取锁时,会首先检查是否有写者等待获取锁,如果有的话将会让出获取锁的权限,保证写者先获取锁。

在缩放锁的时候,最后一个读者会发出信号,触发在等待的线程执行。

写者获取锁、释放锁的相关实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (l *WritePreferRWLock) WLock() {
l.c.L.Lock()
for l.hasWriter {
l.c.Wait()
}
l.hasWriter = true
for l.readerCount > 0 {
l.c.Wait()
}
l.c.L.Unlock()
}
func (l *WritePreferRWLock) WUnlock() {
l.c.L.Lock()
l.hasWriter = false
l.c.Broadcast()
l.c.L.Unlock()
}

写者将会首先检查是否有其他写者在等待,不同于之前的实现,这里写者在WLock与WUnlock之间不再持有mutex,取而代之,mutex仅用于控制共享结构的访问,hasWriter 字段不仅表示有写者在等待锁,也表示写者正在使用锁,当hashWriter为true时,不再有新的读者能够获取到锁。

在 WUnlock 中, 写者将 hasWriter 置为 false,同时广播信号。这里使用Broadcast而不是Signal是因为可能存在多个读者等待,而我们期望唤醒所有等待的读者。

更高效的写者优先读写锁实现

上述的实现都比较简洁,但性能表现却不够好。因此,这里我们研究下Go自身读写锁的实现。

实现的总体目标是使读者能够尽快获取锁,同时要保证写优先。

首先是结构体与构造方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type WritePreferFastRWLock struct {
w sync.Mutex
writerWait chan struct{}
readerWait chan struct{}
numPending int32 // 尝试持有锁或已经持有锁的读者数量
readersDeparting int32 // 在写者持有锁之前获取锁的读者数量(读者释放锁,也会随之减1)
}
const maxReaders int32 = 1 << 30
func NewWritePreferFastRWLock() *WritePreferFastRWLock {
var l WritePreferFastRWLock
l.writerWait = make(chan struct{})
l.readerWait = make(chan struct{})
return &l
}

mutex w不会被读者所使用,它存在的作用是为写者提供互斥,稍后再介绍它的具体用法。numPending用于表示有多少个读者持有锁(类似上文中的readerCount),但它同时也被写者使用。写者将该字段减去maxReaders,如果得到一个负数就表明一个写者正在使用锁,通过原子操作访问该字段,因此不再需要锁。
下面介绍读者相关方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
func (l *WritePreferFastRWLock) RLock() {
if atomic.AddInt32(&l.numPending, 1) < 0 {
<-l.readerWait
}
}
func (l *WritePreferFastRWLock) RUnlock() {
if r := atomic.AddInt32(&l.numPending, -1); r < 0 {
if atomic.AddInt32(&l.readersDeparting, -1) == 0 {
l.writerWait <- struct{}{}
}
}
}

读者获取锁时会对numPending加1,如果numPending是非负数,表明没有写者等待持有锁或正持有锁,因此读者可以继续操作。读者获取锁的方法执行极其频繁,而这里只是用了一个原子操作、一个条件判断,因此速度极快。

如果numPending是负数,表明写者正在等待获取锁或已经获取锁,因此读者将会让出权限,保持等待,这个操作是通过在一个无缓冲channel上等待实现的。

当读者释放锁时,它将numPending减1,如果没有写者等待,执行结束;如果有写者等待,readersDeparting字段用于告知写者一个读者已经释放锁。

写者获取锁:

1
2
3
4
5
6
7
func (l *WritePreferFastRWLock) WLock() {
l.w.Lock()
r := atomic.AddInt32(&l.numPending, -maxReaders) + maxReaders
if r != 0 && atomic.AddInt32(&l.readersDeparting, r) != 0 {
<-l.writerWait
}
}

w mutex用于保证仅有一个写者在任意时间内获取锁。第二行复杂一些,它完成两件事:

  1. 通过执行numPending减maxReaders告知读者存在一个写者等待

  2. 通过执行加maxReaders操作,获取有多少个尝试持有锁或持有锁的读者

接下来,如果有任意数量尝试持有锁或持有锁的读者(r != 0),它将对应数值加入readersDeparting中,——这让读者在解除写者等待前知道有多少个尝试持有锁或持有锁的读者,当最后一个持有锁的读者释放时,函数将返回(写者持有锁),其他写者因为w而陷入等待,而读者则等待numPending变为非负数才能获取锁。

写者释放锁:

1
2
3
4
5
6
7
func (l *WritePreferFastRWLock) WUnlock() {
r := atomic.AddInt32(&l.numPending, maxReaders)
for i := 0; i < int(r); i++ {
l.readerWait <- struct{}{}
}
l.w.Unlock()
}

再一次,通过加maxReaders,写者能够告诉未来的读者已经没有写者还处于等待获取锁的过程中了。r表示等待获取锁的读者数量,写者通过发送r个空对象到readerWait中唤醒所有等待的读者,最后对w解锁,给予后续写者上锁的机会。

这里主要是readersDeparting不太好理解,为了更好的理解它,假设有如下操作:

  1. 两个读者获取锁(numPending = 2,readersDeparting = 0)
  2. 一个写者获取锁(numPending = 2 - MAX, readersDeparting = 2)
  3. 一个读者获取锁(numPending = 3 - MAX, readersDeparting = 2)

可见,readersDeparting主要用于记录写者上锁前就已经持有锁的读者数量,后续这些这些读者释放锁时,readersDeparting也会随之减少,当它变为0时,恰好说明写者之前获取锁的所有读者均已经释放锁。

后续: 性能测试方法

性能测试相关方法的代码位于GitHub。它的目的是测试正确性,因此它检查多个读者与写者执行的正确性。在执行性能测试时,会记录每个读者与写者获取锁的等待时间,最后打印平均时间。

原文链接

Implementing reader-writer locks