Golang的sync.WaitGroup 实现逻辑和源码解析

栏目: IT技术 · 发布时间: 4年前

内容简介:方便的并发,是Golang的一大特色优势,而使用并发,对sync包的WaitGroup不会陌生。WaitGroup主要用来做Golang并发实例即Goroutine的等待,当使用go启动多个并发程序,通过waitgroup可以等待所有go程序结束后再执行后面的代码逻辑,比如:WaitGroup对外提供三个方法,Add(int),Done()和Wait(), 其中Done()是调用了Add(-1),一般使用方法是,先统一Add,在goroutine里并发的Done,然后Wait。WaitGroup主要维护了2

方便的并发,是Golang的一大特色优势,而使用并发,对sync包的WaitGroup不会陌生。WaitGroup主要用来做Golang并发实例即Goroutine的等待,当使用 go 启动多个并发程序,通过waitgroup可以等待所有go程序结束后再执行后面的代码逻辑,比如:

func Main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(10 * time.Second)
        }()

    }
    wg.Wait() // 等待在此,等所有go func里都执行了Done()才会退出
}
复制代码

实现原理

WaitGroup对外提供三个方法,Add(int),Done()和Wait(), 其中Done()是调用了Add(-1),一般使用方法是,先统一Add,在goroutine里并发的Done,然后Wait。

WaitGroup主要维护了2个计数器,一个是请求计数器 v,一个是等待计数器 w,二者组成一个64bit的值,请求计数器占高32bit,等待计数器占低32bit。

那么等待计数器拿来干嘛?是因为同一个实例的Wait()方法支持多处调用,每一次Wait()方法执行,等待计数器 w 就会加1,而当请求计数器v为0触发Wait()时,要根据w的数量发送w份的信号量,正确的触发所有的Wait(),这虽然不是常用的一个特性,但是在一些特殊场合是有用处的(比如多个并发都依赖于WaitGroup的实例的结束信号来进行下一个action),演示代码如下:

func main() {
  wg := sync.WaitGroup{}
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
​    }()
  }
  time.Sleep(2 * time.Second)
  for j := 0; j < 3; j++ {
    go func(i int) {
      // 3个地方调用Wait(),通过等待j计时器,每个Wati都会被hu唤醒
      wg.Wait()
      fmt.Println("wait done now ", i)
    }(j)
  }
  time.Sleep(10 * time.Second)
  return
}
/*
输出如下,数字出现的顺序随机
wait done now  1
wait done now  0
wait done now  2
*/
复制代码

同时,WaitGroup里还对使用逻辑进行了严格的检查,比如Wait()一旦开始不能Add().

下面是带注释的代码,去掉了不影响代码逻辑的trace部分:

func (wg *WaitGroup) Add(delta int) {
    statep := wg.state()
    // 更新statep,statep将在wait和add中通过原子操作一起使用
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
        if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        // wait不等于0说明已经执行了Wait,此时不容许Add
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 正常情况,Add会让v增加,Done会让v减少,如果没有全部Done掉,此处v总是会大于0的,直到v为0才往下走
    // 而w代表是有多少个goruntine在等待done的信号,wait中通过compareAndSwap对这个w进行加1
     if v > 0 || w == 0 {
        return
    }
    // This goroutine has set counter to 0 when waiters > 0.
    // Now there can't be concurrent mutations of state:
    // - Adds must not happen concurrently with Wait,
    // - Wait does not increment waiters if it sees counter == 0.
    // Still do a cheap sanity check to detect WaitGroup misuse.
    // 当v为0(Done掉了所有)或者w不为0(已经开始等待)才会到这里,但是在这个过程中又有一次Add,导致statep变化,panic
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Reset waiters count to 0.
    // 将statep清0,在Wait中通过这个值来保护信号量发出后还对这个Waitgroup进行操作
    *statep = 0
    // 将信号量发出,触发wait结束
    for ; w != 0; w-- {
        runtime_Semrelease(&wg.sema, false)
    }
}

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
    statep := wg.state()
        for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        if v == 0 {
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // Increment waiters count.
        // 如果statep和state相等,则增加等待计数,同时进入if等待信号量
        // 此处做CAS,主要是防止多个goroutine里进行Wait()操作,每有一个goroutine进行了wait,等待计数就加1
        // 如果这里不相等,说明statep,在 从读出来 到 CAS比较 的这个时间区间内,被别的goroutine改写了,那么不进入if,回去再读一次,这样写避免用锁,更高效些
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                // Wait must be synchronized with the first Add.
                // Need to model this is as a write to race with the read in Add.
                // As a consequence, can do the write only for the first waiter,
                // otherwise concurrent Waits will race with each other.
                race.Write(unsafe.Pointer(&wg.sema))
            }
            // 等待信号量
            runtime_Semacquire(&wg.sema)
            // 信号量来了,代表所有Add都已经Done
            if *statep != 0 {
                // 走到这里,说明在所有Add都已经Done后,触发信号量后,又被执行了Add
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}
复制代码

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

重构

重构

[美]马丁•福勒(Martin Fowler) / 熊节 / 人民邮电出版社 / 2015-8 / 69.00

本书清晰揭示了重构的过程,解释了重构的原理和最佳实践方式,并给出了何时以及何地应该开始挖掘代码以求改善。书中给出了70 多个可行的重构,每个重构都介绍了一种经过验证的代码变换手法的动机和技术。本书提出的重构准则将帮助你一次一小步地修改你的代码,从而减少了开发过程中的风险。一起来看看 《重构》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具