内容简介:go程序开发过程中,通过简单的调用go func 函数来开启协程,容易导致程序死锁,并且会无限制的开启groutine,groutine数量激增的情况下并发性能会明显下降,所以需要考虑使用工作池来控制协程数量,以达到高并发的效果.直接上代码(JobPool.go)使用方法
go程序开发过程中,通过简单的调用go func 函数来开启协程,容易导致程序死锁,并且会无限制的开启groutine,groutine数量激增的情况下并发性能会明显下降,所以需要考虑使用工作池来控制协程数量,以达到高并发的效果.
直接上代码(JobPool.go)
package utils
import (
"fmt"
)
// 定义任务接口,所有实现该接口的均实现工作池
type Task interface {
DoTask() error
}
// 定义工作结构体
type Job struct {
Task Task
}
// 定义全部的工作队列
var JobQueue chan Job
// 定义工作者
type Worker struct {
WorkerPool chan chan Job // 工人对象池
JobChannel chan Job // 管道里面拿Job
quit chan bool
}
// 新建一个工作者
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool, // 工人对象池
JobChannel: make(chan Job), //工人的任务
quit: make(chan bool),
}
}
// 工作池启动主函数
func(w *Worker)Start(){
// 开一个新的协程
go func() {
for{
// 注册任务到工作池
w.WorkerPool <- w.JobChannel
select {
// 接收到任务
case job := <- w.JobChannel:
// 执行任务
err := job.Task.DoTask()
if err != nil {
fmt.Println("任务执行失败")
}
// 接收退出的任务, 停止任务
case <- w.quit:
return
}
}
}()
}
// 退出执行工作
func (w *Worker) Stop(){
go func(){
w.quit <- true
}()
}
// 定义任务发送者
type Sender struct {
maxWorkers int // 最大工人数
WorkerPool chan chan Job // 注册工作通道
quit chan bool // 退出信号
}
// 注册新发送者
func NewSender(maxWorkers int) *Sender{
Pool := make(chan chan Job, maxWorkers)
return &Sender{
WorkerPool: Pool, // 将工作者放到一个工作池中
maxWorkers: maxWorkers, // 最大工作者数量
quit: make(chan bool),
}
}
// 工作分发器
func(s *Sender)Run(){
for i:=0; i<s.maxWorkers; i++{
worker := NewWorker(s.WorkerPool)
// 执行任务
worker.Start()
}
// 监控任务发送
go s.Send()
}
// 退出发放工作
func (s *Sender) Quit(){
go func(){
s.quit <- true
}()
}
func(s *Sender)Send(){
for {
select {
// 接收到任务
case job :=<- JobQueue:
go func(job Job) {
jobChan := <- s.WorkerPool
jobChan <- job
}(job)
// 退出任务分发
case <- s.quit:
return
}
}
}
// 初始化对象池
func InitPool() {
maxWorkers := 4
maxQueue := 20
// 初始化一个任务发送者,指定工作者数量
send := NewSender(maxWorkers)
// 指定任务的队列长度
JobQueue = make(chan Job,maxQueue)
// 一直运行任务发送
send.Run()
}
使用方法
package main
import (
"fmt"
"os"
"test/utils" //引用: JobPool是放在test项目的utils包下
"time"
)
type Test struct {
num int
}
// 任务,实现JobPool的Task接口
func(t *Test)DoTask() error{
f, err := os.OpenFile("log.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0777)
if err != nil {
return err
}
defer f.Close()
f.WriteString(fmt.Sprintf("这是任务:%d号,执行时间为:%s \n", t.num, fmt.Sprintf("%s", time.Now())))
return nil
}
func main(){
// 初始化对象池
utils.InitPool()
for i:=1;i<40 ;i++{
// 注册任务到Job队列中
task := &Test{i}
utils.JobQueue <- utils.Job{
Task:task,
}
}
// time.Sleep(180 * time.Second)
// 执行结束,关闭管道
close(utils.JobQueue)
}
参考文章: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Golang多协程并发工作池
- 并发题的解题思路以及 Go 语言调度器工作原理
- Java并发系列—并发编程基础
- [Java并发-17-并发设计模式] Immutability模式:如何利用不变性解决并发问题?
- JAVA并发编程之并发模拟工具
- Java并发系列—并发编程的挑战
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Design and Analysis of Distributed Algorithms (Wiley Series on P
Nicola Santoro / Wiley-Interscience / 2006-10-27 / USD 140.95
This text is based on a simple and fully reactive computational model that allows for intuitive comprehension and logical designs. The principles and techniques presented can be applied to any distrib......一起来看看 《Design and Analysis of Distributed Algorithms (Wiley Series on P》 这本书的介绍吧!