Golang - 调度剖析【第三部分】

栏目: Go · 发布时间: 6年前

内容简介:首先,在我平时遇到问题的时候,特别是如果它是一个新问题,我一开始并不会考虑使用并发的设计去解决它。我会先实现顺序执行的逻辑,并确保它能正常工作。然后在可读性和技术关键点都 Review 之后,我才会开始思考并发执行的实用性和可行性。有的时候,并发执行是一个很好的选择,有时则不一定。在本系列的第一部分中,我解释了并发意味着

本篇是调度剖析的第三部分,将重点关注 并发 特性。

回顾:

第一部分

第二部分

简介

首先,在我平时遇到问题的时候,特别是如果它是一个新问题,我一开始并不会考虑使用并发的设计去解决它。我会先实现顺序执行的逻辑,并确保它能正常工作。然后在可读性和技术关键点都 Review 之后,我才会开始思考并发执行的实用性和可行性。有的时候,并发执行是一个很好的选择,有时则不一定。

在本系列的第一部分中,我解释了 系统调度 的机制和语义,如果你打算编写多线程代码,我认为这些机制和语义对于实现正确的逻辑是很重要的。在第二部分中,我解释了 Go 调度 的语义,我认为它能帮助你理解如何在 Go 中编写高质量的并发程序。在这篇文章中,我会把 系统调度Go 调度 的机制和语义结合在一起,以便更深入地理解什么才是并发以及它的本质。

什么是并发

并发意味着 乱序 执行。 拿一组原来是顺序执行的指令,而后找到一种方法,使这些指令乱序执行,但仍然产生相同的结果。 那么,顺序执行还是乱序执行?根本在于,针对我们目前考虑的问题,使用并发必须是有收益的!确切来说,是并发带来的性能提升要大于它带来的复杂性成本。当然有些场景,代码逻辑就已经约束了我们不能执行乱序,这样使用并发也就没有了意义。

并发与并行

理解 并发并行 的不同也非常重要。 并行 意味着同时执行两个或更多指令,简单来说,只有多个CPU核心之间才叫 并行 。在 Go 中,至少要有两个操作系统硬件线程并至少有两个 Goroutine 时才能实现并行,每个 Goroutine 在一个单独的系统线程上执行指令。

如图:

Golang - 调度剖析【第三部分】

我们看到有两个逻辑处理器 P ,每个逻辑处理器都挂载在一个系统线程 M 上,而每个 M 适配到计算机上的一个CPU处理器 Core

其中,有两个 Goroutine G1G2并行 执行,因为它们同时在各自的系统硬件线程上执行指令。

再看,在每一个逻辑处理器中,都有三个 Goroutine G2 G3 G5G1 G4 G6 轮流共享各自的系统线程。看起来就像这三个 Goroutine 在同时运行着,没有特定顺序地执行它们的指令,并在系统线程上共享时间。

那么这就会发生 竞争 ,有时候如果只在一个物理核心上实现并发则实际上会降低吞吐量。还有有意思的是,有时候即便利用上了并行的并发,也不会给你带来想象中更大的性能提升。

工作负载

我们怎么判断在什么时候并发会更有意义呢?我们就从了解当前执行逻辑的工作负载类型开始。在考虑并发时,有两种类型的工作负载是很重要的。

两种类型

CPU-Bound:这是一种不会导致 Goroutine 主动切换上下文到等待状态的类型。它会一直不停地进行计算。比如说,计算 π 到第 N 位的 Goroutine 就是 CPU-Bound 的。

IO-Bound:与上面相反,这种类型会导致 Goroutine 自然地进入到等待状态。它包括请求通过网络访问资源,或使用系统调用进入操作系统,或等待事件的发生。比如说,需要读取文件的 Goroutine 就是 IO-Bound。我把同步事件(互斥,原子),会导致 Goroutine 等待的情况也包含在此类。

CPU-Bound 中,我们需要利用并行。因为单个系统线程处理多个 Goroutine 的效率不高。而使用比系统线程更多的 Goroutine 也会拖慢执行速度,因为在系统线程上切换 Goroutine 是有时间成本的。上下文切换会导致发生 STW(Stop The World) ,意思是在切换期间当前工作指令都不会被执行。

IO-Bound 中,并行则不是必须的了。单个系统线程可以高效地处理多个 Goroutine,是因为Goroutine 在执行这类指令时会自然地进入和退出等待状态。使用比系统线程更多的 Goroutine 可以加快执行速度,因为此时在系统线程上切换 Goroutine 的延迟成本并不会产生 STW 事件。进入到IO阻塞时,CPU就闲下来了,那么我们可以使不同的 Goroutine 有效地复用相同的线程,不让系统线程闲置。

我们如何评估一个系统线程匹配多少 Gorountine 是最合适的呢?如果 Goroutine 少了,则会无法充分利用硬件;如果 Goroutine 多了,则会导致上下文切换延迟。这是一个值得考虑的问题,但此时暂不深究。

现在,更重要的是要通过仔细推敲代码来帮助我们准确识别什么情况需要并发,什么情况不能用并发,以及是否需要并行。

加法

我们不需要复杂的代码来展示和理解这些语义。先来看看下面这个名为 add 的函数:

1 func add(numbers []int) int {
2      var v int
3     for _, n := range numbers {
4         v += n
5     }
6     return v
7 }

在第 1 行,声明了一个名为 add 的函数,它接收一个整型切片并返回切片中所有元素的和。它从第 2 行开始,声明了一个 v 变量来保存总和。然后第 3 行,线性地遍历切片,并且每个数字被加到 v 中。最后在第 6 行,函数将最终的总和返回给调用者。

问题: add 函数是否适合并发执行?从大体上来说答案是适合的。可以将输入切片分解,然后同时处理它们。最后将每个小切片的执行结果相加,就可以得到和顺序执行相同的最终结果。

与此同时,引申出另外一个问题:应该分成多少个小切片来处理是性能最佳的呢?要回答此问题,我们必须知道它的工作负载类型。

add 函数正在执行 CPU-Bound 工作负载,因为实现算法正在执行纯数学运算,并且它不会导致 Goroutine 进入等待状态。这意味着每个系统线程使用一个 Goroutine 就可以获得不错的吞吐量。

并发版本

下面来看一下并发版本如何实现,声明一个 addConcurrent 函数。代码量相比顺序版本增加了很多。

1 func addConcurrent(goroutines int, numbers []int) int {
2     var v int64
3     totalNumbers := len(numbers)
4     lastGoroutine := goroutines - 1
5     stride := totalNumbers / goroutines
6
7     var wg sync.WaitGroup
8     wg.Add(goroutines)
9
10     for g := 0; g < goroutines; g++ {
11         go func(g int) {
12             start := g * stride
13             end := start + stride
14             if g == lastGoroutine {
15                 end = totalNumbers
16             }
17
18             var lv int
19             for _, n := range numbers[start:end] {
20                 lv += n
21             }
22
23             atomic.AddInt64(&v, int64(lv))
24             wg.Done()
25         }(g)
26     }
27
28     wg.Wait()
29
30     return int(v)
31 }

第 5 行:计算每个 Goroutine 的子切片大小。使用输入切片总数除以 Goroutine 的数量得到。

第 10 行:创建一定数量的 Goroutine 执行子任务

第 14-16 行:子切片剩下的所有元素都放到最后一个 Goroutine 执行,可能比前几个 Goroutine 处理的数据要多。

第 23 行:将子结果追加到最终结果中。

然而,并发版本肯定比顺序版本更复杂,但和增加的复杂性相比,性能有提升吗?值得这么做吗?让我们用事实来说话,下面运行基准测试。

基准测试

下面的基准测试,我使用了1000万个数字的切片,并关闭了GC。分别有顺序版本 add 函数和并发版本 addConcurrent 函数。

func BenchmarkSequential(b *testing.B) {
    for i := 0; i < b.N; i++ {
        add(numbers)
    }
}

func BenchmarkConcurrent(b *testing.B) {
    for i := 0; i < b.N; i++ {
        addConcurrent(runtime.NumCPU(), numbers)
    }
}

无并行

以下是所有 Goroutine 只有一个硬件线程可用的结果。顺序版本使用 1 Goroutine ,并发版本在我的机器上使用 runtime.NumCPU8 Goroutines 。在这种情况下,并发版本实际正跑在没有并行的机制上。

10 Million Numbers using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential              1000       5720764 ns/op : ~10% Faster
BenchmarkConcurrent              1000       6387344 ns/op
BenchmarkSequentialAgain         1000       5614666 ns/op : ~13% Faster
BenchmarkConcurrentAgain         1000       6482612 ns/op

结果表明:当只有一个系统线程可用于所有 Goroutine 时,顺序版本比并发快约10%到13%。这和我们之前的理论预期相符,主要就是因为并发版本在单核上的上下文切换和 Goroutine 管理调度的开销。

有并行

以下是每个 Goroutine 都有单独可用的系统线程的结果。顺序版本使用 1 Goroutine ,并发版本在我的机器上使用 runtime.NumCPU8 Goroutines 。在这种情况下,并发版本利用上了并行机制。

10 Million Numbers using 8 goroutines with 8 cores
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 8 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential-8                1000       5910799 ns/op
BenchmarkConcurrent-8                2000       3362643 ns/op : ~43% Faster
BenchmarkSequentialAgain-8           1000       5933444 ns/op
BenchmarkConcurrentAgain-8           2000       3477253 ns/op : ~41% Faster

结果表明:当为每个 Goroutine 提供单独的系统线程时,并发版本比顺序版本快大约41%到43%。这才也和预期一致,所有 Goroutine 现都在并行运行着,意味着他们真的在同时执行。

排序

另外,我们也要知道并非所有的 CPU-Bound 都适合并发。当切分输入或合并结果的代价非常高时,就不太合适。下面展示一个冒泡 排序 算法来说明此场景。

顺序版本

01 package main
02
03 import "fmt"
04
05 func bubbleSort(numbers []int) {
06     n := len(numbers)
07     for i := 0; i < n; i++ {
08         if !sweep(numbers, i) {
09             return
10         }
11     }
12 }
13
14 func sweep(numbers []int, currentPass int) bool {
15     var idx int
16     idxNext := idx + 1
17     n := len(numbers)
18     var swap bool
19
20     for idxNext < (n - currentPass) {
21         a := numbers[idx]
22         b := numbers[idxNext]
23         if a > b {
24             numbers[idx] = b
25             numbers[idxNext] = a
26             swap = true
27         }
28         idx++
29         idxNext = idx + 1
30     }
31     return swap
32 }
33
34 func main() {
35     org := []int{1, 3, 2, 4, 8, 6, 7, 2, 3, 0}
36     fmt.Println(org)
37
38     bubbleSort(org)
39     fmt.Println(org)
40 }

这种 排序算法 会扫描每次在交换值时传递的切片。在对所有内容进行排序之前,可能需要多次遍历切片。

那么问题: bubbleSort 函数是否适用并发?我相信答案是否定的。原始切片可以分解为较小的,并且可以同时对它们排序。但是!在并发执行完之后,没有一个有效的手段将子结果的切片排序合并。下面我们来看并发版本是如何实现的。

并发版本

01 func bubbleSortConcurrent(goroutines int, numbers []int) {
02     totalNumbers := len(numbers)
03     lastGoroutine := goroutines - 1
04     stride := totalNumbers / goroutines
05
06     var wg sync.WaitGroup
07     wg.Add(goroutines)
08
09     for g := 0; g < goroutines; g++ {
10         go func(g int) {
11             start := g * stride
12             end := start + stride
13             if g == lastGoroutine {
14                 end = totalNumbers
15             }
16
17             bubbleSort(numbers[start:end])
18             wg.Done()
19         }(g)
20     }
21
22     wg.Wait()
23
24     // Ugh, we have to sort the entire list again.
25     bubbleSort(numbers)
26 }

bubbleSortConcurrent 它使用多个 Goroutine 同时对输入的一部分进行排序。我们直接来看结果:

Before:
  25 51 15 57 87 10 10 85 90 32 98 53
  91 82 84 97 67 37 71 94 26  2 81 79
  66 70 93 86 19 81 52 75 85 10 87 49

After:
  10 10 15 25 32 51 53 57 85 87 90 98
   2 26 37 67 71 79 81 82 84 91 94 97
  10 19 49 52 66 70 75 81 85 86 87 93

由于冒泡排序的本质是依次扫描,第 25 行对 bubbleSort 的调用将掩盖使用并发解决问题带来的潜在收益。结论是:在冒泡排序中,使用并发不会带来性能提升。

读取文件

前面已经举了两个 CPU-Bound 的例子,下面我们来看 IO-Bound

顺序版本

01 func find(topic string, docs []string) int {
02     var found int
03     for _, doc := range docs {
04         items, err := read(doc)
05         if err != nil {
06             continue
07         }
08         for _, item := range items {
09             if strings.Contains(item.Description, topic) {
10                 found++
11             }
12         }
13     }
14     return found
15 }

第 2 行:声明了一个名为 found 的变量,用于保存在给定文档中找到指定主题的次数。

第 3-4 行:迭代文档,并使用 read 函数读取每个文档。

第 8-11 行:使用 strings.Contains 函数检查文档中是否包含指定主题。如果包含,则 found 加1。

然后来看一下 read 是如何实现的。

01 func read(doc string) ([]item, error) {
02     time.Sleep(time.Millisecond) // 模拟阻塞的读
03     var d document
04     if err := xml.Unmarshal([]byte(file), &d); err != nil {
05         return nil, err
06     }
07     return d.Channel.Items, nil
08 }

此功能以 time.Sleep 开始,持续1毫秒。此调用用于模拟在我们执行实际系统调用以从磁盘读取文档时可能产生的延迟。这种延迟的一致性对于准确测量 find 顺序版本和并发版本的性能差距非常重要。

然后在第 03-07 行,将存储在全局变量文件中的模拟 xml 文档反序列化为 struct 值。最后,将 Items 返回给调用者。

并发版本

01 func findConcurrent(goroutines int, topic string, docs []string) int {
02     var found int64
03
04     ch := make(chan string, len(docs))
05     for _, doc := range docs {
06         ch <- doc
07     }
08     close(ch)
09
10     var wg sync.WaitGroup
11     wg.Add(goroutines)
12
13     for g := 0; g < goroutines; g++ {
14         go func() {
15             var lFound int64
16             for doc := range ch {
17                 items, err := read(doc)
18                 if err != nil {
19                     continue
20                 }
21                 for _, item := range items {
22                     if strings.Contains(item.Description, topic) {
23                         lFound++
24                     }
25                 }
26             }
27             atomic.AddInt64(&found, lFound)
28             wg.Done()
29         }()
30     }
31
32     wg.Wait()
33
34     return int(found)
35 }

第 4-7 行:创建一个 channel 并写入所有要处理的文档。

第 8 行:关闭这个 channel ,这样当读取完所有文档后就会直接退出循环。

第 16-26 行:每个 Goroutine 都从同一个 channel 接收文档, readstrings.Contains 逻辑和顺序的版本一致。

第 27 行:将各个 Goroutine 计数加在一起作为最终计数。

基准测试

同样的,我们再次运行基准测试来验证我们的结论。

func BenchmarkSequential(b *testing.B) {
    for i := 0; i < b.N; i++ {
        find("test", docs)
    }
}

func BenchmarkConcurrent(b *testing.B) {
    for i := 0; i < b.N; i++ {
        findConcurrent(runtime.NumCPU(), "test", docs)
    }
}

无并行

10 Thousand Documents using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential                 3    1483458120 ns/op
BenchmarkConcurrent                20     188941855 ns/op : ~87% Faster
BenchmarkSequentialAgain            2    1502682536 ns/op
BenchmarkConcurrentAgain           20     184037843 ns/op : ~88% Faster

当只有一个系统线程时,并发版本比顺序版本快大约87%到88%。与预期一致,因为所有 Goroutine 都有效地共享单个系统线程。

有并行

10 Thousand Documents using 8 goroutines with 8 core
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential-8                   3    1490947198 ns/op
BenchmarkConcurrent-8                  20     187382200 ns/op : ~88% Faster
BenchmarkSequentialAgain-8              3    1416126029 ns/op
BenchmarkConcurrentAgain-8             20     185965460 ns/op : ~87% Faster

有意思的来了,使用额外的系统线程提供并行能力,实际代码性能却没有提升。也印证了开头的说法。

结语

我们可以清楚地看到,使用 IO-Bound 并不需要并行来获得性能上的巨大提升。这与我们在 CPU-Bound 中看到的结果相反。当涉及像冒泡排序这样的算法时,并发的使用会增加复杂性而没有任何实际的性能优势。

所以,我们在考虑解决方案时,首先要确定它是否适合并发,而不是盲目认为使用更多的 Goroutine 就一定会提升性能。


以上所述就是小编给大家介绍的《Golang - 调度剖析【第三部分】》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

转型之战

转型之战

吴晓波 / 2015-7-1

互联网时代大潮席卷而来,互联网究竟是“魔法手杖”,还是“效率金箍棒”?传统企业如何正确借助和利用互联网思维帮助自身转变思维、完成企业转型升级?本书分两篇,上篇为传统行业互联网转型极具代表性和借鉴意义的案例,下篇精选吴晓波转型大课的独 家内容,梳理了吴晓波、刘伟、刘润、金霞、刘博、赵峰、张蕴蓝、张荣耀、李嘉聪、曾玉波等各行业10位导师关于互联网思维的精华理念和观点,其中囊括各传统行业互联网转型成功的......一起来看看 《转型之战》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试