内容简介:要想写出高效简单的并发程序,还需要了解下常用的 goroutine和channel以哪种方式写,下面介绍3种常用的并发模式,写出更简化高效的并发。Runner 模式可以理解为执行者,也就是来控制程序的执行,它可以去执行任何程序,程序都是受监控的,可以去终止这些程序。当我们需要调度后台处理任务程序的时候,这种模式很拥有。简单说就是,控制 、执行,中断、退出。Runner 结构体很简单,主要包括 中断信号、完成信号、超时信号,以及要执行的任务队列。也就是说它是通过channel 通道的信息读取来控制程序的执行
并发模式
要想写出高效简单的并发程序,还需要了解下常用的 goroutine和channel以哪种方式写,下面介绍3种常用的并发模式,写出更简化高效的并发。
1、runner
Runner 模式可以理解为执行者,也就是来控制程序的执行,它可以去执行任何程序,程序都是受监控的,可以去终止这些程序。当我们需要调度后台处理任务程序的时候,这种模式很拥有。简单说就是,控制 、执行,中断、退出。
创建 Runner 结构体
/* runner 可以执行任何程序,可以监控程序,可以发送信号终止程序 */ type Runner struct { interrupt chan os.Signal //发送的信号,用来终止程序 complete chan error //用于通知任务全部完成 timeout <-chan time.Time //程序的超时时间 tasks []func(int) //要执行的任务 }
Runner 结构体很简单,主要包括 中断信号、完成信号、超时信号,以及要执行的任务队列。也就是说它是通过channel 通道的信息读取来控制程序的执行、中断和退出,它能实现
- 程序在分配的时间内完成工作,正常终止
- 程序没有及时完成巩固走,自动杀死
- 接收到操作系统发送的中断时间,程序立刻清理状态并停止
这三个channel 分别是 os.Signal 、error、time.Time 类型,这个中断信号类型是个os.Signal的通道,它是用来从操作系统中接收中断事件。 完成时信号是个error类型,也就是正常完成,它为nil,只要不为nil,就有错误发生呗。 timeout 只要能取出值,表示超时了,执行超时操作。
tasks就是一个简单的切片,类型是接收一个int型参数的函数类型。
创建简单工厂函数。
//工厂函数 func New(tm time.Duration) *Runner{ return &Runner{ complete:make(chan error),//无缓冲 timeout:time.After(tm), interrupt:make(chan os.Signal,1), //有缓冲 } }
这里关注两点, complete
是无缓冲通道, interrupt
是有缓冲通道。 因为, main.runtime
需要等 complete
,任务完成,程序退出,所以, complete
必须是同步的,无缓冲的。
而 interrupt
初始化缓存区容量为1,这样可以保证通道至少能接收一个来自运行时的os.Signal值,保证运行时发送这个事件的时候不会阻塞。
创建任务添加方法
//将需要执行的任务,添加到Runner里 func (r *Runner) Add(tasks ...func(int)){ r.tasks = append(r.tasks,tasks...) }
创建两个错误类型变量
var ErrTimeOut = errors.New("执行者执行超时") var ErrInterrupt = errors.New("执行者被中断")
这两个错误时接收到相应信号时返回的,一个是超时 一个是中断。
创建run方法
//执行任务,执行的过程中接收到中断信号时,返回中断错误 //如果任务全部执行完,还没有接收到中断信号,则返回nil func (r *Runner) run() error { for id, task := range r.tasks { if r.isInterrupt() { return ErrInterrupt } task(id)//执行任务 } return nil } //检查是否接收到了中断信号 func (r *Runner) isInterrupt() bool { select { case <-r.interrupt: signal.Stop(r.interrupt) return true default: return false } }
也就是,先判断有没有中断信号 ,能取出返回中断错误,没有,执行任务代码。这里判断是否接收中断信号 isInterrupt
使用select 语法, 有就执行case,没有就default。
创建start方法,监控任务,也就是run方法的包装
//开始执行所有任务,并且监视通道事件 func (r *Runner) Start() error { //希望接收哪些系统信号 signal.Notify(r.interrupt, os.Interrupt) go func() { r.complete <- r.run() }() select { case err := <-r.complete: return err case <-r.timeout: return ErrTimeOut } }
这个是runner 包最后一个方法了,它从操作系统接收中断信号,监控超时信号,一旦超时返回超时错误。一旦调用run方法过程中出现中断 信号,赋值给r.complete 终止信号。 只要此执行者Runner 出现中止、正常完成、超时,就会退出。
完成的runner 代码如下:
package runner import ( "errors" "os" "os/signal" "time" ) var ErrTimeOut = errors.New("执行者执行超时") var ErrInterrupt = errors.New("执行者被中断") /* runner 可以执行任何程序,可以监控程序,可以发送信号终止程序 */ type Runner struct { interrupt chan os.Signal //发送的信号,用来终止程序 complete chan error //用于通知任务全部完成 timeout <-chan time.Time //程序的超时时间 tasks []func(int) //要执行的任务 } //工厂函数 func New(tm time.Duration) *Runner { return &Runner{ complete: make(chan error), //无缓冲 timeout: time.After(tm), interrupt: make(chan os.Signal, 1), //有缓冲 } } //将需要执行的任务,添加到Runner里 func (r *Runner) Add(tasks ...func(int)) { r.tasks = append(r.tasks, tasks...) } //执行任务,执行的过程中接收到中断信号时,返回中断错误 //如果任务全部执行完,还没有接收到中断信号,则返回nil func (r *Runner) run() error { for id, task := range r.tasks { if r.isInterrupt() { return ErrInterrupt } task(id) } return nil } //检查是否接收到了中断信号 func (r *Runner) isInterrupt() bool { select { case <-r.interrupt: signal.Stop(r.interrupt) return true default: return false } } //开始执行所有任务,并且监视通道事件 func (r *Runner) Start() error { //希望接收哪些系统信号 signal.Notify(r.interrupt, os.Interrupt) go func() { r.complete <- r.run() }() select { case err := <-r.complete: return err case <-r.timeout: return ErrTimeOut } }
使用runner 管理工作:
package main import ( "a_tour_of_go/runner" //这个是写的runner所在包 "log" "os" "time" ) //必须在3内完成,超时时间 const timeout = 3 * time.Second func main() { log.Println("开始工作") r := runner.New(timeout) //初始化runner r.Add(createTask(), createTask(), createTask()) //添加任务 //执行任务并处理结果 if err := r.Start(); err != nil { switch err { case runner.ErrTimeOut: log.Println("超时错误") os.Exit(1) case runner.ErrInterrupt: log.Println("中断错误") os.Exit(2) } } log.Print("工作完成") } func createTask() func(int) { return func(id int) { log.Printf("正在执行任务#%d ", id) time.Sleep(time.Duration(id) * time.Second) } }
上面我们在main函数中初始化并调用我们的runner。 初始化超时时间3秒,添加了3个任务,每个任务打印一句话,睡眠了几秒。 然后调用 r.Start()
函数执行任务。 正常会返回nil,会打印工作完成。异常会打印我们的错误,调用os.Exit()以错误码退出。比如我们第3个任务就是睡眠3秒,这个是超时的,所以就会出现;
2019/06/24 09:58:39 开始工作 2019/06/24 09:58:39 正在执行任务#0 2019/06/24 09:58:39 正在执行任务#1 2019/06/24 09:58:40 正在执行任务#2 2019/06/24 09:58:42 超时错误
如果我们,使用命令go run main.go ,在执行过程中按下 Ctrl+C ,就会中断程序,如下:
2019/06/24 10:02:00 开始工作 2019/06/24 10:02:00 正在执行任务#0 2019/06/24 10:02:00 正在执行任务#1 2019/06/24 10:02:01 中断错误 exit status 2
现在我们改下超时时间为4,试试:
2019/06/24 10:03:16 开始工作 2019/06/24 10:03:16 正在执行任务#0 2019/06/24 10:03:16 正在执行任务#1 2019/06/24 10:03:17 正在执行任务#2 2019/06/24 10:03:19 工作完成
那就正常了。
2、pool
现在使用有缓冲的通道实现一个资源池,这个资源池可以管理在任意多个goroutine之间共享的资源,这种在需要共享一组静态资源的情况下非常有用,比如 网络连接 、 数据库连接 等,我们在数据库操作的时候,比较常见的就是数据连接池,也可以基于我们实现的资源池来实现。
可以看出,资源池也是一种非常流畅性的模式,这种模式一般适用于在多个goroutine之间共享资源,每个goroutine可以从资源池里申请资源,使用完之后再放回资源池里,以便其他goroutine复用。
创建资源池结构体
//一个安全的资源池,被管理的资源必须都实现io.Close接口 type Pool struct { m sync.Mutex res chan io.Closer factory func() (io.Closer,error) closed bool }
这个结构体 Pool
有四个字段,其中 m
是一个互斥锁,这主要是用来保证在多个goroutine访问资源时,池内的值是安全的。
res
字段是一个有缓冲的通道,用来保存共享的资源,这个通道的大小,在初始化 Pool
的时候就指定的。注意这个通道的类型是 io.Closer
接口,所以实现了这个 io.Closer
接口的类型都可以作为资源,交给我们的资源池管理。
factory
这个是一个函数类型,它的作用就是当需要一个新的资源时,可以通过这个函数创建,也就是说它是生成新资源的,至于如何生成、生成什么资源,是由使用者决定的,所以这也是这个资源池灵活的设计的地方。
closed
字段表示资源池是否被关闭,如果被关闭的话,再访问是会有错误的。
现在先这个资源池我们已经定义好了,也知道了每个字段的含义,下面就开时具体使用。刚刚我们说到关闭错误,那么我们就先定义一个资源池已经关闭的错误。
var ErrPoolClosed = errors.New("资源池已经关闭。")
非常简洁,当我们从资源池获取资源的时候,如果该资源池已经关闭,那么就会返回这个错误。单独定义它的目的,是和其他错误有一个区分,这样需要的时候,我们就可以从众多的 error
类型里区分出来这个 ErrPoolClosed
。
下面我们就该为创建 Pool
专门定一个函数了,这个函数就是工厂函数,我们命名为 New
。
创建工厂函数
//创建一个资源池 func New(fn func() (io.Closer, error), size uint) (*Pool, error) { if size <= 0 { return nil, errors.New("size的值太小了。") } return &Pool{ factory: fn, res: make(chan io.Closer, size), }, nil }
这个函数创建一个资源池,它接收两个参数,一个 fn
是创建新资源的函数;还有一个 size
是指定资源池的大小。
这个函数里,做了 size
大小的判断,起码它不能小于或者等于0,否则就会返回错误。如果参数正常,就会使用 size
创建一个有缓冲的通道,来保存资源,并且返回一个资源池的指针。
有了创建好的资源池,那么我们就可以从中获取资源了。
//从资源池里获取一个资源 func (p *Pool) Acquire() (io.Closer,error) { select { case r,ok := <-p.res: log.Println("Acquire:共享资源") if !ok { return nil,ErrPoolClosed } return r,nil default: log.Println("Acquire:新生成资源") return p.factory() } }
Acquire
方法可以从资源池获取资源,如果没有资源,则调用 factory
方法生成一个并返回。
这里同样使用了 select
的多路复用,因为这个函数不能阻塞,可以获取到就获取,不能就生成一个。
这里的新知识是通道接收的多参返回,如果可以接收的话,第一参数是接收的值,第二个表示通道是否关闭。例子中如果 ok
值为 false
表示通道关闭,如果为 true
则表示通道正常。所以我们这里做了一个判断,如果通道关闭的话,返回通道关闭错误。
有获取资源的方法,必然还有对应的释放资源的方法,因为资源用完之后,要还给资源池,以便复用。在讲解释放资源的方法前,我们先看下关闭资源池的方法,因为释放资源的方法也会用到它。
关闭资源池,意味着整个资源池不能再被使用,然后关闭存放资源的通道,同时释放通道里的资源。
资源池关闭函数
//关闭资源池,释放资源 func (p *Pool) Close() { p.m.Lock() defer p.m.Unlock() if p.closed { return } p.closed = true //关闭通道,不让写入了 close(p.res) //关闭通道里的资源 for r:=range p.res { r.Close() } }
这个方法里,我们使用了互斥锁,因为有个标记资源池是否关闭的字段 closed
需要再多个goroutine操作,所以我们必须保证这个字段的同步。这里把关闭标志置为 true
。
然后我们关闭通道,不让写入了,而且我们前面的 Acquire
也可以感知到通道已经关闭了。同比通道后,就开始释放通道中的资源,因为所有资源都实现了io.Closer接口,所以我们直接调用 Close
方法释放资源即可。
关闭方法有了,我们看看释放资源的方法如何实现。
资源释放函数
func (p *Pool) Release(r io.Closer){ //保证该操作和Close方法的操作是安全的 p.m.Lock() defer p.m.Unlock() //资源池都关闭了,就省这一个没有释放的资源了,释放即可 if p.closed { r.Close() return } select { case p.res <- r: log.Println("资源释放到池子里了") default: log.Println("资源池满了,释放这个资源吧") r.Close() } }
释放资源本质上就会把资源再发送到缓冲通道中,就是这么简单,不过为了更安全的实现这个方法,我们使用了互斥锁,保证 closed
标志的安全,而且这个互斥锁还有一个好处,就是不会往一个已经关闭的通道发送资源。
这是为什么呢?因为Close和Release这两个方法是互斥的,Close方法里对 closed
标志的修改,Release方法可以感知到,所以就直接return了,不会执行下面的select代码了,也就不会往一个已经关闭的通道里发送资源了。
如果资源池没有被关闭,则继续尝试往资源通道发送资源,如果可以发送,就等于资源又回到资源池里了;如果发送不了,说明资源池满了,该资源就无法重新回到资源池里,那么我们就把这个需要释放的资源关闭,抛弃了。
针对这个资源池管理的一步步都实现了,而且做了详细的讲解,下面就看下整个示例代码,方便理解。
完整代码
package pool import ( "errors" "io" "sync" "log" ) //一个安全的资源池,被管理的资源必须都实现io.Close接口 type Pool struct { m sync.Mutex res chan io.Closer factory func() (io.Closer, error) closed bool } var ErrPoolClosed = errors.New("资源池已经被关闭。") //创建一个资源池 func New(fn func() (io.Closer, error), size uint) (*Pool, error) { if size <= 0 { return nil, errors.New("size的值太小了。") } return &Pool{ factory: fn, res: make(chan io.Closer, size), }, nil } //从资源池里获取一个资源 func (p *Pool) Acquire() (io.Closer,error) { select { case r,ok := <-p.res: log.Println("Acquire:共享资源") if !ok { return nil,ErrPoolClosed } return r,nil default: log.Println("Acquire:新生成资源") return p.factory() } } //关闭资源池,释放资源 func (p *Pool) Close() { p.m.Lock() defer p.m.Unlock() if p.closed { return } p.closed = true //关闭通道,不让写入了 close(p.res) //关闭通道里的资源 for r:=range p.res { r.Close() } } func (p *Pool) Release(r io.Closer){ //保证该操作和Close方法的操作是安全的 p.m.Lock() defer p.m.Unlock() //资源池都关闭了,就省这一个没有释放的资源了,释放即可 if p.closed { r.Close() return } select { case p.res <- r: log.Println("资源释放到池子里了") default: log.Println("资源池满了,释放这个资源吧") r.Close() } }
好了,资源池管理写好了,也知道资源池是如何实现的啦,现在我们看看如何使用这个资源池,模拟一个数据库连接池吧。
模拟数据库连接案例
package main import ( "flysnow.org/hello/common" "io" "log" "math/rand" "sync" "sync/atomic" "time" ) const ( //模拟的最大goroutine maxGoroutine = 5 //资源池的大小 poolRes = 2 ) func main() { //等待任务完成 var wg sync.WaitGroup wg.Add(maxGoroutine) p, err := common.New(createConnection, poolRes) if err != nil { log.Println(err) return } //模拟好几个goroutine同时使用资源池查询数据 for query := 0; query < maxGoroutine; query++ { go func(q int) { dbQuery(q, p) wg.Done() }(query) } wg.Wait() log.Println("开始关闭资源池") p.Close() } //模拟数据库查询 func dbQuery(query int, pool *common.Pool) { conn, err := pool.Acquire() if err != nil { log.Println(err) return } defer pool.Release(conn) //模拟查询 time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID) } //数据库连接 type dbConnection struct { ID int32//连接的标志 } //实现io.Closer接口 func (db *dbConnection) Close() error { log.Println("关闭连接", db.ID) return nil } var idCounter int32 //生成数据库连接的方法,以供资源池使用 func createConnection() (io.Closer, error) { //并发安全,给数据库连接生成唯一标志 id := atomic.AddInt32(&idCounter, 1) return &dbConnection{id}, nil }
这时我们测试使用资源池的例子,首先定义了一个结构体 dbConnection
,它只有一个字段,用来做唯一标记。然后 dbConnection
实现了 io.Closer
接口,这样才可以使用我们的资源池。
createConnection
函数对应的是资源池中的 factory
字段,用来创建数据库连接 dbConnection
的,同时为其赋予了一个唯一的标志。
接着我们就同时开了5个goroutine,模拟并发的数据库查询 dbQuery
,查询方法里,先从资源池获取可用的数据库连接,用完后再释放。
这里我们会创建5个数据库连接,但是我们设置的资源池大小只有2,所以再释放了2个连接后,后面的3个连接会因为资源池满了而释放不了,一会我们看下输出的打印信息就可以看到。
最后这个资源连接池使用完之后,我们要关闭资源池,使用资源池的 Close
方法即可。
2019/06/24 10:51:08 Acquire:新生成资源 2019/06/24 10:51:08 Acquire:新生成资源 2019/06/24 10:51:08 Acquire:新生成资源 2019/06/24 10:51:08 Acquire:新生成资源 2019/06/24 10:51:08 Acquire:新生成资源 2019/06/24 10:51:08 第3个查询,使用的是ID为4的数据库连接 2019/06/24 10:51:08 资源释放到池子里了 2019/06/24 10:51:08 第2个查询,使用的是ID为5的数据库连接 2019/06/24 10:51:08 资源释放到池子里了 2019/06/24 10:51:08 第4个查询,使用的是ID为1的数据库连接 2019/06/24 10:51:08 资源池满了,释放这个资源吧 2019/06/24 10:51:08 关闭连接 1 2019/06/24 10:51:09 第1个查询,使用的是ID为3的数据库连接 2019/06/24 10:51:09 资源池满了,释放这个资源吧 2019/06/24 10:51:09 关闭连接 3 2019/06/24 10:51:09 第0个查询,使用的是ID为2的数据库连接 2019/06/24 10:51:09 资源池满了,释放这个资源吧 2019/06/24 10:51:09 关闭连接 2 2019/06/24 10:51:09 开始关闭资源池 2019/06/24 10:51:09 关闭连接 4 2019/06/24 10:51:09 关闭连接 5
可以看到,我们现在虽然看到了资源释放,但是没看到复用资源啊,因为我们执行goroutine太快了,在资源池还空的时候都生成了新的资源链接,现在我们修改下代码看:
//模拟数据库查询 func dbQuery(query int, pool *pool.Pool) { //由于并发太快了,我们现在让他们有些不同的延迟,错开来查看复用资源 time.Sleep(time.Duration(rand.Intn(5)) * time.Second) conn, err := pool.Acquire() if err != nil { log.Println(err) return } defer pool.Release(conn) //模拟查询 time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID) }
我们添加一行代码,让所有的goroutine 睡眠时间不同,并不是上来就获取资源链接,查看结果
2019/06/24 11:27:55 Acquire:新生成资源 2019/06/24 11:27:55 Acquire:新生成资源 2019/06/24 11:27:56 第0个查询,使用的是ID为1的数据库连接 2019/06/24 11:27:56 资源释放到池子里了 2019/06/24 11:27:56 第4个查询,使用的是ID为2的数据库连接 2019/06/24 11:27:56 资源释放到池子里了 2019/06/24 11:27:56 Acquire:共享资源 2019/06/24 11:27:56 Acquire:共享资源 2019/06/24 11:27:57 第2个查询,使用的是ID为2的数据库连接 2019/06/24 11:27:57 资源释放到池子里了 2019/06/24 11:27:57 第1个查询,使用的是ID为1的数据库连接 2019/06/24 11:27:57 资源释放到池子里了 2019/06/24 11:27:58 Acquire:共享资源 2019/06/24 11:27:59 第3个查询,使用的是ID为2的数据库连接 2019/06/24 11:27:59 资源释放到池子里了 2019/06/24 11:27:59 开始关闭资源池 2019/06/24 11:27:59 关闭连接 1 2019/06/24 11:27:59 关闭连接 2
可以看到,我们只生成了2个自选,共享了3次,执行了5个goroutine。
到这里,我们已经完成了一个资源池的管理,并且进行了使用测试。 资源对象池的使用比较频繁,因为我们想把一些对象缓存起来,以便使用,这样就会比较高效,而且不会经常调用GC。
为此 Go 1.6开始为我们提供了原生的资源池管理,防止我们重复造轮子,这就是 sync.Pool
,我们看下刚刚我们的例子,如果用 sync.Pool
实现。
sync.Pool
package main import ( "log" "math/rand" "sync" "sync/atomic" "time" ) const ( //模拟的最大goroutine maxGoroutine = 5 ) func main() { //等待任务完成 var wg sync.WaitGroup wg.Add(maxGoroutine) p:=&sync.Pool{ New:createConnection, } //模拟好几个goroutine同时使用资源池查询数据 for query := 0; query < maxGoroutine; query++ { go func(q int) { dbQuery(q, p) wg.Done() }(query) } wg.Wait() } //模拟数据库查询 func dbQuery(query int, pool *sync.Pool) { conn:=pool.Get().(*dbConnection) defer pool.Put(conn) //模拟查询 time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.ID) } //数据库连接 type dbConnection struct { ID int32//连接的标志 } //实现io.Closer接口 func (db *dbConnection) Close() error { log.Println("关闭连接", db.ID) return nil } var idCounter int32 //生成数据库连接的方法,以供资源池使用 func createConnection() interface{} { //并发安全,给数据库连接生成唯一标志 id := atomic.AddInt32(&idCounter, 1) return &dbConnection{ID:id} }
进行微小的改变即可,因为系统库没有提供 New
这类的工厂函数,所以我们使用字面量创建了一个 sync.Pool
,注意里面的 New
字段,这是一个返回任意对象的方法,类似我们自己实现的资源池中的 factory
字段,意思都是一样的,都是当没有可用资源的时候,生成一个。
这里我们留意到系统的资源池是没有大小限制的,也就是说默认情况下是无上限的,受内存大小限制。
资源的获取和释放对应的方法是 Get
和 Put
,也很简洁,返回任意对象 interface{}
。 conn:=pool.Get().(*dbConnection)
这行代码用类型断言的方式,如果类型是 *dbConnection
,就返回该类型,不是的话返回nil,这样写简介,不用写类型转换。
2019/06/24 11:49:30 第2个查询,使用的是ID为4的数据库连接 2019/06/24 11:49:30 第0个查询,使用的是ID为1的数据库连接 2019/06/24 11:49:30 第3个查询,使用的是ID为5的数据库连接 2019/06/24 11:49:31 第1个查询,使用的是ID为2的数据库连接 2019/06/24 11:49:31 第4个查询,使用的是ID为3的数据库连接
关于系统的资源池,我们需要注意的是它缓存的对象都是临时的,也就说下一次GC的时候,这些存放的对象都会被清除掉。
3、 work
work 模式的目的是展示如何使用无缓冲的通道来创建一个 goroutine 池,这些 goroutine 执行
并控制一组工作,让其并发执行。在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大
小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组 goroutine 配合执
行。 无缓冲的通道保证两个 goroutine 之间的数据交换 。这种使用无缓冲的通道的方法允许使用
者知道什么时候 goroutine 池正在执行工作,而且如果池里的所有 goroutine 都忙,无法接受新的
工作的时候,也能及时通过通道来通知调用者。使用无缓冲的通道不会有工作在队列里丢失或者
卡住,所有工作都会被处理 。
直接上代码:
package work import "sync" //必须满足 Worker类型才能使用工作池 type Worker interface { Task() } //goroutine 池,完成任何提交给worker 的工作 type Pool struct { work chan Worker wg sync.WaitGroup } //创建一个新的工作池 func New(maxGoroutines int) *Pool{ p:=Pool{ work:make(chan Worker), } p.wg.Add(maxGoroutines) //添加多个goroutine for i:=0;i<maxGoroutines;i++{ //开启多个gouroutine go func() { for w:=range p.work{ //阻塞、等待这channel 的值传来,只要我不关闭,就等着 w.Task() //执行工作代码 } p.wg.Done() }() } return &p } //Run 提交工作到工作池 func (p *Pool)Run(w Worker){ p.work<-w //传一个 Worker 类型的值给通道 } //Shutdown 等待所有goroutine 停止工作 func (p *Pool) Shutdown(){ close(p.work) //关闭通道,那么等待的通道就会关闭,for range 下面的代码就执行,p.wg.Done() p.wg.Wait() //maxGoroutines 几个,就等待几个goroutine关闭。 }
work 模式其实就是通过构建多个goroutine,通过 for range
阻塞等待的方式,事先准备好执行工作的代码。并且使用的是无缓冲通道,,使用的时候直接往通道传值,准备好的通道立马接收到值,马上执行工作代码。说到底其实就是 我一个工作,事先声明了多个 goroutine 等着做,只要我不关闭channel,就一直等着,来一个我做一个,我是并发工作的。
整体结构如上面代码:
首先,我是一个工作管理者,我要定义我接收的类型,定义一个 Worker接口,Task()方法。
然后,我要去多线程的去执行工作,创建一个 goroutine池 结构,结构体成员一个是 channel,一个是WaitGroup。我是通过channel通信来管理工作执行,waitgroup保证每一个都能执行完。
再然后,我需要一个工厂函数,创建我这个工作管理者的 实例。 思路如下:初始化的时候由用户告诉我需要开启多少个goroutine来并发的执行任务, 每一个goroutine都是 在 p.wg.Add()
到 p.wg.Done()
直接活着。比如我们传入3 开启了3个goroutine,这三个goroutine都是使用 for range
语法在等待着 p.work
这个channel 有没有值,一直在等啊等,下面代码就不执行了,阻塞中。 这就是工作原理,只要有人一传值,我立马就能工作,只要这个channel没有关闭,还会一直等。
再然后就是,我需要给别人提供添加任务(也就是发送信息)的入口,Run() 方法,直接往p.work中传入Worker类型的值即可。
最后,所有工作完成后是要收尾的 Shutdown()
,首先需要关闭 p.work
这个通道,只有关闭了通道,那么开启的3个goroutine就会不再阻塞,就会执行后面的 p.wg.Done()
方法,然后还有一行代码是 p.wg.Wait()
这个就是确保等待着3个goroutine都关闭即他们都执行 p.wg.Done()
这行代码,避免有些没有done,主goroutine就退出了。
1)
测试代码:
package main import ( "a_tour_of_go/work" "log" "sync" "time" ) //测试一组名字 var names=[]string{ "张飞", "吕布", "关羽", "赵云", "二蛋", } //定义个结构体,并实现Worker接口 type namePrinter struct { name string } //实现Task ,即实现了Worker接口 func (m *namePrinter) Task(){ log.Println(m.name) time.Sleep(time.Second) } func main(){ //使用3个goroutine的工作池 p:=work.New(3) var wg sync.WaitGroup wg.Add(100*len(names)) //即每打算每个names的成员都开100goroutine for i:=0;i<100 ;i++ { for _,name:=range names{ np:=namePrinter{ name:name, } go func() { p.Run(&np) //提交工作 wg.Done() //结束提交工作的goroutine }() } } wg.Wait() //等着所有提交goroutine 执行完毕 p.Shutdown()// 关闭工作池 }
测试代码的思路是: 每一个名字,开启了100个goroutine去打印, 一共有500个goroutine。这些goroutine不是用来执行工作的,他们其实都只是提交工作命令的,也就是传递信息的。每一个goroutine往channel 中传个值就完成,开销又不大。 因为我们的工作池上面准备好工作了,就等着命令了,这边信息只要已传递,工作代码就执行,并且是开启了3个goroutine等待着,一次性提交500个命令,那么就会3个一组 3个一组的打印,并发执行。等到所有提交goroutine都执行完成了,wg.Wait()的使命就结束了,接下来就是要关闭工作池了。Shutdown方法会直到所有的工作都昨晚才返回,因为所有的提交命令 传递的同时,那边工作的goroutine已经在执行工作代码了( w.Task()
),只要 工作代码( w.Task()
)不执行完,下面 p.wg.Done()
关闭工作goroutine的代码就不会执行。
这个例子最多会等待3个工作完成,因为只开启了3个工作goroutine,一旦都Done()了,那就主函数退出了。
参考资料
- go 语言实战
- 飞雪无情的博客
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Go 语言并发编程(一):并发的演化历史
- go语言大并发(一)----goroutine与并发模型
- go语言并发编程
- go语言-常见并发模式
- Go 语言的并发实战
- 区块链技术语言(二十七):Go语言并发编程(上)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Web2.0策略指南
艾米 / 2009-3 / 39.00元
《Web2.0策略指南》是有关战略的。书中的示例关注的是Web 2.0的效率,而不是聚焦于技术。你将了解到这样一个事实:创建Web 210业务或将Web 2.0战略整合到现在业务中,意味着创建一个吸引人们前来访问的在线站点,让人们愿意到这里来共享他们的思想、见闻和行动。当人们通过Web走到一起时,结果可能远远大于各部分的和。随着传统的“口碑传诵”助推站点高速成长,客户本身就能够帮助建立站点。 ......一起来看看 《Web2.0策略指南》 这本书的介绍吧!