内容简介:github地址:tunny的项目结构非常简单,核心文件就是tunny.go与worker.go
tunny
github地址: https://github.com/Jeffail/tunny
项目结构
tunny的项目结构非常简单,核心文件就是tunny.go与worker.go
整体分析
tunny主要是通过reqChan管道来联系pool与worker之间的关系,worker的数量与协程池的大小相等,在初始化协程池时决定;各个worker竞争地获取reqChan中的数据,然后处理,最后返回给pool;
代码详解
type Pool struct { queuedJobs int64 ctor func() Worker workers []*workerWrapper reqChan chan workRequest workerMut sync.Mutex }
Pool结构体:
- queuedJobs,这个变量代表pool当前积压的job数量
- ctor,这个变量代表worker具体的构造函数
- workers,这个变量代表pool实际拥有的worker
- reqChan,这个变量是pool与所有worker进行通信的管道,所有worker与pool都使用相同的reqChan指针
- workerMut,这个变量是在pool进行SetSize操作时使用的,防止不同协程同时对size进行操作
type Worker interface { // Process will synchronously perform a job and return the result. Process(interface{}) interface{} // BlockUntilReady is called before each job is processed and must block the // calling goroutine until the Worker is ready to process the next job. BlockUntilReady() // Interrupt is called when a job is cancelled. The worker is responsible // for unblocking the Process implementation. Interrupt() // Terminate is called when a Worker is removed from the processing pool // and is responsible for cleaning up any held resources. Terminate() }
worker在tunny中被设计成了一个interface,因为在之后的代码中可以看到,worker可以有许多不同地实现,正如之前一篇整理的博客所说: golang编码技巧总结 ,我们在写代码时都应该使用interface,来面向接口编程,实现解耦;
两种worker
// closureWorker is a minimal Worker implementation that simply wraps a // func(interface{}) interface{} type closureWorker struct { processor func(interface{}) interface{} }
闭包worker,这个worker是最常用的一种worker,它主要执行初始化时赋予它的processeor函数来完成工作;
type callbackWorker struct{} func (w *callbackWorker) Process(payload interface{}) interface{} { f, ok := payload.(func()) if !ok { return ErrJobNotFunc } f() return nil }
回调worker,这种worker处理的数据必须是一个函数,然后调用这个函数;
// NewFunc creates a new Pool of workers where each worker will process using // the provided func. func NewFunc(n int, f func(interface{}) interface{}) *Pool { return New(n, func() Worker { return &closureWorker{ processor: f, } }) }
初始化协程池时需要两个参数,一个是协程池大小n,一个是希望协程池执行的函数,这个函数最终交由闭包worker,运行时由它实际处理数据;
func New(n int, ctor func() Worker) *Pool { p := &Pool{ ctor: ctor, reqChan: make(chan workRequest), } p.SetSize(n) return p }
可以看到,reqChan在这时出现了,这个在之后的代码中将是连接pool与worker的核心;
SetSize会做什么呢?
func (p *Pool) SetSize(n int) { p.workerMut.Lock() defer p.workerMut.Unlock() lWorkers := len(p.workers) if lWorkers == n { return } // Add extra workers if N > len(workers) for i := lWorkers; i < n; i++ { p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor())) } // Asynchronously stop all workers > N for i := n; i < lWorkers; i++ { p.workers[i].stop() } // Synchronously wait for all workers > N to stop for i := n; i < lWorkers; i++ { p.workers[i].join() } // Remove stopped workers from slice p.workers = p.workers[:n] }
首先,会对这个函数加锁,这是为了防止在多个协程同时进行SetSize操作;
其次,当worker数量小于需要SetSize的数量,则增加worker的数量;
若worker数量大于SetSize的数量,则减小worker的数量;
增加worker的数量是如何增加呢? newWorkerWrapper
函数有很多值得关注的地方,值得注意的是,pool将它的reqChan传给了这个函数,也就是传给了worker;
func newWorkerWrapper( reqChan chan<- workRequest, worker Worker, ) *workerWrapper { w := workerWrapper{ worker: worker, interruptChan: make(chan struct{}), reqChan: reqChan, closeChan: make(chan struct{}), closedChan: make(chan struct{}), } go w.run() return &w }
可以看到,在调用初始化newWorkerWrapper后,go了一个协程,进行w.run()操作,worker在这里是调用的之前传入的闭包worker的构造函数生成的,因此这里的worker是闭包worker;
func (w *workerWrapper) run() { jobChan, retChan := make(chan interface{}), make(chan interface{}) defer func() { w.worker.Terminate() close(retChan) close(w.closedChan) }() for { // NOTE: Blocking here will prevent the worker from closing down. w.worker.BlockUntilReady() select { case w.reqChan <- workRequest{ jobChan: jobChan, retChan: retChan, interruptFunc: w.interrupt, }: select { case payload := <-jobChan: result := w.worker.Process(payload) select { case retChan <- result: case <-w.interruptChan: w.interruptChan = make(chan struct{}) } case _, _ = <-w.interruptChan: w.interruptChan = make(chan struct{}) } case <-w.closeChan: return } } }
解读这个run函数,这是整个worker的核心;
首先,能看到一个大的for循环,里面嵌套了select;
一进入select,会无脑往reqChan里传入workRequest,这时需要与pool的接收函数对应起来看:
func (p *Pool) Process(payload interface{}) interface{} { atomic.AddInt64(&p.queuedJobs, 1) request, open := <-p.reqChan if !open { panic(ErrPoolNotRunning) } request.jobChan <- payload payload, open = <-request.retChan if !open { panic(ErrWorkerClosed) } atomic.AddInt64(&p.queuedJobs, -1) return payload }
可以发现,因为worker会无脑往reqChan管道里传入workRequest,因此pool一定会取到塞入的值交给request变量,payload是实际处理的数据,pool将其塞入workRequest的jobChan中,之后阻塞等待从retChan取得结果,由于这个jobChan与worker的jobChan是同一个指针,因此payload能在worker的
select { case payload := <-jobChan: result := w.worker.Process(payload) select { case retChan <- result: case <-w.interruptChan: w.interruptChan = make(chan struct{}) } ...
case语句中被取到,然后进行处理,处理完后进入下一个select语句,无脑将result塞到retChan中;由于worker的retChan与pool的retChan是同一个指针,因此pool取到了retChan的结果,将其返回;
多个worker的情况,则会竞争从reqChan取数据,但是总能保证只有size个worker在工作,达到了限制协程数量的目的。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
高性能JavaScript
【美】Nicholas C. Zakas(尼古拉斯.泽卡斯) / 丁琛 / 电子工业出版社 / 2015-8-1 / 65
如果你使用 JavaScript 构建交互丰富的 Web 应用,那么 JavaScript 代码可能是造成你的Web应用速度变慢的主要原因。《高性能JavaScript》揭示的技术和策略能帮助你在开发过程中消除性能瓶颈。你将会了解如何提升各方面的性能,包括代码的加载、运行、DOM 交互、页面生存周期等。雅虎的前端工程师 Nicholas C. Zakas 和其他五位 JavaScript 专家介绍......一起来看看 《高性能JavaScript》 这本书的介绍吧!