sync.Mutex 是 Go 标准库中常用的一个排外锁。当一个 goroutine 获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在 Lock
方法的调用上,直到锁被释放。
sync.Mutex
的实现也是经过多次的演化,功能逐步加强,增加了公平的处理和饥饿机制。
初版的 Mutex
首先我们来看看Russ Cox在2008提交的第一版的 Mutex
实现。
type Mutex struct { key int32; sema int32; } func xadd(val *int32, delta int32) (new int32) { for { v := *val; if cas(val, v, v+delta) { return v+delta; } } panic("unreached") } func (m *Mutex) Lock() { if xadd(&m.key,1) ==1 { // changed from 0 to 1; we hold lock return; } sys.semacquire(&m.sema); } func (m *Mutex) Unlock() { if xadd(&m.key,-1) ==0 { // changed from 1 to 0; no contention return; } sys.semrelease(&m.sema); }
可以看到,这简单几行就可以实现一个排外锁。通过 cas
对 key
进行加一, 如果 key
的值是从 0
加到 1
, 则直接获得了锁。否则通过 semacquire
进行sleep, 被唤醒的时候就获得了锁。
2012年, commit dd2074c8 做了一次大的改动,它将 waiter count
(等待者的数量)和 锁标识
分开来(内部实现还是合用使用 state
字段)。新来的 goroutine 占优势,会有更大的机会获取锁。
获取锁, 指当前的gotoutine拥有锁的所有权,其它goroutine只有等待。
2015年, commit edcad863 , Go 1.5中 mutex
实现为全协作式的,增加了spin机制,一旦有竞争,当前goroutine就会进入调度器。在临界区执行很短的情况下可能不是最好的解决方案。
2016年 commit 0556e262 , Go 1.9中增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在1毫秒,并且修复了一个大bug,唤醒的goroutine总是放在等待队列的尾部会导致更加不公平的等待时间。
目前这个版本的 mutex
实现是相当的复杂, 如果你粗略的瞄一眼,很难理解其中的逻辑, 尤其实现中字段的共用,标识的位操作,sync函数的调用、正常模式和饥饿模式的改变等。
本文尝试解析当前 sync.Mutex
的实现,梳理一下 Lock
和 Unlock
的实现。
源代码分析
根据 Mutex
的注释,当前的 Mutex
有如下的性质。这些注释将极大的帮助我们理解 Mutex
的实现。
互斥锁有两种状态:正常状态和饥饿状态。
在正常状态下,所有等待锁的goroutine按照 FIFO 顺序等待。唤醒的goroutine不会直接拥有锁,而是会和新请求锁的goroutine竞争锁的拥有。新请求锁的goroutine具有优势:它正在CPU上执行,而且可能有好几个,所以刚刚唤醒的goroutine有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的goroutine会加入到等待队列的前面。 如果一个等待的goroutine超过1ms没有获取锁,那么它将会把锁转变为饥饿模式。
在饥饿模式下,锁的所有权将从unlock的gorutine直接交给交给等待队列中的第一个。新来的goroutine将不会尝试去获得锁,即使锁看起来是unlock状态, 也不会去尝试自旋操作,而是放在等待队列的尾部。
如果一个等待的goroutine获取了锁,并且满足一以下其中的任何一个条件:(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将锁的状态转换为正常状态。
正常状态有很好的性能表现,饥饿模式也是非常重要的,因为它能阻止尾部延迟的现象。
在分析源代码之前, 我们要从多线程(goroutine)的并发场景去理解为什么实现中有很多的分支。
当一个goroutine获取这个锁的时候, 有可能这个锁根本没有竞争者, 那么这个goroutine轻轻松松获取了这个锁。
而如果这个锁已经被别的goroutine拥有, 就需要考虑怎么处理当前的期望获取锁的goroutine。
同时, 当并发goroutine很多的时候,有可能会有多个竞争者, 而且还会有通过信号量唤醒的等待者。
sync.Mutex
只包含两个字段:
type Mutex struct { state int32 sema uint32 }
state
是一个共用的字段, 第0个 bit 标记这个 mutex
是否已被某个goroutine所拥有, 下面为了描述方便称之为 state
已加锁,或者 mutex
已加锁。 如果 第0个 bit为0, 下文称之为 state
未被锁, 此mutex目前没有被某个goroutine所拥有。
第1个bit 标记这个 mutex
是否已唤醒, 也就是有某个唤醒的 goroutine
要尝试获取锁。
第2个bit 标记这个 mutex
状态, 值为1表明此锁已处于饥饿状态。
同时,尝试获取锁的goroutine也有状态,有可能它是新来的goroutine,也有可能是被唤醒的goroutine, 可能是处于正常状态的goroutine, 也有可能是处于饥饿状态的goroutine。
Lock
func (m *Mutex) Lock() { // 如果mutext的state没有被锁,也没有等待/唤醒的goroutine, 锁处于正常状态,那么获得锁,返回. // 比如锁第一次被goroutine请求时,就是这种状态。或者锁处于空闲的时候,也是这种状态。 if atomic.CompareAndSwapInt32(&m.state,0, mutexLocked) { return } // 标记本goroutine的等待时间 var waitStartTime int64 // 本goroutine是否已经处于饥饿状态 starving := false // 本goroutine是否已唤醒 awoke := false // 自旋次数 iter :=0 // 复制锁的当前状态 old := m.state for { // 第一个条件是state已被锁,但是不是饥饿状态。如果时饥饿状态,自旋时没有用的,锁的拥有权直接交给了等待队列的第一个。 // 第二个条件是还可以自旋,多核、压力不大并且在一定次数内可以自旋, 具体的条件可以参考`sync_runtime_canSpin`的实现。 // 如果满足这两个条件,不断自旋来等待锁被释放、或者进入饥饿状态、或者不能再自旋。 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。 if !awoke && old&mutexWoken ==0 && old>>mutexWaiterShift !=0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } // 到了这一步, state的状态可能是: // 1. 锁还没有被释放,锁处于正常状态 // 2. 锁还没有被释放, 锁处于饥饿状态 // 3. 锁已经被释放, 锁处于正常状态 // 4. 锁已经被释放, 锁处于饥饿状态 // // 并且本gorutine的 awoke可能是true, 也可能是false (其它goutine已经设置了state的woken标识) // new 复制 state的当前状态, 用来设置新的状态 // old 是锁当前的状态 new := old // 如果old state状态不是饥饿状态, new state 设置锁, 尝试通过CAS获取锁, // 如果old state状态是饥饿状态, 则不设置new state的锁,因为饥饿状态下锁直接转给等待队列的第一个. if old&mutexStarving ==0 { new |= mutexLocked } // 将等待队列的等待者的数量加1 if old&(mutexLocked|mutexStarving) !=0 { new +=1 << mutexWaiterShift } // 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁, // 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态. if starving && old&mutexLocked !=0 { new |= mutexStarving } // 如果本goroutine已经设置为唤醒状态, 需要清除new state的唤醒标记, 因为本goroutine要么获得了锁,要么进入休眠, // 总之state的新状态不再是woken状态. if awoke { if new&mutexWoken ==0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } // 通过CAS设置new state值. // 注意new的锁标记不一定是true, 也可能只是标记一下锁的state是饥饿状态. if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果old state的状态是未被锁状态,并且锁不处于饥饿状态, // 那么当前goroutine已经获取了锁的拥有权,返回 if old&(mutexLocked|mutexStarving) ==0 { break } // 设置/计算本goroutine的等待时间 queueLifo := waitStartTime !=0 if waitStartTime ==0 { waitStartTime = runtime_nanotime() } // 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine // 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待 // 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部 runtime_SemacquireMutex(&m.sema, queueLifo) // sleep之后,此goroutine被唤醒 // 计算当前goroutine是否已经处于饥饿状态. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 得到当前的锁状态 old = m.state // 如果当前的state已经是饥饿状态 // 那么锁应该处于Unlock状态,那么应该是锁被直接交给了本goroutine if old&mutexStarving !=0 { // 如果当前的state已被锁,或者已标记为唤醒, 或者等待的队列中不为空, // 那么state是一个非法状态 if old&(mutexLocked|mutexWoken) !=0 || old>>mutexWaiterShift ==0 { throw("sync: inconsistent mutex state") } // 当前goroutine用来设置锁,并将等待的goroutine数减1. delta := int32(mutexLocked -1<<mutexWaiterShift) // 如果本goroutine是最后一个等待者,或者它并不处于饥饿状态, // 那么我们需要把锁的state状态设置为正常模式. if !starving || old>>mutexWaiterShift ==1 { // 退出饥饿模式 delta -= mutexStarving } // 设置新state, 因为已经获得了锁,退出、返回 atomic.AddInt32(&m.state, delta) break } // 如果当前的锁是正常模式,本goroutine被唤醒,自旋次数清零,从for循环开始处重新开始 awoke = true iter =0 } else { // 如果CAS不成功,重新获取锁的state, 从for循环开始处重新开始 old = m.state } } }
Unlock
func (m *Mutex) Unlock() { // 如果state不是处于锁的状态, 那么就是Unlock根本没有加锁的mutex, panic new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked ==0 { throw("sync: unlock of unlocked mutex") } // 释放了锁,还得需要通知其它等待者 // 锁如果处于饥饿状态,直接交给等待队列的第一个, 唤醒它,让它去获取锁 // 锁如果处于正常状态, // new state如果是正常状态 if new&mutexStarving ==0 { old := new for { // 如果没有等待的goroutine, 或者锁不处于空闲的状态,直接返回. if old>>mutexWaiterShift ==0 || old&(mutexLocked|mutexWoken|mutexStarving) !=0 { return } // 将等待的goroutine数减一,并设置woken标识 new = (old -1<<mutexWaiterShift) | mutexWoken // 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁. if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个. // 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。 // 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态, // 新来的goroutine不会把锁抢过去. runtime_Semrelease(&m.sema, true) } }
问个问题
如果一个goroutine g1
通过 Lock
获取了锁, 在持有锁的期间, 另外一个goroutine g2
调用 Unlock
释放这个锁, 会出现什么现象?
-
g2
调用Unlock
panic -
g2
调用Unlock
成功,但是如果将来g1
调用Unlock
会 panic -
g2
调用Unlock
成功,如果将来g1
调用Unlock
也成功
运行下面的例子试试:
package main import ( "sync" "time" ) func main() { var mu sync.Mutex go func() { mu.Lock() time.Sleep(10 * time.Second) mu.Unlock() }() time.Sleep(time.Second) mu.Unlock() select {} }
以上所述就是小编给大家介绍的《sync.mutex 源代码分析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Kakfa utils源代码分析
- 62. ImageLoader源代码-流程分析
- 通过源代码分析Mybatis的功能
- Envoy(四):envoy源代码走读&启动过程分析
- vue-router 源代码全流程分析「长文」
- Fabric 1.0源代码分析(2) blockfile(区块文件存储)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
程序员代码面试指南:IT名企算法与数据结构题目最优解(第2版)
左程云 / 电子工业出版社 / 109.00元
《程序员代码面试指南:IT名企算法与数据结构题目最优解(第2版)》是一本程序员代码面试"神书”!书中对IT名企代码面试各类题目的最优解进行了总结,并提供了相关代码实现。针对当前程序员面试缺乏权威题目汇总这一痛点,本书选取将近300道真实出现过的经典代码面试题,帮助广大程序员的面试准备做到接近万无一失。"刷”完本书后,你就是"题王”!《程序员代码面试指南:IT名企算法与数据结构题目最优解(第2版)》......一起来看看 《程序员代码面试指南:IT名企算法与数据结构题目最优解(第2版)》 这本书的介绍吧!