源码版本 Go1.13.15

Go 语言的 sync 包提供了用于同步的基本原语,sync.Mutex 就是其中用的最多的一个。

sync.Mutex 是 Go 语言里面的一个排他锁,当某一个 G 拥有了锁的所有权之后,其他请求锁的 G 就会阻塞,直到锁被释放。

锁的使用方法非常简单:

1
2
3
4
5
6
func main() {
	var mu sync.Mutex
	mu.Lock()
	defer mu.Unlock()
	// ... do others thing
}

那我们今天就来剖析下 sync.Mutes 底层是怎么实现的,看看那些还未获得锁的 G 阻塞时发生了什么。

基本结构

sync.Mutex 由两个字段 state 和 sema 组成。其中 state 表示当前互斥锁的状态;而 sema 是用于控制锁状态的信号量,实现 G 的阻塞和唤醒。

1
2
3
4
type Mutex struct {
	state int32
	sema  uint32
}

上面这个只占 8 字节的结构体就是 Go 语言里面的排他锁。它实现了接口:

1
2
3
4
type Locker interface {
	Lock()
	Unlock()
}

这两个方法我们后面会展开分析。

重点讲下互斥锁的状态,state 是个复合型字段,一个字段包含了多个含义,如下图所示:

state字段

在默认情况下,state 字段所有位都是 0。但是不同的 bit 位有不同的含义:

  • mutexLocked 位表示互斥锁的锁定状态,1 表示已加锁,即某一个 G 获取到锁;0 表示未加锁。
  • mutexWoken 位表示锁是否已唤醒,也就是某个唤醒的 G 正在尝试获取锁,1 表示已唤醒,0-表示未唤醒。
  • mutexStarving 表示当前互斥锁的饥饿状态,1 表示饥饿状态,0 表示正常状态。

其他位用来表示当前等待获取锁的 G 的数量,这样一算可以表示 2^29 个 G 等待获取锁。

正常模式和饥饿模式

sync.Mutex 有两种模式:正常模式和饥饿模式。

正常模式下,所有等待锁的 G 的按照 FIFO 顺序获取锁,被唤醒的 G 不会直接拥有锁,而是会与新请求的 G 共同竞争锁,但是因为新请求的 G 占用着 CPU,所以新请求的 G 有更大优势获取到锁,这样刚被唤醒的 G 大概率会竞争失败,获取不到锁。这种情况下,这个刚被唤醒的 G 会加入到等待队列的最前面。如果一个等待的 G 超过 1ms 没有获取到锁,那么它会将锁转变为饥饿模式。

饥饿模式下,拥有锁的 G 在释放锁的时候,会将锁直接交给等待队列最前面的 G。新来的 G 不会尝试获取锁,也不会自旋,而是放在等待队列的尾部。如果一个 G 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

与饥饿模式相比,正常模式下互斥锁拥有更好的性能,引入饥饿模式是保证锁的公平性,能避免部分 G 陷入等待无法获取到锁而造成高尾延时,就是我们说的『饿死』。

加锁和解锁

这一节我们就来分析下加锁和解锁的过程,加锁和解锁分别分别调用 sync.Lock() 和 sync.Unlock() 方法,在看源码时,我们要从多线程(goroutine)的并发场景去分析。

lock()

我们先来分析下加锁过程:

1
2
3
4
5
6
7
8
9
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return    // 尝试获取锁成功
	}
	// Slow path
	// 如果互斥锁的状态不是 0 就会进入 slowPath
	m.lockSlow()
}

调用 sync.Lock() 时分为两个部分,Fast path 和 Slow path,Fast path 是一条幸运之路,如果此时锁的状态是 0,通过一个 CAS 操作就能获取到锁,直接就返回了;如果走 Slow path,就会调用 m.lockSlow(),进行缓慢加锁过程。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
func (m *Mutex) lockSlow() {
	// 标记本 goroutine 的等待时间
	var waitStartTime int64
	starving := false     // 本 goroutine 是否已经处于饥饿状态
	awoke := false   // 本 goroutine 是否已唤醒
	iter := 0    	// 自旋次数
	old := m.state   // 赋值锁的当前状态
	for {
		// 第一个条件是,锁已经被锁了但不是饥饿状态;如果是饥饿状态,自旋是没有用的,锁的拥有权会直接交给等待队列的第一个 goroutine
		// 第二个条件是还可以自旋,多核、压力不大,在一定次数内可以自旋,具体条件参考 sync_runtime_canSpin 的实现
		// 如果满足这两个状态,不断自旋等待锁被释放、或者进入饥饿状态、或者不能再自旋
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {

			// 在自旋的过程中,如果发现 state 还没有设置 woken 标识,则设置它的 woken 标识,并标记自己被唤醒
			// 设置 mutexWoken==1,告诉 unlock 操作,存在自旋 goroutine,unlock 操作之后不需要唤醒其他 goroutine
			// old&mutexWoken == 0 表示没有其他goroutine尝试唤醒锁
			// old>>mutexWaiterShift != 0 表示有等待的goroutine

			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				// 本 goroutine 已经被唤醒,正尝试唤醒锁
				awoke = true
			}
			runtime_doSpin()  // 执行自旋操作
			iter++
			old = m.state    // 自旋之后重新获取锁的状态
			continue
		}

		// 到了这一步,state 可能的状态:
		// 1.锁没有被释放,锁处于正常模式;
		// 2.锁没有被释放,锁处于饥饿模式;
		// 3.锁被释放,锁处于正常模式;
		// 4.锁被释放,锁处于饥饿模式;
		//
		// 并且本goroutine的 awoke 可能为 true 或者 false

		// new 复制 state 的当前状态,用来设置新的状态;old 是锁的当前状态
		new := old
		// 如果 old state 不是饥饿状态,new state 设置锁,尝试通过 cas 操作获取锁
		// 如果 old state 是饥饿状态,则不设置 new state 的锁,因为饥饿状态下锁直接转给等待队列的第一个
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		// 如果当前锁处于锁定状态或者饥饿状态,则将等待队列的等待者数量加一
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// 如果当前 goroutine 处于饥饿状态且 old state 已被加锁,将 new state 的状态标记为饥饿状态,即将锁标记为饥饿状态
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			// 当前 goroutine 自旋过之后,则需要将 mutexWoken 重置,因为当前 goroutine 的状态一定会是阻塞或者是已获得锁
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			// 清除 new state 的唤醒标记
			new &^= mutexWoken
		}
		// 当前 goroutine 获取到的 mutex state 是否被其他 goroutine 改变
		// 这里可能只设置锁,或者也有可能只设置为饥饿状态和等待数量
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 如果 old state 的状态是未被锁的状态,且锁不处于饥饿状态
			// 那么当前的 goroutine 已经获得了锁的拥有权,返回
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			// 设置/计算本goroutine的等待时间
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}

			// 既然未能获取到锁,就是用 sleep 原语阻塞本 goroutine  阻塞等待
			// 如果是新来的 goroutine, queueLifo=false,加入到等待队列的尾部,耐心等待
			// 如果是被唤醒的 goroutine,queueLifo=true,加入到等待队列的头部
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)

			// 唤醒之后检查锁是否应该处于饥饿状态

			// 计算当前 goroutine 是否处于饥饿状态
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state // 重新获取锁当前的状态
			// 如果当前的 state 处于饥饿状态,那么锁应该处于 unlock 状态,那么应该将锁直接交给本 goroutine,为什么?
			// 因为如果锁处于饥饿状态,上一个释放锁的 G 会主动唤醒队首的 goroutine。
			if old&mutexStarving != 0 {
				// 非法状态
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// 当前 goroutine 用来设置锁,并将等待的 goroutine 数减 1.
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// 如果本 goroutine 是最后一个等待者,或者它并不处于饥饿状态,
				// 那么我们需要把锁的 state 状态设置为正常模式.
				if !starving || old>>mutexWaiterShift == 1 {
					// 退出饥饿模式
					delta -= mutexStarving
				}
				// 设置新state, 因为已经获得了锁,退出、返回
				atomic.AddInt32(&m.state, delta)
				break
			}
			// 如果当前锁不是饥饿状态,就把当前的 goroutine 设置为被唤醒
			// 重置iter
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
}

lockSlow() 方法里面是一个大的 for 循环,有几个变量标识本次请求 goroutine 的状态,比如 starving 表示本 goroutine 是否处于饥饿状态;awoke 表示本 goroutine 是否已唤醒;waitStartTime 记录本 goroutine 的等待时间。

这里介绍下自旋的概念,

自旋是 CPU 空转一定的时钟周期。

自旋过程中,G 会保持对 CPU 的占用。在多核的 CPU 上,自旋可以避免 G 的切换,使用恰当会对性能带来很大的增益,但是使用不当就会拖慢整个程序,所以 G 进入自旋的条件非常苛刻:

  1. 正常模式下才能自旋;
  2. runtime_canSpin() 返回 true;

runtime_canSpin() 的判断如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func sync_runtime_canSpin(i int) bool {
    // 1.自旋次数不能超过 4 次;
    // 2.CPU 核数大于 1;
    // 3.有空闲的 P;
    // 4.当前 goroutine 所挂载的 P 下,本地待运行队列为空;
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

当满足上述 4 个条件时,sync_runtime_canSpin() 才会返回 true。可以看出自旋要求严格,毕竟在锁竞争激烈时,如果还无限制地自旋就肯定会影响其他 goroutine。

不同平台上自旋所用的指令不一样。例如在 amd64 平台下,汇编的实现如下:

1
2
3
4
5
6
7
8
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
    // 自旋 cycles 次,每次自旋执行 PAUSE 指令
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

每次自旋会执行 30 次 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间,相当于 30 个时钟周期。

unlock

1
2
3
4
5
6
7
8
func (m *Mutex) Unlock() {
	// Fast path:
	// 返回一个 state 被减后的值 new
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

与加锁类似,解锁也分为 Fast path 和 Slow path,使用 atomic.AddInt32() 加锁,如果返回的新状态为 0,则解锁成功;反之则调用 m.unlockSlow() 进入慢速解锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (m *Mutex) unlockSlow(new int32) {
	// 如果 state 不是处于锁的状态, 那么就是 Unlock 没有加锁的 mutex, panic
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	// 正常模式
	if new&mutexStarving == 0 {
		old := new
		for {
			// 如果已经没有等待获取锁的 goroutine
			// 或者锁被获取了(在 for 循环过程中,锁被其他 goroutine 获取了)
			// 或者锁是被唤醒状态,表示有 goroutine 被唤醒,不需要再去尝试唤醒其他 goroutine
			// 或者锁是饥饿状态,锁会直接交给队列头的 goroutine
			// 以上情况都直接返回
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}

			// 走到这一步说明,锁目前处于空闲状态、没有被唤醒的 goroutine、非饥饿模式且队列中有等待获取锁的 goroutine
			// 那么就需要将等待状态的 goroutine 数量减一 并设置为唤醒状态,然后唤醒一个等待状态的 goroutine
			// 这里为什么要设置woken标记??  避免被后到的 goroutine 竞争到锁
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				// 唤醒一个阻塞的 goroutine,但不一定是队列第一个
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else { // 饥饿模式
		// 让等待队列头部的第一个 goroutine 获得锁
		runtime_Semrelease(&m.sema, true, 1)
	}
}

相对于加锁,解锁过程比较简单,代码里有详细的注释,有不清楚的欢迎留言交流。

一些思考

现在我们回过头来想想几个问题,比如新请求的 G 为什么要进行自旋?加锁、解锁过程中为什么要设置 woken 标志位?

1.自旋目的?

自旋的目的是为了尽量减少阻塞和减少切换成本,想要减少切换成本那就不切换,不切换的话就自旋。假如上一个获得锁的 G 的临界区代码执行只需要十几个时钟周期时,让竞争者自旋等待一下,立刻就可以获得锁。减少不必要的切换成本,效率会更高。

所以为了减少切换成本,短暂的自旋等待是简单的方法。

2.加锁过程为什么要设置 woken 标记?

新请求的 G 申请锁时,发现锁被占用了。但自己满足自旋条件,于是自旋,并设置 woken 标记。此时拥有锁的的 G 在释放锁时,检查 woken 标记,如果被标记,哪怕等待获取锁的队列不为空,也不做唤醒,直接return,让自旋的 G 有更大机会抢到锁。也是为了降低切换成本。

1
2
3
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
	return
}

所以竞争者在自旋时,主动设置 woken 标记,这样拥有锁的 G 在释放锁时才能感知到。

3.解锁过程为什么要设置 woken 标记?

解锁过程中设置 woken 标记其实是为了尽可能地保证锁的公平性。假设这样的场景,阻塞队列里最后一个 G 被唤醒后,它还得等调度器运行到它,它自己再去抢锁。但在调度器运行到它之前,很可能有新来的 G 参与竞争锁,此时锁被抢走的概率就很大。这有失公平,被阻塞的 G 是先到者,新的 G 是后来者,应该尽量让它们一起竞争。

1
2
3
4
5
6
7
// 唤醒一个阻塞的 G,并设置 woken 标记
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
	// 唤醒一个阻塞的 goroutine,但不一定是队列第一个
	runtime_Semrelease(&m.sema, false, 1)
	return
}

设置 woken 标记后,state 就肯定不为零。此时新请求的 G,在执行 Lock() 的 fast-path 时会失败,接下来就只能乖乖排队了。

1
2
3
4
5
6
7
8
9
func (m *Mutex) Lock() {
	// Fast path
	// woken 标记设置后,这里的 CAS 操作就会失败
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
    // 参与锁的竞争
	m.lockSlow()
}

4.采用了哪些方式保证锁尽量公平?

  • 上面 3 提到的一点,在锁释放时,主动设置 woken 标记,防止新的竞争者轻易抢到锁;
  • 锁的竞争者进入阻塞队列策略不一样。新来的竞争者,抢不到锁,就排在队列尾部。先来的竞争者,从队列中被唤醒后,还是抢不到锁,就会放在队列头部;
  • 引入饥饿模式,锁的竞争者,被阻塞等待的时间超过指定阀值 - 1ms,锁就转为饥饿模式。这时锁的拥有者在释放锁时会唤醒它们,手递手式把锁交给它们。别的竞争者(包括新来的)都抢不到锁,直到把饥饿问题解决掉;

总结

sync.Mutex 的源码不多,加上注释不到 300 行,但是锁的实现方式和一些思路非常值得大家学习的,推荐大家阅读。大家可以多参考几篇文章阅读,每篇文章的侧重点都不一样,穿插着阅读可以帮助大家理解。

拓展阅读

同步原语与锁

这可能是最容易理解的 Go Mutex 源码剖析

你真的了解 sync.Mutex吗

sync.mutex 源代码分析

sync.mutex 详解

多图详解Go的互斥锁Mutex

源码剖析 golang 中 sync.Mutex

go中sync.Mutex源码解读

(全文完)

扫码关注领取学习资料!