内容简介:互斥锁对于日常使用来说非常简单, 但是
互斥锁对于日常使用来说非常简单, 但是 sync.Mutex
里的状态变更, 并发控制, 原子操作, 循环体等表示很复杂, 让我探究一下里面是什么葫芦药呢!
Lock
mutex.Lock()
里的流程很简单, 只是判断 m.state
能不能用 atomic.CompareAndSwapInt32
上锁, 可以就直接退出, 不能则执行 lockSlow()
函数, 如下图:
lockSlow()
是个既复杂又重要的函数, 只要不是即时能获取锁的都会到这里来.
在开始时先初始化几个变量: waitStartTime
waitStartTime int64 // 开始等待时间(纳秒), 用于判断是新来的g还是唤醒的g, 还用于判断能不能切换饥饿模式. starving := false // 当前是否饥饿 awoke := false // 当前是否已唤醒 iter := 0 // 自旋次数 old := m.state // 最近一次获取的状态 复制代码
接下来进入循环体, 逻辑复杂只能拆分来分析:
上码:
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { ...(同步局部变量唤醒) runtime_doSpin() iter++ old = m.state continue } 复制代码
这段代码用来执行自旋, 不过执行前要先判断能不能自旋, 条件比较苛刻: 当前是正常模式且已锁定 ( old&(mutexLocked|mutexStarving) == mutexLocked
) 自旋次数小于5次 ( runtime_canSpin(iter)
) cpu核数大于1个 (同上) P大于1 (同上) 有一个正在运行的P并且runq为空. (同上) 执行自旋并且更新最近状态, 直到不允许自旋.
new := old if old&mutexStarving == 0 { // 非饥饿模式下才能加锁 new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { // 已加锁或者在饥饿模式下, 累计加上一个等待的g new += 1 << mutexWaiterShift // 十进制: new += 8 (第四位开始就是等待数量) } if starving && old&mutexLocked != 0 { // 准许切换饥饿模式并且已锁定 new |= mutexStarving // 设置饥饿模式 } if awoke { if new&mutexWoken == 0 { // 唤醒状态不一致 throw("sync: inconsistent mutex state") } new &^= mutexWoken // mutexWoken位由 1 => 0 // 重置唤醒状态 } 复制代码
new
为即将要改变状态的变量, 对下的4个判断用来对 new
的计算. 涉及到承上启下及并发逻辑, 第一次看应该比较混乱.
if atomic.CompareAndSwapInt32(&m.state, old, new) { // ...下面代码 } else{ old = m.state } 复制代码
如果对比交换值 m.state
失败, 则代表 m.state
被其它修改, 只能赋上新的状态并重新循环一次. 如果成功则进入以下代码:
if old&(mutexLocked|mutexStarving) == 0 { // 此处表示饥饿模式下不会获取锁 break // 已利用CAS获取锁 } // waitStartTime 开始等待时间 // queueLifo 是否后入先出, 唤醒的g后入先出, 新来的g则排在队列后面 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 运行时信号量互斥 // 等待唤醒 复制代码
waitStartTime
不等于0表示唤醒的g, 否则表示新来的g runtime_SemacquireMutex()
进入内部信号量互斥(不开放), 实际上跟 channel
的阻塞原理是一样的, 都是通过 goparkunlock
实现. (具体看 runtime.sync_runtime_SemacquireMutex()
)
// 如果大于1毫秒, 准可切换饥饿模式 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 获取最新状态, 睡眠前与唤醒后的状态有可能不一致 if old&mutexStarving != 0 { // 如果已经是饥饿模式 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { // 状态不一致 throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) // -7: 减小一个等待者并设置为锁定状态 if !starving || old>>mutexWaiterShift == 1 { // 如果当前g不准可饥饿模式且只有一个等待者 delta -= mutexStarving // -11: 在delta上再退出饥饿模式 } atomic.AddInt32(&m.state, delta) break // 退出循环(即当前g已获取锁) } awoke = true // 代表当前已是唤醒后的g iter = 0 // 重置自旋次数 复制代码
中间的判断只要是进入饥饿模式都能获取锁, 新来的g永远排在后面.
Unlock
Lock()
与
Unlock()
都容易阅读理解,
new
表示为有多个等待者.
unlockSlow()
也相对比
lockSlow()
简单多了.
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { runtime_Semrelease(&m.sema, true, 1) } } 复制代码
非饥饿模式下: 如果没有等待者, 或者 m.state
带有状态(新g抢到锁), 直接返回. 否则唤醒一个g继续执行.
处于饥饿模式下: rumtime_Semrelease
的 handoff
为真, 表示需要阻塞其他的g, 并以优先级执行等待队列的g.
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- HDFS 源码解读:HadoopRPC 实现细节的探究
- 跨域不完全探究
- Spring源码探究:容器
- Flutter BuildContext 探究
- Flutter mixins 探究
- Serverless 一些探究(一)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。