内容简介:Mutex只有两个阻塞方法Lock和Unlock,有两种工作模式:normal和starvation。goroutine在抢占锁时,会先自旋一段时间,如果抢占失败会被放在一个FIFO等待队列中休眠,当锁被释放时,会优先唤醒队首的goroutine。在normal模式中,被唤醒的waiter会跟新到的goroutine竞争锁,一般来说新的goroutine已经在cpu中进行了,并且可能有不止一个goroutine,在这种情况下刚被唤醒的goroutine有很大概率会失败,而被重新放回FIFO队首。如果当一个
【golang源码分析】sync.Mutex
概述
Mutex只有两个阻塞方法Lock和Unlock,有两种工作模式:normal和starvation。
goroutine在抢占锁时,会先自旋一段时间,如果抢占失败会被放在一个FIFO等待队列中休眠,当锁被释放时,会优先唤醒队首的goroutine。
在normal模式中,被唤醒的waiter会跟新到的goroutine竞争锁,一般来说新的goroutine已经在cpu中进行了,并且可能有不止一个goroutine,在这种情况下刚被唤醒的goroutine有很大概率会失败,而被重新放回FIFO队首。如果当一个waiter等待获取锁的时间超过1ms,会将mutex转换成starvation模式。
在starvation模式,当mutex被unlock时,持有权会直接移交到队首的goroutine中。新加入的goroutine不会再参与锁的竞争,而是直接放入等待队列的尾部。
mutex通过一个state变量来维护锁的状态信息。
- 低一位mutexLocked:表示锁是否被锁定
- 低二位mutexWoken:表示是否有goroutine在竞争锁,这个主要应用于unlock时判断是否有必要唤醒等待队列中的goroutine
- 低三位mutexStarving:表示锁的模式starvation或normal,剩余高位用于标志等待队列的长度。
代码分析
定义
Mutex实现了Locker接口,维护两个变量——state用来维护锁的状态,同时通过操作sema来唤醒和休眠等待goroutine
type Mutex struct { state int32 sema uint32 } // A Locker represents an object that can be locked and unlocked. type Locker interface { Lock() Unlock() } const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota starvationThresholdNs = 1e6 )
Lock
完整代码
// Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
详解
- 首先默认state为0,通过cas操作将锁的状态更新为mutexLocked,尝试快速获取锁。在并发度较小的情况下,mutex多数处于初始状态,可以提供加锁性能。如果快速获取锁失败,goroutine会不断地进行自旋抢锁、休眠、唤醒抢锁,直到抢到锁后break。在这期间,需要通过cas对state进行更新,只有更新成功的goroutine才能进行下一阶段的操作。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return }
- 竞争锁的goroutine会先自旋一段时间直到mutex被unlocked、或切换到starvation模式、或自旋次数达到上限。自旋过程中,goroutine也会尝试通过cas将mutexWoken位置1,以通知Unlock操作不必唤醒等待队列中的goroutine。starvatione模式是不需要自旋的,因为锁的持有权会直接移交给等待队列队首的goroutine,自旋操作没有实际意义。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 当前goroutine未更新成功过woken位,woken位为0,等待队列非空,通过cas尝试更新woken位 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() // pause一段时间 iter++ old = m.state continue }
-
从前面代码可以看到,退出自旋有三种原因,及其对应的处理如下:
- mutex被Unlock:将mutexLocked置1,代表尝试抢锁,然后cas尝试更新state
- mutex切换到starvation模式:不改变mutexLocked,waiter数量+1,cas更新state成功后后直接加入到等待队列里。
- 自旋次数达到上限:不改变mutexLocked,waiter数量+1,如果starving变量为true,则将mutexStarving位置1。
starving是goroutine被唤醒时通过计算等待时间获取的,大于1ms则为true,表示需要切换到starvation模式。
这里需要注意,当mutex为unlocked的时候,不需要转换mutex模式。因为starving为true,说明该goroutine是normal模式下等待队列中刚被唤醒的waiter(原因需要看后面的代码,在starvation模式。会直接将mutex的持有权交给唤醒的goroutine,走不到这一步)。如果此时做了状态切换,可能会出现在starvation模式,等待队列为空的情况。因此, 只有自旋抢锁失败的情况下,才会由等待时间超过阈值的goroutine将mutex模式切换到starvation 。
new := old // 锁被unlocked,将mutexLocked置1,代表尝试抢锁 if old&mutexStarving == 0 { new |= mutexLocked } // starving或自旋次数到上限, 等待数量+1 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 如果已经是starvation状态,这一步没什么意义 if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 把awoke位给清掉 new &^= mutexWoken }
-
cas 更新mutex状态,失败则重复上述操作,直到状态更新成功。
- 如果是获取锁的动作,则直接break。
- 否则将goroutine放到等待队列中,若waitStartTime不为0,则说明是waiter抢锁失败,应该重新返回队首。
if atomic.CompareAndSwapInt32(&m.state, old, new) { // 获取锁成功,break。接下来要处理starving和自旋次数达到上限的情况 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // waitStartTime说明是被唤醒的goroutine抢锁失败了,应该重新放回队首(queueLifo=true) queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 休眠 runtime_SemacquireMutex(&m.sema, queueLifo)
- 被唤醒后,如果当前starving为true,或等待时间大于阈值,则将starving置为true,下一次自旋抢锁失败会将mutex切换到starvation模式。在starvation模式,该goroutine直接获取锁,否则的话要重复前面的操作跟新来的goroutine抢锁
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { // starving模式不应该出现mutex被locked或woken,等待队列长度也不能为0,有的话说明这段代码逻辑有bug if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 获取锁并将等待数-1 delta := int32(mutexLocked - 1<<mutexWaiterShift) // 等待时间小于阈值,或该goroutine为等待队列中最后一个waiter,切换到normal模式 if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } // 把状态更新过去,获取锁并退出。这里不用cas,因为前面操作保证了低三位都不会被更新,高位本来是原子计数,所以直接add就可以 atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0
Unlock
// mutex允许一个goroutine加锁,另一个goroutine解锁,不要求两个操作保证同一个goroutine func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // 直接解锁,然后判断一下状态,如果不一致的话则抛异常 new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // starvation模式,则直接唤醒blocked队列队首goroutine // normal模式,如果等待队列为空,或此时锁的状态发生了变化,则不用唤醒等待队列中的goroutine // 否则的话,将等待数减1,cas更新state,成功则唤醒一个waiter if new&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // starving模式,直接将持有权交给下一个waiter。mutexLocked是在队首waiter被唤醒后更新的,所以此时mutexLocked还是0,但是在starving // 模式下,新来的goroutine不会更新mutexLocked位。配合Lock代码看。 runtime_Semrelease(&m.sema, true) } }
\
practice/golang/源码分析
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 【源码阅读】AndPermission源码阅读
- 【源码阅读】Gson源码阅读
- 如何阅读Java源码 ,阅读java的真实体会
- 我的源码阅读之路:redux源码剖析
- JDK源码阅读(六):HashMap源码分析
- 如何阅读源码?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。