内容简介:go程序开发过程中,通过简单的调用go func 函数来开启协程,容易导致程序死锁,并且会无限制的开启groutine,groutine数量激增的情况下并发性能会明显下降,所以需要考虑使用工作池来控制协程数量,以达到高并发的效果.直接上代码(JobPool.go)使用方法
go程序开发过程中,通过简单的调用go func 函数来开启协程,容易导致程序死锁,并且会无限制的开启groutine,groutine数量激增的情况下并发性能会明显下降,所以需要考虑使用工作池来控制协程数量,以达到高并发的效果.
直接上代码(JobPool.go)
package utils import ( "fmt" ) // 定义任务接口,所有实现该接口的均实现工作池 type Task interface { DoTask() error } // 定义工作结构体 type Job struct { Task Task } // 定义全部的工作队列 var JobQueue chan Job // 定义工作者 type Worker struct { WorkerPool chan chan Job // 工人对象池 JobChannel chan Job // 管道里面拿Job quit chan bool } // 新建一个工作者 func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, // 工人对象池 JobChannel: make(chan Job), //工人的任务 quit: make(chan bool), } } // 工作池启动主函数 func(w *Worker)Start(){ // 开一个新的协程 go func() { for{ // 注册任务到工作池 w.WorkerPool <- w.JobChannel select { // 接收到任务 case job := <- w.JobChannel: // 执行任务 err := job.Task.DoTask() if err != nil { fmt.Println("任务执行失败") } // 接收退出的任务, 停止任务 case <- w.quit: return } } }() } // 退出执行工作 func (w *Worker) Stop(){ go func(){ w.quit <- true }() } // 定义任务发送者 type Sender struct { maxWorkers int // 最大工人数 WorkerPool chan chan Job // 注册工作通道 quit chan bool // 退出信号 } // 注册新发送者 func NewSender(maxWorkers int) *Sender{ Pool := make(chan chan Job, maxWorkers) return &Sender{ WorkerPool: Pool, // 将工作者放到一个工作池中 maxWorkers: maxWorkers, // 最大工作者数量 quit: make(chan bool), } } // 工作分发器 func(s *Sender)Run(){ for i:=0; i<s.maxWorkers; i++{ worker := NewWorker(s.WorkerPool) // 执行任务 worker.Start() } // 监控任务发送 go s.Send() } // 退出发放工作 func (s *Sender) Quit(){ go func(){ s.quit <- true }() } func(s *Sender)Send(){ for { select { // 接收到任务 case job :=<- JobQueue: go func(job Job) { jobChan := <- s.WorkerPool jobChan <- job }(job) // 退出任务分发 case <- s.quit: return } } } // 初始化对象池 func InitPool() { maxWorkers := 4 maxQueue := 20 // 初始化一个任务发送者,指定工作者数量 send := NewSender(maxWorkers) // 指定任务的队列长度 JobQueue = make(chan Job,maxQueue) // 一直运行任务发送 send.Run() }
使用方法
package main import ( "fmt" "os" "test/utils" //引用: JobPool是放在test项目的utils包下 "time" ) type Test struct { num int } // 任务,实现JobPool的Task接口 func(t *Test)DoTask() error{ f, err := os.OpenFile("log.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0777) if err != nil { return err } defer f.Close() f.WriteString(fmt.Sprintf("这是任务:%d号,执行时间为:%s \n", t.num, fmt.Sprintf("%s", time.Now()))) return nil } func main(){ // 初始化对象池 utils.InitPool() for i:=1;i<40 ;i++{ // 注册任务到Job队列中 task := &Test{i} utils.JobQueue <- utils.Job{ Task:task, } } // time.Sleep(180 * time.Second) // 执行结束,关闭管道 close(utils.JobQueue) }
参考文章: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Golang多协程并发工作池
- 并发题的解题思路以及 Go 语言调度器工作原理
- Java并发系列—并发编程基础
- [Java并发-17-并发设计模式] Immutability模式:如何利用不变性解决并发问题?
- JAVA并发编程之并发模拟工具
- Java并发系列—并发编程的挑战
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
HTML 压缩/解压工具
在线压缩/解压 HTML 代码
HTML 编码/解码
HTML 编码/解码