首先,在我平时遇到问题的时候,特别是如果它是一个新问题,我一开始并不会考虑使用并发的设计去解决它。我会先实现顺序执行的逻辑,并确保它能正常工作。然后在可读性和技术关键点都 Review 之后,我才会开始思考并发执行的实用性和可行性。有的时候,并发执行是一个很好的选择,有时则不一定。
在本系列的第一部分中,我解释了 系统调度 的机制和语义,如果你打算编写多线程代码,我认为这些机制和语义对于实现正确的逻辑是很重要的。在第二部分中,我解释了 Go 调度 的语义,我认为它能帮助你理解如何在 Go 中编写高质量的并发程序。在这篇文章中,我会把 系统调度 和 Go 调度 的机制和语义结合在一起,以便更深入地理解什么才是并发以及它的本质。
并发意味着 乱序
执行。 拿一组原来是顺序执行的指令,而后找到一种方法,使这些指令乱序执行,但仍然产生相同的结果。 那么,顺序执行还是乱序执行?根本在于,针对我们目前考虑的问题,使用并发必须是有收益的!确切来说,是并发带来的性能提升要大于它带来的复杂性成本。当然有些场景,代码逻辑就已经约束了我们不能执行乱序,这样使用并发也就没有了意义。
理解 并发
与 并行
的不同也非常重要。 并行
意味着同时执行两个或更多指令,简单来说,只有多个CPU核心之间才叫 并行
。在 Go 中,至少要有两个操作系统硬件线程并至少有两个 Goroutine 时才能实现并行,每个 Goroutine 在一个单独的系统线程上执行指令。
我们看到有两个逻辑处理器 P
,每个逻辑处理器都挂载在一个系统线程 M
上,而每个 M
适配到计算机上的一个CPU处理器 Core
其中,有两个 Goroutine G1
和 G2
在 并行
再看,在每一个逻辑处理器中,都有三个 Goroutine G2 G3 G5
或 G1 G4 G6
轮流共享各自的系统线程。看起来就像这三个 Goroutine 在同时运行着,没有特定顺序地执行它们的指令,并在系统线程上共享时间。
那么这就会发生 竞争 ,有时候如果只在一个物理核心上实现并发则实际上会降低吞吐量。还有有意思的是,有时候即便利用上了并行的并发,也不会给你带来想象中更大的性能提升。
CPU-Bound:这是一种不会导致 Goroutine 主动切换上下文到等待状态的类型。它会一直不停地进行计算。比如说,计算 π 到第 N 位的 Goroutine 就是 CPU-Bound 的。
IO-Bound:与上面相反,这种类型会导致 Goroutine 自然地进入到等待状态。它包括请求通过网络访问资源,或使用系统调用进入操作系统,或等待事件的发生。比如说,需要读取文件的 Goroutine 就是 IO-Bound。我把同步事件(互斥,原子),会导致 Goroutine 等待的情况也包含在此类。
在 CPU-Bound 中,我们需要利用并行。因为单个系统线程处理多个 Goroutine 的效率不高。而使用比系统线程更多的 Goroutine 也会拖慢执行速度,因为在系统线程上切换 Goroutine 是有时间成本的。上下文切换会导致发生 STW(Stop The World)
在 IO-Bound 中,并行则不是必须的了。单个系统线程可以高效地处理多个 Goroutine,是因为Goroutine 在执行这类指令时会自然地进入和退出等待状态。使用比系统线程更多的 Goroutine 可以加快执行速度,因为此时在系统线程上切换 Goroutine 的延迟成本并不会产生 STW
事件。进入到IO阻塞时,CPU就闲下来了,那么我们可以使不同的 Goroutine 有效地复用相同的线程,不让系统线程闲置。
我们如何评估一个系统线程匹配多少 Gorountine 是最合适的呢?如果 Goroutine 少了,则会无法充分利用硬件;如果 Goroutine 多了,则会导致上下文切换延迟。这是一个值得考虑的问题,但此时暂不深究。
我们不需要复杂的代码来展示和理解这些语义。先来看看下面这个名为 add
1 func add(numbers []int) int { 2 var v int 3 for _, n := range numbers { 4 v += n 5 } 6 return v 7 }
在第 1 行,声明了一个名为 add
的函数,它接收一个整型切片并返回切片中所有元素的和。它从第 2 行开始,声明了一个 v
变量来保存总和。然后第 3 行,线性地遍历切片,并且每个数字被加到 v
中。最后在第 6 行,函数将最终的总和返回给调用者。
问题: add
函数正在执行 CPU-Bound 工作负载,因为实现算法正在执行纯数学运算,并且它不会导致 Goroutine 进入等待状态。这意味着每个系统线程使用一个 Goroutine 就可以获得不错的吞吐量。
下面来看一下并发版本如何实现,声明一个 addConcurrent
1 func addConcurrent(goroutines int, numbers []int) int { 2 var v int64 3 totalNumbers := len(numbers) 4 lastGoroutine := goroutines - 1 5 stride := totalNumbers / goroutines 6 7 var wg sync.WaitGroup 8 wg.Add(goroutines) 9 10 for g := 0; g < goroutines; g++ { 11 go func(g int) { 12 start := g * stride 13 end := start + stride 14 if g == lastGoroutine { 15 end = totalNumbers 16 } 17 18 var lv int 19 for _, n := range numbers[start:end] { 20 lv += n 21 } 22 23 atomic.AddInt64(&v, int64(lv)) 24 wg.Done() 25 }(g) 26 } 27 28 wg.Wait() 29 30 return int(v) 31 }
第 5 行:计算每个 Goroutine 的子切片大小。使用输入切片总数除以 Goroutine 的数量得到。
第 10 行:创建一定数量的 Goroutine 执行子任务
第 14-16 行:子切片剩下的所有元素都放到最后一个 Goroutine 执行,可能比前几个 Goroutine 处理的数据要多。
第 23 行:将子结果追加到最终结果中。
下面的基准测试,我使用了1000万个数字的切片,并关闭了GC。分别有顺序版本 add
函数和并发版本 addConcurrent
func BenchmarkSequential(b *testing.B) { for i := 0; i < b.N; i++ { add(numbers) } } func BenchmarkConcurrent(b *testing.B) { for i := 0; i < b.N; i++ { addConcurrent(runtime.NumCPU(), numbers) } }
以下是所有 Goroutine 只有一个硬件线程可用的结果。顺序版本使用 1 Goroutine ,并发版本在我的机器上使用 runtime.NumCPU
或 8 Goroutines 。在这种情况下,并发版本实际正跑在没有并行的机制上。
10 Million Numbers using 8 goroutines with 1 core 2.9 GHz Intel 4 Core i7 Concurrency WITHOUT Parallelism ----------------------------------------------------------------------------- $ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s goos: darwin goarch: amd64 pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound BenchmarkSequential 1000 5720764 ns/op : ~10% Faster BenchmarkConcurrent 1000 6387344 ns/op BenchmarkSequentialAgain 1000 5614666 ns/op : ~13% Faster BenchmarkConcurrentAgain 1000 6482612 ns/op
结果表明:当只有一个系统线程可用于所有 Goroutine 时,顺序版本比并发快约10%到13%。这和我们之前的理论预期相符,主要就是因为并发版本在单核上的上下文切换和 Goroutine 管理调度的开销。
以下是每个 Goroutine 都有单独可用的系统线程的结果。顺序版本使用 1 Goroutine ,并发版本在我的机器上使用 runtime.NumCPU
或 8 Goroutines 。在这种情况下,并发版本利用上了并行机制。
10 Million Numbers using 8 goroutines with 8 cores 2.9 GHz Intel 4 Core i7 Concurrency WITH Parallelism ----------------------------------------------------------------------------- $ GOGC=off go test -cpu 8 -run none -bench . -benchtime 3s goos: darwin goarch: amd64 pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound BenchmarkSequential-8 1000 5910799 ns/op BenchmarkConcurrent-8 2000 3362643 ns/op : ~43% Faster BenchmarkSequentialAgain-8 1000 5933444 ns/op BenchmarkConcurrentAgain-8 2000 3477253 ns/op : ~41% Faster
结果表明:当为每个 Goroutine 提供单独的系统线程时,并发版本比顺序版本快大约41%到43%。这才也和预期一致,所有 Goroutine 现都在并行运行着,意味着他们真的在同时执行。
另外,我们也要知道并非所有的 CPU-Bound 都适合并发。当切分输入或合并结果的代价非常高时,就不太合适。下面展示一个冒泡 排序 算法来说明此场景。
01 package main 02 03 import "fmt" 04 05 func bubbleSort(numbers []int) { 06 n := len(numbers) 07 for i := 0; i < n; i++ { 08 if !sweep(numbers, i) { 09 return 10 } 11 } 12 } 13 14 func sweep(numbers []int, currentPass int) bool { 15 var idx int 16 idxNext := idx + 1 17 n := len(numbers) 18 var swap bool 19 20 for idxNext < (n - currentPass) { 21 a := numbers[idx] 22 b := numbers[idxNext] 23 if a > b { 24 numbers[idx] = b 25 numbers[idxNext] = a 26 swap = true 27 } 28 idx++ 29 idxNext = idx + 1 30 } 31 return swap 32 } 33 34 func main() { 35 org := []int{1, 3, 2, 4, 8, 6, 7, 2, 3, 0} 36 fmt.Println(org) 37 38 bubbleSort(org) 39 fmt.Println(org) 40 }
这种 排序算法 会扫描每次在交换值时传递的切片。在对所有内容进行排序之前,可能需要多次遍历切片。
那么问题: bubbleSort
01 func bubbleSortConcurrent(goroutines int, numbers []int) { 02 totalNumbers := len(numbers) 03 lastGoroutine := goroutines - 1 04 stride := totalNumbers / goroutines 05 06 var wg sync.WaitGroup 07 wg.Add(goroutines) 08 09 for g := 0; g < goroutines; g++ { 10 go func(g int) { 11 start := g * stride 12 end := start + stride 13 if g == lastGoroutine { 14 end = totalNumbers 15 } 16 17 bubbleSort(numbers[start:end]) 18 wg.Done() 19 }(g) 20 } 21 22 wg.Wait() 23 24 // Ugh, we have to sort the entire list again. 25 bubbleSort(numbers) 26 }
它使用多个 Goroutine 同时对输入的一部分进行排序。我们直接来看结果:
Before: 25 51 15 57 87 10 10 85 90 32 98 53 91 82 84 97 67 37 71 94 26 2 81 79 66 70 93 86 19 81 52 75 85 10 87 49 After: 10 10 15 25 32 51 53 57 85 87 90 98 2 26 37 67 71 79 81 82 84 91 94 97 10 19 49 52 66 70 75 81 85 86 87 93
由于冒泡排序的本质是依次扫描,第 25 行对 bubbleSort
前面已经举了两个 CPU-Bound 的例子,下面我们来看 IO-Bound 。
01 func find(topic string, docs []string) int { 02 var found int 03 for _, doc := range docs { 04 items, err := read(doc) 05 if err != nil { 06 continue 07 } 08 for _, item := range items { 09 if strings.Contains(item.Description, topic) { 10 found++ 11 } 12 } 13 } 14 return found 15 }
第 2 行:声明了一个名为 found
第 3-4 行:迭代文档,并使用 read
第 8-11 行:使用 strings.Contains
函数检查文档中是否包含指定主题。如果包含,则 found
然后来看一下 read
01 func read(doc string) ([]item, error) { 02 time.Sleep(time.Millisecond) // 模拟阻塞的读 03 var d document 04 if err := xml.Unmarshal([]byte(file), &d); err != nil { 05 return nil, err 06 } 07 return d.Channel.Items, nil 08 }
此功能以 time.Sleep
开始,持续1毫秒。此调用用于模拟在我们执行实际系统调用以从磁盘读取文档时可能产生的延迟。这种延迟的一致性对于准确测量 find
然后在第 03-07 行,将存储在全局变量文件中的模拟 xml
文档反序列化为 struct
值。最后,将 Items
01 func findConcurrent(goroutines int, topic string, docs []string) int { 02 var found int64 03 04 ch := make(chan string, len(docs)) 05 for _, doc := range docs { 06 ch <- doc 07 } 08 close(ch) 09 10 var wg sync.WaitGroup 11 wg.Add(goroutines) 12 13 for g := 0; g < goroutines; g++ { 14 go func() { 15 var lFound int64 16 for doc := range ch { 17 items, err := read(doc) 18 if err != nil { 19 continue 20 } 21 for _, item := range items { 22 if strings.Contains(item.Description, topic) { 23 lFound++ 24 } 25 } 26 } 27 atomic.AddInt64(&found, lFound) 28 wg.Done() 29 }() 30 } 31 32 wg.Wait() 33 34 return int(found) 35 }
第 4-7 行:创建一个 channel
第 8 行:关闭这个 channel
第 16-26 行:每个 Goroutine 都从同一个 channel
接收文档, read
并 strings.Contains
第 27 行:将各个 Goroutine 计数加在一起作为最终计数。
func BenchmarkSequential(b *testing.B) { for i := 0; i < b.N; i++ { find("test", docs) } } func BenchmarkConcurrent(b *testing.B) { for i := 0; i < b.N; i++ { findConcurrent(runtime.NumCPU(), "test", docs) } }
10 Thousand Documents using 8 goroutines with 1 core 2.9 GHz Intel 4 Core i7 Concurrency WITHOUT Parallelism ----------------------------------------------------------------------------- $ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s goos: darwin goarch: amd64 pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound BenchmarkSequential 3 1483458120 ns/op BenchmarkConcurrent 20 188941855 ns/op : ~87% Faster BenchmarkSequentialAgain 2 1502682536 ns/op BenchmarkConcurrentAgain 20 184037843 ns/op : ~88% Faster
当只有一个系统线程时,并发版本比顺序版本快大约87%到88%。与预期一致,因为所有 Goroutine 都有效地共享单个系统线程。
10 Thousand Documents using 8 goroutines with 8 core 2.9 GHz Intel 4 Core i7 Concurrency WITH Parallelism ----------------------------------------------------------------------------- $ GOGC=off go test -run none -bench . -benchtime 3s goos: darwin goarch: amd64 pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound BenchmarkSequential-8 3 1490947198 ns/op BenchmarkConcurrent-8 20 187382200 ns/op : ~88% Faster BenchmarkSequentialAgain-8 3 1416126029 ns/op BenchmarkConcurrentAgain-8 20 185965460 ns/op : ~87% Faster
我们可以清楚地看到,使用 IO-Bound 并不需要并行来获得性能上的巨大提升。这与我们在 CPU-Bound 中看到的结果相反。当涉及像冒泡排序这样的算法时,并发的使用会增加复杂性而没有任何实际的性能优势。
所以,我们在考虑解决方案时,首先要确定它是否适合并发,而不是盲目认为使用更多的 Goroutine 就一定会提升性能。
