内容简介:原文地址:在这里,假设
原文地址: 来,控制一下 Goroutine 的并发数量
问题
func main() { userCount := math.MaxInt64 for i := 0; i < userCount; i++ { go func(i int) { // 做一些各种各样的业务逻辑处理 fmt.Printf("go func: %d\n", i) time.Sleep(time.Second) }(i) } }
在这里,假设 userCount
是一个外部传入的参数(不可预测,有可能值非常大),有人会全部丢进去循环。想着全部都并发 goroutine 去同时做某一件事。觉得这样子会效率会更高,对不对!
那么,你觉得这里有没有什么问题?
噩梦般的开始
当然,在 特定场景下 ,问题可大了。因为在本文被丢进去同时并发的可是一个极端值。我们可以一起观察下图的指标分析,看看情况有多 “崩溃”。下图是上述代码的表现:
输出结果
... go func: 5839 go func: 5840 go func: 5841 go func: 5842 go func: 5915 go func: 5524 go func: 5916 go func: 8209 go func: 8264 signal: killed
如果你自己执行过代码,在 “输出结果” 上你会遇到如下问题:
- 系统资源占用率不断上涨
- 输出一定数量后:控制台就不再刷新输出最新的值了
- 信号量:signal: killed
系统负载
CPU
短时间内系统负载暴增
虚拟内存
短时间内占用的虚拟内存暴增
top
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS PGRP PPID STATE BOOSTS ... 73414 test 100.2 01:59.50 9/1 0 18 6801M+ 0B 114G+ 73403 73403 running *0[1]
小结
如果仔细看过监控 工具 的示意图,就可以知道其实我间隔的执行了两次,能看到系统间的使用率幅度非常大。当进程被杀掉后,整体又恢复为正常值
在这里,我们回到主题,就是在 不控制并发的 goroutine 数量 会发生什么问题?大致如下:
- CPU 使用率浮动上涨
- Memory 占用不断上涨。也可以看看 CMPRS,它表示进程的压缩数据的字节数。已经到达 114G+ 了
- 主进程崩溃(被杀掉了)
简单来说,“崩溃” 的原因就是对系统资源的占用过大。常见的比如:打开文件数(too many files open)、内存占用等等
危害
对该台服务器产生非常大的影响,影响自身及相关联的应用。很有可能导致不可用或响应缓慢,另外启动了复数 “失控” 的 goroutine,导致程序流转混乱
解决方案
在前面花了大量篇幅,渲染了在存在大量并发 goroutine 数量时,不控制的话会出现 “严重” 的问题,接下来一起思考下解决方案。如下:
- 控制/限制 goroutine 同时并发运行的数量
- 改变应用程序的逻辑写法(避免大规模的使用系统资源和等待)
-
调整服务的硬件配置、最大打开数、内存等阈值
控制 goroutine 并发数量
接下来正式的开始解决这个问题,希望你认真阅读的同时加以思考,因为这个问题在实际项目中真的是太常见了!
问题已经抛出来了,你需要做的是 想想有什么办法 解决这个问题。建议你自行思考一下技术方案。再接着往下看 :-)
尝试 chan
func main() { userCount := 10 ch := make(chan bool, 2) for i := 0; i < userCount; i++ { ch <- true go Read(ch, i) } //time.Sleep(time.Second) } func Read(ch chan bool, i int) { fmt.Printf("go func: %d\n", i) <- ch }
输出结果:
go func: 1 go func: 2 go func: 3 go func: 4 go func: 5 go func: 6 go func: 7 go func: 8 go func: 0
嗯,我们似乎很好的控制了 2 个 2 个的 “顺序” 执行多个 goroutine。但是,问题出现了。你仔细数一下输出结果,才 9 个值?
这明显就不对。原因出在当主协程结束时,子协程也是会被终止掉的。因此剩余的 goroutine 没来及把值输出,就被送上路了(不信你把 time.Sleep
打开看看,看看输出数量)
尝试 sync
... var wg = sync.WaitGroup{} func main() { userCount := 10 for i := 0; i < userCount; i++ { wg.Add(1) go Read(i) } wg.Wait() } func Read(i int) { defer wg.Done() fmt.Printf("go func: %d\n", i) }
嗯,单纯的使用 sync.WaitGroup
也不行。没有控制到同时并发的 goroutine 数量(代指达不到本文所要求的目标)
小结
单纯 简单 使用 channel 或 sync 都有明显缺陷,不行。我们再看看组件配合能不能实现
尝试 chan + sync
... var wg = sync.WaitGroup{} func main() { userCount := 10 ch := make(chan bool, 2) for i := 0; i < userCount; i++ { go Read(ch, i) } wg.Wait() } func Read(ch chan bool, i int) { defer wg.Done() wg.Add(1) ch <- true fmt.Printf("go func: %d, time: %d\n", i, time.Now().Unix()) time.Sleep(time.Second) <-ch }
输出结果:
go func: 9, time: 1547911938 go func: 1, time: 1547911938 go func: 6, time: 1547911939 go func: 7, time: 1547911939 go func: 8, time: 1547911940 go func: 0, time: 1547911940 go func: 3, time: 1547911941 go func: 2, time: 1547911941 go func: 4, time: 1547911942 go func: 5, time: 1547911942
从输出结果来看,确实实现了控制 goroutine 以 2 个 2 个的数量去执行我们的 “业务逻辑”,当然结果集也理所应当的是乱序输出
方案一:简单 Semaphore
在确立了简单使用 chan + sync 的方案是可行后,我们重新将流转逻辑封装为 gsema ,主程序变成如下:
import ( "fmt" "time" "github.com/EDDYCJY/gsema" ) var sema = gsema.NewSemaphore(3) func main() { userCount := 10 for i := 0; i < userCount; i++ { go Read(i) } sema.Wait() } func Read(i int) { defer sema.Done() sema.Add(1) fmt.Printf("go func: %d, time: %d\n", i, time.Now().Unix()) time.Sleep(time.Second) }
分析方案
在上述代码中,程序执行流程如下:
sema
看上去人模人样,没什么严重问题。但却有一个 “大” 坑,认真看到第二点 “每次启动一个 goroutine” 这句话。这里 有点问题 ,提前产生那么多的 goroutine 会不会有什么问题,接下来一起分析下利弊,如下:
利
-
适合 量不大、复杂度低 的使用场景
- 几百几千个、几十万个也是可以接受的(看具体业务场景)
- 实际业务逻辑在运行前就已经被阻塞等待了(因为并发数受限),基本实际业务逻辑损耗的性能比 goroutine 本身大
- goroutine 本身很轻便,仅损耗极少许的内存空间和调度。这种等待响应的情况都是躺好了,等待任务唤醒
- Semaphore 操作复杂度低且流转简单,容易控制
弊
-
不适合 量很大、复杂度高 的使用场景
- 有几百万、几千万个 goroutine 的话,就浪费了大量调度 goroutine 和内存空间。恰好你的服务器也接受不了的话
- Semaphore 操作复杂度提高,要管理更多的状态
小结
- 基于什么业务场景,就用什么方案去做事
- 有足够的时间,允许你去追求更优秀、极致的方案(用第三方库也行)
用哪种方案,我认为主要基于以上两点去思考,都是 OK 的。没有对错,只有当前业务场景能不能接受,这个预先启动的 goroutine 数量你的系统是否能够接受
当然了,常见/简单的 Go 应用采用这类技术方案,基本就能解决问题了。因为像本文第一节 “问题” 如此超巨大数量的情况,情况很少。其并不存在那些 “特殊性”。因此用这个方案基本 OK
灵活控制 goroutine 并发数量
小手一紧。隔壁老王发现了新的问题。“方案一” 中,在 输入输出一体 的情况下,在常见的业务场景中确实可以
但,这次新的业务场景比较特殊,要控制输入的数量,以此达到 改变允许并发运行 goroutine 的数量 。我们仔细想想,要做出如下改变:
- 输入/输出要抽离,才可以分别控制
- 输入/输出要可变,理所应当在 for-loop 中(可设置数值的地方)
- 允许改变 goroutine 并发数量,但它也必须有一个 最大值 (因为允许改变是相对)
方案二:灵活 chan + sync
package main import ( "fmt" "sync" "time" ) var wg sync.WaitGroup func main() { userCount := 10 ch := make(chan int, 5) for i := 0; i < userCount; i++ { wg.Add(1) go func() { defer wg.Done() for d := range ch { fmt.Printf("go func: %d, time: %d\n", d, time.Now().Unix()) time.Sleep(time.Second * time.Duration(d)) } }() } for i := 0; i < 10; i++ { ch <- 1 ch <- 2 //time.Sleep(time.Second) } close(ch) wg.Wait() }
输出结果:
... go func: 1, time: 1547950567 go func: 3, time: 1547950567 go func: 1, time: 1547950567 go func: 2, time: 1547950567 go func: 2, time: 1547950567 go func: 3, time: 1547950567 go func: 1, time: 1547950568 go func: 2, time: 1547950568 go func: 3, time: 1547950568 go func: 1, time: 1547950568 go func: 3, time: 1547950569 go func: 2, time: 1547950569
在 “方案二” 中,我们可以随时随地的根据新的业务需求,做如下事情:
- 变更 channel 的输入数量
- 能够根据特殊情况,变更 channel 的循环值
- 变更最大允许并发的 goroutine 数量
总的来说,就是可控空间都尽量放开了,是不是更加灵活了呢 :-)
方案三:第三方库
比较成熟的第三方库也不少,基本都是以生成和管理 goroutine 为目标的池工具。我简单列了几个,具体建议大家阅读下源码或者多找找,原理相似
总结
在本文的开头,我花了大力气(极端数量),告诉你 同时并发过多的 goroutine 数量会导致系统占用资源不断上涨。最终该服务崩盘的极端情况 。为的是希望你今后避免这种问题,给你留下深刻的印象
接下来我们以 “控制 goroutine 并发数量” 为主题,展开了一番分析。分别给出了三种方案。在我看来,各具优缺点,我建议你 挑选合适自身场景的技术方案 就可以了
因为,有不同类型的技术方案也能解决这个问题,千人千面。本文推荐的是较常见的解决方案,也欢迎大家在评论区继续补充 :-)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Java并发 -- 线程数量
- 控制 Goroutine 的并发数量的方式
- Go 开发中,如何有效控制 Goroutine 的并发数量
- Go开发中,如何有效控制Goroutine的并发数量
- 字符串的长度,是字符数量,还是字节数量?
- kafka增加topic的备份数量
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python源码剖析
陈儒 / 电子工业出版社 / 2008-6 / 69.80元
作为主流的动态语言,Python不仅简单易学、移植性好,而且拥有强大丰富的库的支持。此外,Python强大的可扩展性,让开发人员既可以非常容易地利用C/C++编写Python的扩展模块,还能将Python嵌入到C/C++程序中,为自己的系统添加动态扩展和动态编程的能力。. 为了更好地利用Python语言,无论是使用Python语言本身,还是将Python与C/C++交互使用,深刻理解Pyth......一起来看看 《Python源码剖析》 这本书的介绍吧!