Golang 锁的机制

在说锁的时候主要是有两种锁,一类是乐观锁,还有一类是悲观锁。

  • 悲观锁:默认 数据冲突时常发生 ,所以在操作之前就加锁,阻止其他线程同时访问该资源。常见场景是写多读少的情况,对数据一致性要求比较高的情况。

  • 乐观锁:默认 数据冲突不常发生 ,所有直接操作,最后再通过版本号 / CAS 验证是否冲突,如果冲突了再重试。一般是适用于写少读少的情况,在线文档就是一种很好的案例。

我之前背八股文的时候认为 CAS 和 版本号机制是一种东西,但是好像不是那么一样。本质上,版本号机制是对于 CAS 机制的补充,是为了解决 ABA 问题。

CAS(compare and swap)

CAS(比较并交换)是一种原子操作,在这个机制中有三个参数:
1. 主内存中存放的共享变量的值:V(一般来说都是内存的地址,通过这个地址可以获得对应的值)
2. 工作内存中共享变量的副本值,也叫做预期值:A
3. 需要将共享变量更新到最新值:B

主存中保存V值,线程中要使用V值要先从主存中读取V值到线程的工作内存A中,然后计算后变成B值,最后再把B值写回到内存V值中。

语义大概如下:

1
2
3
4
5
6
if (*addr == old) {
*addr = new
return true
} else {
return false
}

如果 addr 为旧的地址,那么就直接更新地址,否则返回 false。在 Golang 中 sync/atmoic 中就提供了原子的 CAS 操作,如 atomic.CompareAndSwapInt32。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Mutex struct {
key int32
}

func (m *Mutex) Lock() {
for {
if atomic.CompareAndSwapInt32(&m.key, 0, 1) {
return
}
}
}

func (m *Mutex) Unlock() {
atomic.CompareAndSwapInt32(&m.key, 1, 0)
}

优缺点

缺点

  1. ABA 问题:也就是说,值更新的流程如下:
    $$A -> B -> A$$
    那么无法得知值是否被修改,无法通过值判断是否进行了修改。因此,这里引入了版本号机制,也就是将以上流程改变为:
    $$1A -> 2B -> 3A $$
    因此每个流程就改变为两个属性,分别表示版本号和值。
  2. 资源消耗高:看起来CAS比锁的效率高,从阻塞机制变成了非阻塞机制,减少了线程之间等待的时间。每个方法不能绝对的比另一个好,在线程之间竞争程度大的时候,如果使用CAS,每次都有很多的线程在竞争,也就是说CAS机制不能更新成功。这种情况下CAS机制会一直重试,这样就会比较耗费CPU。

优点

  1. 可以保证变量操作的原子性。
  2. 在低并发的情况下,使用 CAS 机制比使用锁的机制效率更高。
  3. 在线程对共享资源占用率比较低的情况下,使用 CAS 机制效率也比较高。

Mutex 锁

golang 中的锁机制已经优化了几代,优化原因分别是:

  • 自旋锁的性能问题
  • 信号量的 FIFO 效率问题
  • 惊群问题
  • 多次请求,多给机会
  • 饥饿问题

简单的 Mutex 机制

Go 早期的 sync.Mutex 是普通的互斥锁,本质上是 自旋 + 睡眠 两阶段处理方式。比如说,当一个 goroutine 尝试获取锁的时候,首先会先尝试自旋,如果尝试一段时间还未成功,那么就会进入到睡眠阻塞队列。但是,这在高并发的情况下会造成大量的调度器切换,拉低性能。

信号量机制

由于自旋锁的性能问题,因此加入了信号量来唤醒 Goroutine,同时利用信号量的队列来保证先来先到的,保证其公平性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

package func cas(val *int32, old, new int32) bool

export type Mutex struct {
key int32;
sema int32;
}

func xadd(val *int32, delta int32) (new int32) {
for {
v := *val;
if cas(val, v, v+delta) {
return v+delta;
}
}
panic("unreached")
}

func (m *Mutex) Lock() {
if xadd(&m.key, 1) == 1 {
// changed from 0 to 1; we hold lock
return;
}
sys.semacquire(&m.sema);
}

func (m *Mutex) Unlock() {
if xadd(&m.key, -1) == 0 {
// changed from 1 to 0; no contention
return;
}
sys.semrelease(&m.sema);
}

加入 runtime_Semacquire 和 runtime_Semrelease 信号量,支持阻塞等待。

相互比较来说是解决了 for 重复循环消耗 CPU 资源的问题,如果竞争不激烈可以直接通过原子加减快速完成。但是,如果竞争十分激烈,那么就会出现阻塞等待的情况,并且只是依靠信号量的队列,那么就只能依靠信号量来进行调度(FIFO),这种 FIFO 工作效率低,会拉低性能

在很多帖子中都有这么一个有关性能的案例:

假设有三个协程分别是G1,G2,G3。G1首先加锁成功了,然后执行业务逻辑。期间G2想加锁发现加不上,就进入了信号量的等待队列,这个时候G2可能已经被调度器从M上调走了。然后G1解锁,这个时候G3想加锁发现由于G2在他前面进行了等待,所以导致G3加不上。这种情况由于G2没有获得CPU时间片,但是G3已经获得了CPU时间片,所以直接把锁给G3从整体上来说,效率会更加高些

看着一大段其实想要说明是这么一个问题,当前 G2,G3 都在等待锁释放,但是此时如果 G2 被挂起,或者被移除这个线程,但是在这个队列中必须等待 G2 出队之后才能让 G3 获得锁,因此导致性能拉低。理想情况就是直接给 G3 锁,这样才会提高性能。

解决信号量 FIFO 问题 // 惊群效应

新人也有机会进阶

为了提高性能,我们想要让所有的新来的运行态的 Goroutine 都有一次机会去尝试获取锁。因此可以改进以上代码,下面是我看到的一篇公众号的代码,可以简单的概括以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Mutex struct {
key int32
sema uint32
}

func (m *Mutex) Lock() {
// func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
if atomic.CompareAndSwapInt32(&m.key, 0, 1) {
return
}
// 在这里开始尝试获取一次锁,然后等待
for {
if atomic.CompareAndSwapInt32(&m.key, 0, 1) {
return
}
runtime.Semacquire(&m.sema)
}
}

func (m *Mutex) Unlock() {
if atomic.CompareAndSwapInt32(&m.key, 1, 0) {
runtime.Semrelease(&m.sema)
}
}

但是,以上又会产生惊群效应。

所谓惊群效应,就是多个进程或者线程在等待同一个事件,当事件发生时,所有进程或者线程都会被内核唤醒。然后,通常只有一个进程获得了该事件,并进行处理;其他进程在发现获取事件失败后,又继续进入了等待状态。这在一定程度上降低了系统性能。

参考别人给的案例:假设先有三个协程G1,G2,G3。G1加锁成功,G2,G3进入了信号量的等待队列。然后G1解锁,G2被唤醒,这个时候来了个新协程G4,G2和G4一起竞争锁,G4竞争成功。然后G4很快执行完进行解锁,然后G3就被唤醒了,这个时候就存在G2,G3一起竞争锁的场景。如果在高并发场景,这样的唤醒竞争还会更加激烈。同时也违背了G2,G3在信号量上先来先得的设计。

惊群问题

因此,要解决以上的问题,需要满足以下三种情况:

  • 唤醒指定 goroutine
  • 知道当前有多少协程阻塞在信号量上
  • 避免惊群效应

因此 Golang 中把他进行了优化,结构体由原来的 key 变为了 state

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}

const (
mutexLocked = 1 << iota // 表示互斥锁已被某个 goroutine 占用
mutexWoken // 标记互斥锁的等待队列中的 goroutine 已被唤醒(避免重复唤醒)
mutexWaiterShift = iota // 互斥锁内部使用一个整型变量,高位存储等待锁的 goroutine 数量,需要左移2位
)

为了优化,state 的高低位具有不同的功能。
alt text

那么,mutex 的代码就变成了 (开始看不懂了,对位运算不太熟):

&^ 是一个位清除运算符(bit clear operator,它的作用是将第一个操作数(左操作数)中的位与第二个操作数(右操作数)相对应的位进行比较。如果右操作数的位为 1,则将左操作数的相应位清零(设置为 0)。如果右操作数的位为 0,则左操作数的相应位保持不变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package sync provides basic synchronization primitives such as mutual
// exclusion locks. Other than the Once and WaitGroup types, most are intended
// for use by low-level library routines. Higher-level synchronization is
// better done via channels and communication.
//
// Values containing the types defined in this package should not be copied.
package sync

import "sync/atomic"

// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
state int32
sema uint32
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}

// state 的状态
const (
mutexLocked = 1 << iota // mutex is locked, b00001, 最低位
mutexWoken // b00010,是否被唤醒
mutexWaiterShift = iota // > b00010,用于计数
)

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 直接加锁成功,然后返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}

// 唤醒标志,这部分表示锁已经被占用
awoke := false
for {
old := m.state
// 表示当前无锁可用,对应最后一位
new := old | mutexLocked
if old&mutexLocked != 0 {
// 表示现在进入到等待状态,waiter + 1,通过 mutexWaiterShift 偏移实现,对应 state 前 29 位
new = old + 1<<mutexWaiterShift
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
// 如果此轮被唤醒(是被 unlock 唤醒),那么就重置,为了避免惊群现象
new &^= mutexWoken
}
// 尝试获取锁
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// get it !
if old&mutexLocked == 0 {
break
}
// 等待被唤醒
runtime_Semacquire(&m.sema)
awoke = true
}
}
}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
panic("sync: unlock of unlocked mutex")
}

old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)
return
}
old = m.state
}
}

not only one

由于只请求一次,无论什么原因,再次获取锁只有等待下一次信号量来才能重新被唤醒,因此我们想要多次进行请求,简单来说就是允许自旋锁,但是第一次优化里面就说了自旋锁会消耗大量 CPU 资源,因此需要对他进行限制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//判断是否可以自旋,同时满足以下4个条件才能自旋:
//1、自旋次数小于4次
//2、cpu核数大于1
//3、GOMAXPROCS>1
//4、running P > 1 并且 P队列为空
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p; p.runqhead != p.runqtail {
return false
}
return true
}

核心代码部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 //上面没有加锁成功,尝试在接下来的唤醒中去竞争锁
awoke := false //表示当前协程是不是被唤醒的
iter := 0 //记录当前自旋的次数
for {
old := m.state
new := old | mutexLocked // 设置锁标志位为1
if old&mutexLocked != 0 {
//判断是否满足自旋条件
if runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}

//内部调用procyield函数,该函数也是汇编语言实现。
//函数内部循环调用PAUSE指令。减少cpu的消耗,节省电量。
//指令的本质功能:让加锁失败时cpu睡眠30个(about)clock,从而使得读操作的频率低很多。流水线重排的代价也会小很多。
runtime_doSpin()
iter++
continue
}

new = old + 1<<mutexWaiterShift //锁没有释放,当前协程可能会阻塞在信号量上,先将waiter+1
}
··· //剩下的不变
}

饥饿问题

如果一个线程因为处理器时间全部被其他线程抢走而得不到处理器运行时间,这种状态被称之为饥饿,一般是由高优先级线程吞噬所有的低优先级线程的处理器时间引起的。

因此,在一步进行优化,将 state 划为四部分:
alt text

在超过一段时间之后就直接标记位 starving,因此 golang 目前的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package sync provides basic synchronization primitives such as mutual
// exclusion locks. Other than the Once and WaitGroup types, most are intended
// for use by low-level library routines. Higher-level synchronization is
// better done via channels and communication.
//
// Values containing the types defined in this package should not be copied.
package sync

import (
"internal/race"
"sync/atomic"
"unsafe"
)

func throw(string) // provided by runtime

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}

const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota

// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
starvationThresholdNs = 1e6
)

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}

var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}

// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true)
}
}

RWMutex

如果说,我们什么都用一把大锁来锁,其实是非常消耗资源的,尤其是在读多写少的情况下,会阻塞其他协程,导致性能降低。并且其实只有在修改的时候才需要对数据进行上锁。为了提高并发性能,我们将读和写进行了区别,因此出现了读写锁。

什么是读写锁,读写锁分为两部分,一部分是读锁,一部分是写锁。

  • 读锁(RLock // RUnlock):可以多个协程持有,不是相互互斥的。
  • 写锁 (Lock // Unlock):只能独享,不能被读锁或者其他写锁共存。

在 Golang 中,RWlock 的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If any goroutine calls [RWMutex.Lock] while the lock is already held by
// one or more readers, concurrent calls to [RWMutex.RLock] will block until
// the writer has acquired (and released) the lock, to ensure that
// the lock eventually becomes available to the writer.
// Note that this prohibits recursive read-locking.
// A [RWMutex.RLock] cannot be upgraded into a [RWMutex.Lock],
// nor can a [RWMutex.Lock] be downgraded into a [RWMutex.RLock].
//
// In the terminology of [the Go memory model],
// the n'th call to [RWMutex.Unlock] “synchronizes before” the m'th call to Lock
// for any n < m, just as for [Mutex].
// For any call to RLock, there exists an n such that
// the n'th call to Unlock “synchronizes before” that call to RLock,
// and the corresponding call to [RWMutex.RUnlock] “synchronizes before”
// the n+1'th call to Lock.
//
// [the Go memory model]: https://go.dev/ref/mem
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}

const rwmutexMaxReaders = 1 << 30

RWMutex 是基于 Mutex 的基础上的锁,除去 Mutex 之外,其他还有:

  • writerSem / readerSem:读写信号量
  • readerCount:读者数
  • readerWait: 记录阻塞等待的 reader 数量

另外注释里面有几个信息:

  • 支持多读/单写
  • 初始未上锁
  • 禁止复制锁
  • 写锁优先
  • 禁止读写锁升级或者降级
  • 遵循 Golang 的内存模型

Lock / Unlock

Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Happens-before relationships are indicated to the race detector via:
// - Unlock -> Lock: readerSem
// - Unlock -> RLock: readerSem
// - RUnlock -> Lock: writerSem
//
// The methods below temporarily disable handling of race synchronization
// events in order to provide the more precise model above to the race
// detector.
//
// For example, atomic.AddInt32 in RLock should not appear to provide
// acquire-release semantics, which would incorrectly synchronize racing
// readers, thus potentially missing races.

// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the [RWMutex] type.
func (rw *RWMutex) RLock() {
if race.Enabled {
race.Read(unsafe.Pointer(&rw.w))
race.Disable()
}
// 对应了 writer 的 lock
if rw.readerCount.Add(1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
if race.Enabled {
race.Read(unsafe.Pointer(&rw.w))
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}

这两部分是相互对应的,写锁上锁的时候会减去一个大数 rwmutexMaxReaders,来通知 reader 这里需要上锁

1
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

然后读锁部分进行判断:

1
2
3
4
5
// 对应了 writer 的 lock
if rw.readerCount.Add(1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}

这样来同一两部分,保证其同步。

Unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// RUnlock undoes a single [RWMutex.RLock] call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
if race.Enabled {
race.Read(unsafe.Pointer(&rw.w))
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := rw.readerCount.Add(-1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
fatal("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if rw.readerWait.Add(-1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked [RWMutex] is not associated with a particular
// goroutine. One goroutine may [RWMutex.RLock] ([RWMutex.Lock]) a RWMutex and then
// arrange for another goroutine to [RWMutex.RUnlock] ([RWMutex.Unlock]) it.
func (rw *RWMutex) Unlock() {
if race.Enabled {
race.Read(unsafe.Pointer(&rw.w))
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}

// Announce to readers there is no active writer.
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
fatal("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
  • Unlock:写锁释放,恢复读者入口并唤醒所有等待的读者。
  • RUnlock:读锁释放,如果有写者在等待且自己是最后一个读者,则进入慢路径。
  • rUnlockSlow:处理写者等待时最后一个读者释放的情况,负责唤醒写者。

参考文章

Mutex

RWMutex