Go36-31-sync.WaitGroup和sync.Once

栏目: Go · 发布时间: 6年前

内容简介:之前在协调多个goroutine的时候,使用了通道。基本都是按下面这样来使用的:这里有一个问题,要保证主goroutine最后从通道接收元素的的次数需要与之前其他goroutine发送元素的次数相同。其实,在这种应用场景下,可以选用另外一个同步工具,就是这里要讲的sync包的WaitGroup类型。

sync.WaitGroup

之前在协调多个goroutine的时候,使用了通道。基本都是按下面这样来使用的:

package main

import "fmt"

func main() {
    done := make(chan struct{})
    count := 5

    for i := 0; i < count; i++ {
        go func(i int) {
            defer func() {
                done <- struct{}{}
            }()
            fmt.Println(i)
        }(i)
    }

    for j := 0; j < count; j++ {
        <- done
    }
    fmt.Println("Over")
}

这里有一个问题,要保证主goroutine最后从通道接收元素的的次数需要与之前其他goroutine发送元素的次数相同。

其实,在这种应用场景下,可以选用另外一个同步工具,就是这里要讲的sync包的WaitGroup类型。

使用方法

sync.WaitGroup类型,它比通道更加适合实现这种一对多的goroutine协作流程。WaitGroup是开箱即用的,也是并发安全的。同时,与之前提到的同步 工具 一样,它一旦被真正的使用就不能被复制了。

WaitGroup拥有三个指针方法,可以想象该类型中有一个计数器,默认值是0,下面的方法就是操作或判断计数器:

Add(-1)

现在就用WaitGroup来改造开篇的程序:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup  // 开箱即用,所以直接声明就好了,没必要用短变量声明
    // wg := sync.WaitGroup{}  // 短变量声明可以这么写
    count := 5

    for i := 0; i < count; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            fmt.Println(i)
        }(i)
    }

    wg.Wait()
    fmt.Println("Over")
}

改造后,在主goroutine最后等待退出的部分现在看着要美观多了。这个就是WaitGroup典型的应用场景了。

注意的事项

计数器不能小于0在sync.WaitGroup类型值中计数器的值是不可以小于0的。一旦小于0会引发panic,不适当的调用Done方法和Add方法就有可能使它小于0而引发panic。

尽早增加计数器的值如果在对它的Add方法的首次调用,与对它的Wait方法的调用是同时发起的。比如,在同时启动的两个goroutine中,分别调用这两个方法,那就就有可能会让这里的Add方法抛出一个panic。并且这种情况不太容易,应该予以重视。所以虽然WaitGroup值本身并不需要初始化,但是尽早的增加其计数器的值是非要必要的。

复用的情况

WaitGroup的值是可以被复用的,但需要保证其 计数周期 的完整性。这里的计数周期指的是这样一个过程:该值中的计数器值由0变为了某个正整数,而后又经过一系列的变化,最终由某个正整数又变回了0。这个过程可以被视为一个计数周期。在一个此类的生命周期中,它可以经历任意多个计数周期。但是,只有在它走完当前的计数周期后,才能够开始下一个计数周期。

也就是说,如果一个此类值的Wait方法在它的某个计数周期中被调用,那么就会立即阻塞当前的goroutine,直至这个计数周期完成。在这种情况下,该值的下一个计数周期必须要等到这个Wait方法执行结束之后,才能够开始。

Wait方法是有一个执行的过程的,如果在这个方法执行期间,跨越了两个计数周期,就会引发一个panic。比如,当前的goroutine调用了Wait方法而阻塞了。另一个goroutine调用了Done方法使计数器变成了0。此时会唤醒之前阻塞的goroutine,并且去执行Wait方法中其余的代码(这里还在这行Wait方法,执行的是源码sync.Wait方法里的代码,不是我们自己写的程序的Wait之后的代码)。在这个时候,又有一个goroutine调用了Add方法,使计数器的值又从0变为了某个正整数。此时正在执行的Wait方法就会立即抛出一个panic。

小结

上面给了3种会引发panic的情况。关于后两种情况,建议如下:

不要把增加计数器值的操作和调用Wait方法的代码,放在不同的goroutine中执行。

就是要杜绝对同一个WatiGroup值的两种操作的并发执行。

后面提到的两种情况,不是每次都会发生,通常需要反复的实验才能够引发panic的情况。虽然不是每次都发生,但是在长期运行的过程中,这种情况是必然会出现的,应该予以重视并且避免。

如果对复现这些异常情况感兴趣,可以看一下sync代码包中的waitgroup_test.go文件。其中的名称以TestWaitGroupMisuse为前缀的测试函数,很好的展示了这些异常情况发生的条件。

sync.Once

与sync.WaitGroup类型一样,Sync.Once类型也属于结构体类型,同样也是开箱即用和并发安全的。由于这个类型中包含了一个sync.Mutex类型的字段,所以复制改类型的值也会导致功能失效。

使用方法

Do方法

Once类型的Do方法只接收一个参数,参数的类型必须是func(),即无参数无返回的函数。该方法的功能并不是对每一种参数函数都只执行一次,而是只执行首次被调用时传入的那个函数,并且之后不会再执行任何参数函数。所以,如果有多个需要执行一次的函数,应该为它们每一个都分配一个sync.Once类型的值。

基本用法如下:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var counter uint32
    var once sync.Once
    once.Do(func() {
        atomic.AddUint32(&counter, 1)
    })
    fmt.Println("counter:", counter)
    // 这次调用不会被执行
    once.Do(func() {
        atomic.AddUint32(&counter, 2)
    })
    fmt.Println("counter:", counter)
}
done字段

Once类型中还要一个名为done的uint32类型的字段。它的作用是记录所属值的Do方法被调用的次数。不过改字段的值只可能是0或1.一旦Do方法的首次调用完成,它的值就会从0变为1。

关于done的类型,其实用布尔类型就够了,这里只所以用uint32类型的原因是它的操作必须是原子操作,只能使用原子操作支持的数据类型。

Do方法的实现方式

Do方法在一开始就会通过atomic.LoadUint32来获取done字段的值,并且如果发现值为1就直接返回。这步只是初步保证了Do方法只会执行首次调用是传入的函数。

不过单凭上面的判断是不够的。如果两个goroutine都调用了同一个新的Once值的Do方法,并且几乎同时执行到了其中的这个条件判断代码,那么它们就都会因判断结果为false而继续执行Do方法中剩余的代码。

基于上面的可能,在初步保证的判断之后,Do方法会立即锁定其所属值中的那个sync.Mutex类型的m字段。然后,它会在临界区中再次检查done字段的值。此时done的值应该仍然是0,并且已经加锁。此时才认为是条件满足,才会去调用参数函数。并且用原子操作把done的值变为1。

单例模式

如果熟悉 设计模式 中的单例模式的话,这个Do方法的实现方式,与单例模式有很多相似之处。都会先在临界区之外判断一次关键条件,若条件不满足则立即返回。这通常被称为 快路径 ,或者叫做 快速失败路径

如果条件满足,那么到了临界区中还要再对关键条件进行一次判断,这主要是为了更加严谨。这两次条件判断常被统称为(跨临界区的) 双重检查 。由于进入临界区前要加锁,显然会降低代码的执行速度,所以其中的第二次条件判断,以及后续的操作就被称为 慢路径 或者 常规路径

Do方法中的代码不多,但它却应用了一个很经典的编程范式。

功能方面的特点

一、由于Do方法只会在参数函数执行结束之后把done字段的值变为1,因此,如果参数函数的执行需要很长的时间或者根本就不会结束,那么就有可能会导致相关goroutine的同时阻塞。

比如,有多个goroutine并发的调用了同一个Once值的Do方法,并且传入的函数都会一直执行而不结束。那么,这些goroutine就都会因调用了这个Do方法而阻塞。此时,那个抢先执行了参数函数的goroutine之外,其他的goroutine都会被阻塞在该Once值的互斥锁m的那行代码上。

效果演示的示例代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    once := sync.Once{}  // 这里换短变量声明
    wg := sync.WaitGroup{}

    wg.Add(1)
    go func() {
        defer wg.Done()
        // 这个函数会被执行
        once.Do(func() {
            for i := 0; i < 10; i++ {
                fmt.Printf("\r任务[1-%d]执行中...", i)
                time.Sleep(time.Millisecond * 400)
            }
        })
        fmt.Printf("\n任务[1]执行完毕\n")
    }()

    wg.Add(1)
    go func() {
        defer wg.Done() 
        time.Sleep(time.Millisecond * 300)
        // 这句Do方法的调用会一直阻塞,知道上面的函数执行完毕
        // 然后Do方法里的函数不会执行
        once.Do(func() {
            fmt.Println("任务[2]执行中...")
        })
        // 上面Do方法阻塞结束后,直接会执行下面的代码
        fmt.Println("任务[2]执行完毕")
    }()

    wg.Add(1)
    go func() {
        defer wg.Done() 
        time.Sleep(time.Millisecond * 300)
        once.Do(func() {
            fmt.Println("任务[3]执行中...")
        })
        fmt.Println("任务[3]执行完毕")
    }()

    wg.Wait()
    fmt.Println("Over")
}

二、Do方法在参数函数执行结束后,对done字段的赋值用的是原子操作,并且这一操作是被挂载defer语句中的。因此,不论参数函数的执行会以怎样的方式结束,done字段的值都会变为1。

这样就是说即时参数函数没有执行成功,比如引发了panic。也是无法使用同一个Once值重新执行别的函数了。所以,如果需要为参数函数的执行设定重试机制,就要考虑在适当的时候替换Once值。

参考下面的示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    once := sync.Once{}
    wg := sync.WaitGroup{}

    wg.Add(1)
    go func() {
        defer wg.Done()
        defer func() {
            if p := recover(); p != nil {
                fmt.Printf("PANIC: %v\n", p)
                // 下面的语句会给once变量替换一个新的Once值,这样下面的第二个任务还能被执行
                // once = sync.Once{}
            }
        }()
        once.Do(func() {
            fmt.Println("开始执行参数函数,紧接着会引发panic")
            panic(fmt.Errorf("主动引发了一个panic"))  // panic之后就去调用defer了
            fmt.Println("参数函数执行完毕")  // 这行不会执行,后面的都不会执行
        })
        fmt.Println("Do方法调用完毕")  // 这行也不会执行
    }()

    wg.Add(1)
    go func() {
        defer wg.Done() 
        time.Sleep(time.Millisecond * 500)
        once.Do(func() {
            fmt.Println("第二个任务执行中...")
            time.Sleep(time.Millisecond * 800)
            fmt.Println("第二个任务执行结束")
        })
        fmt.Println("第二个任务结束")
    }()

    wg.Wait()
    fmt.Println("Over")
}

延迟初始化

延迟一个昂贵的初始化步骤到有实际需求的时刻是一个很好的实践。这也是sync.Once的一个使用场景。

下面是从书上改的示例代码:

package main

import (
    "fmt"
    "sync"
)

var once sync.Once
var testmap map[string] int32

// 对testmap进行初始化的函数
func loadTestmap() {
    testmap = map[string] int32{
        "k1": 1,
        "k2": 2,
        "k3": 3,
    }
}

// 获取testmap对应key的值,如果没有初始化,会先执行初始化
// 书上说这个函数是并发安全的,这里的map初始化之后,内容不会再变
func getKey(key string) int32 {
    once.Do(loadTestmap)
    // 最后的return这句可能不是并发安全的,不过线程安全的map不是这里的重点
    // 假定这里的map在初始化之后只会被多个goroutine读取,其内容不会再改变
    return testmap[key]
}

func main() {
    fmt.Println(getKey("k1"))
}

这里不考虑map线程安全的问题,而且书上的例子这里的map只用来存放数据,初始化之后不会对其内容进行修改。

这里主要是保证在变量初始化过程中的并发安全。以这种方式来使用sync.Once,可以避免变量在正确构造之前就被其它goroutine分享。否则,在别的goroutine中可能会获取到一个内容不完整的变量。

总结

sync代码包的WaitGroup类型和Once类型都是非常易用的同步工具。它们都是开箱即用和并发安全的。

Once类型使用互斥锁和原子操作实现了功能,而WatiGroup类型中只用到了原子操作。所以可以说,它们都是更高层次的同步工具。它们都基于基本的同步工具,实现了某种特定的功能。sync包中的其他高级同步工具,其实也都是这样的。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Text Processing in Python

Text Processing in Python

David Mertz / Addison-Wesley Professional / 2003-6-12 / USD 54.99

Text Processing in Python describes techniques for manipulation of text using the Python programming language. At the broadest level, text processing is simply taking textual information and doing som......一起来看看 《Text Processing in Python》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具