内容简介:这是本系列文章的第三篇,第一篇在此前文描述了手工作坊的时代和工业时代,现在我们进入信息时代。前文描述的工业时代其实是资本主义,来到世间的每个毛孔都在滴血。不信你看前文的代码,
这是本系列文章的第三篇,第一篇在此 golang并发三板斧系列之一:channel用于通信和同步 ,第二篇在此 golang并发三板斧系列之二:goroutine池用于并发 。
前文描述了手工作坊的时代和工业时代,现在我们进入信息时代。
万恶的资本主义
前文描述的工业时代其实是资本主义,来到世间的每个毛孔都在滴血。不信你看前文的代码, gen
之类的函数创建了一堆任务之后就扔给下游的works处理了,也不管他们要处理多久,是不是加班到深夜,是不是996ICU。
func BenchmarkCPUPool(b *testing.B) { channum := 100 gonum := runtime.NumCPU() for i := 0; i < b.N; i++ { f := func(w *Work) { if v, ok := w.input.(float64); ok { cpubound(v) } } list := benchmarkList() c := genPoolChanBuffer(list) p := InitPool(channum, gonum, f) p.RunWorker() p.FeedWorker(c) p.Wait() } }
人民当家作主
感谢毛主席,解放之后我们进入了社会主义,我们有了工会这个关爱人民的组织。人民的工会爱人民,人民的工会力量大,工会可以在合适的时候给大家放假,让大家休息,对应到程序中就是按下了神圣的 ctrl+c
。
如下是最粗暴的模型,工会一喊放假,大家都放下手上的工作开心的玩耍了。但是有的工作做了一半就放弃了,这确实是很没有职业操守的:
var WorkPool chan work type work struct { sth string } func (w work) Working() { log.Printf("start %s", w.sth) time.Sleep(2 * time.Second) log.Printf("end %s", w.sth) } func RunWorkerSimple() { workers := 2 for i := 0; i < workers; i++ { go HandleWorkerSimple() } } func HandleWorkerSimple() { for { select { case work := <-WorkPool: work.Working() } } return } func TestWorkerSimple(t *testing.T) { WorkPool = make(chan work) go RunWorkerSimple() go func() { list := benchmarkList() for _, l := range list { WorkPool <- work{fmt.Sprint(l)} } }() select {} }
很明显从结果可以看出,有工作编号为2和3的没有完成就被扔掉了,其余没有启动的工作都放弃了。
C02S259EFVH3:go_concurrency baixiao$ go test -run TestWorkerSimple 2019/04/08 22:56:11 start 1 2019/04/08 22:56:11 start 0 2019/04/08 22:56:13 end 0 2019/04/08 22:56:13 start 2 2019/04/08 22:56:13 end 1 2019/04/08 22:56:13 start 3 ^Csignal: interrupt FAIL _/Users/baixiao/Go/src/github.com/baixiaoustc/go_concurrency 2.790s
要对得起这份工作
人民的工人为人民,所以即便工会保障了工人的权益,该做好的工作还是要认真做完啊。对应到程序中,收到 ctrl+c
中断后,每个worker应该完成手上正在做的工作,并且由BOSS(主goroutine)把剩余队列中的工作保存起来(比如写数据库或者文件),留待明天上班继续做。
代码写起来就复杂多了,要监控系统的中断信号,要等待所有worker处理完手上的事情,最后再把剩余的事情保存起来。需要用两个chan来通信,stopChan 用于通知workers下班啦,stoppedChan 用于所有worker处理完之后告知BOSS,再由BOSS保存剩余的工作。
func (w work) Saving() { log.Printf("save %s", w.sth) } func RunWorkerHold(stop, stopped chan struct{}) { var wg sync.WaitGroup workers := 2 for i := 0; i < workers; i++ { wg.Add(1) go HandleWorkerHold(stop, &wg) } wg.Wait() stopped <- struct{}{} } func HandleWorkerHold(stop chan struct{}, wg *sync.WaitGroup) { defer wg.Done() for { select { case work := <-WorkPool: work.Working() case <-stop: log.Println("worker: caller has told us to stop") return } } return } func TestWorkerHold(t *testing.T) { WorkPool = make(chan work) stopChan := make(chan struct{}) stoppedChan := make(chan struct{}) go RunWorkerHold(stopChan, stoppedChan) go func() { list := benchmarkList() for _, l := range list { WorkPool <- work{fmt.Sprint(l)} } }() // listen for C-c c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) <-c log.Println("main: received C-c - shutting down") // tell the goroutine to stop log.Println("main: telling workers to stop") close(stopChan) // and wait for them to reply back <-stoppedChan log.Println("main: workers has told us they've finished") for work := range WorkPool { work.Saving() } return }
这里没有把打印贴完,最终的结果是所有工作都save好了的:
C02S259EFVH3:go_concurrency baixiao$ go test -run TestWorkerHold 2019/04/08 23:32:02 start 1 2019/04/08 23:32:02 start 0 2019/04/08 23:32:04 end 1 2019/04/08 23:32:04 start 2 2019/04/08 23:32:04 end 0 2019/04/08 23:32:04 start 3 ^C2019/04/08 23:32:05 main: received C-c - shutting down 2019/04/08 23:32:05 main: telling workers to stop 2019/04/08 23:32:06 end 2 2019/04/08 23:32:06 worker: caller has told us to stop 2019/04/08 23:32:06 end 3 2019/04/08 23:32:06 start 4 2019/04/08 23:32:08 end 4 2019/04/08 23:32:08 start 5 2019/04/08 23:32:10 end 5 2019/04/08 23:32:10 worker: caller has told us to stop 2019/04/08 23:32:10 main: workers has told us they've finished 2019/04/08 23:32:10 save 6 2019/04/08 23:32:10 save 7 2019/04/08 23:32:10 save 8 2019/04/08 23:32:10 save 9 2019/04/08 23:32:10 save 10 2019/04/08 23:32:10 save 11 2019/04/08 23:32:10 save 12 2019/04/08 23:32:10 save 13 2019/04/08 23:32:10 save 14 2019/04/08 23:32:10 save 15 。。。
但是值得注意的是,不是 close(stopChan)
一执行,马上所有的worker都能结束工作了。如上其中一个worker在接着完成work4和work5之后才走了退出流程,是因为对于 select
来讲,如果多个 chan
都准备好了的话,是随机选择其中一个,所以会有概率一直接着work的。
更进一步
上面的模式还是有缺陷,如果worker下面还有徒弟怎么办(又新开了goroutine)?最后BOSS在做剩余工作的保存时也不想耽误太久怎么办?保存工作写数据库也想受控制(比如 database/sql
包提供了相应支持)怎么办?
终于我们的主角登场了,golang提供了 context
模式用于解决goroutine的高效且安全退出问题,教程在网上很多了,不用细讲,只贴一下主要函数:
// 带cancel返回值的Context,一旦cancel被调用,即取消该创建的context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// 带有效期cancel返回值的Context,即必须到达指定时间点调用的cancel方法才会被执行
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
// 带超时时间cancel返回值的Context,类似Deadline,前者是时间点,后者为时间间隔 // 相当于WithDeadline(parent, time.Now().Add(timeout)).
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
最后进化到我们的代码,注意 SavingDB()
方法只是一个伪代码示意:
import "database/sql" //fake, just a example func (w work) SavingDB(ctx context.Context) { log.Printf("save %s", w.sth) stmt := "select name from db.table" db := sql.DB{} conn, _ := db.Conn(ctx) rows, err := conn.QueryContext(ctx, stmt) if err != nil { if err == context.DeadlineExceeded { // context canceled } return } var name string for rows.Next() { if err := rows.Scan(&name); err != nil { if err == context.DeadlineExceeded { log.Println("scan canceled") } } } } func RunWorkerContext(ctx context.Context, stopped chan struct{}) { var wg sync.WaitGroup workers := 2 for i := 0; i < workers; i++ { wg.Add(1) go HandleWorkerContext(ctx, &wg) } wg.Wait() stopped <- struct{}{} } func HandleWorkerContext(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() for { select { case work := <-WorkPool: work.Working() case <-ctx.Done(): log.Println("worker: caller has told us to stop") return } } return } func TestWorkerContext(t *testing.T) { WorkPool = make(chan work) ctx, cancel := context.WithCancel(context.Background()) defer cancel() stoppedChan := make(chan struct{}) go RunWorkerContext(ctx, stoppedChan) go func() { list := benchmarkList() for _, l := range list { WorkPool <- work{fmt.Sprint(l)} } }() // listen for C-c c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) <-c log.Println("main: received C-c - shutting down") // tell the goroutine to stop log.Println("main: telling workers to stop") cancel() // and wait for them to reply back <-stoppedChan log.Println("main: workers has told us they've finished") ctxTimeout, cancelTimeout := context.WithTimeout(ctx, 100*time.Microsecond) defer cancelTimeout() for { select { case work := <-WorkPool: work.SavingDB(ctxTimeout) case <-ctxTimeout.Done(): log.Println("main: cann't wait any more") return } } return }
通过 cancel()
方法通知该 context.Context
其下的所有goroutine进入退出流程,并可以启动带timeout的ctx开始保存工作流程,所有流程都是受控的。
这像不像是现代企业的扁平化管理,BOSS直接控制所有员工,提升所有的工作效率?
所有代码都在 https://github.com/baixiaoustc/go_concurrency/blob/master/third_post_test.go 中能找到。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 简单设计落地三板斧
- 线上技术故障处理三板斧
- 互联网架构三板斧之并发
- Pandas数据处理三板斧,你会几板?
- 一线 | 英特尔淘金物联网三板斧:芯片、边缘计算和计算机视觉
- 中国“新三板”——通过大数据实现股转系统监控
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
网站重构(第3版)
[美] Jeffrey Zeldman、[美] Ethan Marcotte / 傅捷、祝军、李宏 / 电子工业出版社 / 2011-3 / 59.00元
《网站重构:应用Web标准进行设计(第3版)》内容简介:畅销书作家、设计师、网页标准教父jeffrey zeldman再次更新了他经典的、颠覆行业的指南书。这已经是《网站重构:应用Web标准进行设计(第3版)》的第3版了,此次更新基本涵盖了随着环境和技术的变化,web标准所面临的挑战以及因此而发生的改善。第3版让基于标准的设计思想更加清晰,更加易于理解,帮助你在这个领域中保持聪明和领先。 ......一起来看看 《网站重构(第3版)》 这本书的介绍吧!