内容简介:前段时间因为一个爬虫项目,最开始做的时候是无脑的一个下载任务就开一个协程,但是后期出现了比较大的内存问题,并且下载的效果也不是那么的好,后面发现是因为协程开的太多了,并且下行的带宽就只有那么的大,所以并不能和想象中的那样。哎,还是因为too young,too simple,sometimes naive.因为本人是小菜,加上时间仓促,所以要是有什么问题的话希望大佬指正。tunny地址:
前段时间因为一个爬虫项目,最开始做的时候是无脑的一个下载任务就开一个协程,但是后期出现了比较大的内存问题,并且下载的效果也不是那么的好,后面发现是因为协程开的太多了,并且下行的带宽就只有那么的大,所以并不能和想象中的那样。哎,还是因为too young,too simple,sometimes naive.
这篇主要是讲解的tunny是如何是如何实现并保持一个goroutine pool的。
因为本人是小菜,加上时间仓促,所以要是有什么问题的话希望大佬指正。
1.简介
tunny地址: https://github.com/Jeffail/tunny
这是一个goroutine pool包,可以设置或者动态改变goroutine pool中goroutine的数量,生成一个固定的数量的pool,实现goroutine的重复使用,并且能在一定程度上控制goroutine
2.源码
1.基本的数据类型
通过tunny的源码包文件数量并不多,只有3个文件,tonny.go和worker.go,没有那么多的文件层次结构,所以阅读起来特别的方便。这也是我比较喜欢阅读 go 语言代码的原因。
tunny.go中
Pool结构
主要是用于对整个pool的管理,其中包括pool
type Pool struct {
ctor func() Worker //goroutine中用户的业务逻辑函数
workers []*workerWrapper //目前已经存在的goroutine信息,workerWrapper结构定义在worker.go的中,
reqChan chan workRequest //任务调度管道,主要是用户管理当前goroutine是否执行任务,它和workerWrapper中的reqChan 其实是一个,但是workerWrapper的reqChan只是一个发送管道,这个后面会继续讲解
workerMut sync.Mutex //锁
queuedJobs int64 计数,表示当前已经在运行的任务
}
worker接口主要用户包装用户的业务逻辑的func
type Worker interface {
// Process will synchronously perform a job and return the result.
//
Process(interface{}) interface{}
// BlockUntilReady is called before each job is processed and must block the
// calling goroutine until the Worker is ready to process the next job.
BlockUntilReady()
// Interrupt is called when a job is cancelled. The worker is responsible
// for unblocking the Process implementation.
Interrupt()
// Terminate is called when a Worker is removed from the processing pool
// and is responsible for cleaning up any held resources.
Terminate()
}
closureWorker 顾明思议,主要是用于包装用户的业务逻辑,
并且是Worker的完全接收者
type closureWorker struct {
processor func(interface{}) interface{}
}
在worker.go中
type workerWrapper struct {
worker Worker //用户存放用户定义的业务逻辑函数
interruptChan chan struct{} //用于外部干预,使当前goroutine提前终止
// reqChan is NOT owned by this type, it is used to send requests for work.
reqChan chan<- workRequest //这个和pool.go中Pool类型中的reqChan是一个,只不过当前这个是一个发送管道
// closeChan can be closed in order to cleanly shutdown this worker.
closeChan chan struct{} //这个是用于传递关闭当前goroutine的消息
// closedChan is closed by the run() goroutine when it exits.
closedChan chan struct{} //这个我感觉并没有太大的实际意义
}
这个主要是用于传递任务参数。以及返回任务执行结果的类型
type workRequest struct {
// jobChan is used to send the payload to this worker.
jobChan chan<- interface{}
// retChan is used to read the result from this worker.
retChan <-chan interface{}
// interruptFunc can be called to cancel a running job. When called it is no
// longer necessary to read from retChan.
interruptFunc func()
}
2.如何创建一个goroutine pool
根据代码的调用步骤,
首先是实例化一个Pool类型的数据,并将用户用户的业务func包装成closureWorker类型并存储在Pool类型实例中的ctor字段中
使用外部调用创建一个Pool对象:
包中创建一个Pool的逻辑
逻辑很简单,一眼就能看明白。
那么在哪里启动一个goroutine,请看下面
注意这里的参数传递,这里传递了一个channel类型的参数,众所周知,在go中,分为两种类型,一种是值类型,一种是引用类型(map,slice,channel),说这么多有什么用呢,怎么扯到引用类型上面去了呢,但这个很重要
我们接下我们看在newWorkerWrapper中的逻辑
上面说到,我们传递过去了两个参数,其中一个是一个channel类型的, 因为channel引用类型,所以他的传递是地址,所以在最后newWorkerWrapper中赋值的时候workerWrapper.reqChan和pool.reqChan实际指向的是同一个地址 ,区别就是workerWrapper.reqChan是一个发送管道罢了
我们可以输出看看
下面是run函数中的代码
run函数中的代码算是是整个包中最重要的代码了。
他的实现原理是比较简单的,就是采用的是一个for+select+channel来实现的,并且select采用是嵌套的形式,但是其中还是有些比较难以理解的 (当然对我小白我来说哈,2333333)
我感觉主要是这两段
这两段的代码,需要结合到下一个小姐来说,请看下一个。
2.调用goroutine pool
这里调用很简单,只需要ret := pool.Process(参数)就ok了
我们来看看Process中是怎么样的
Process中逻辑很简单,上一个小姐我们知道, pool的reqChan 和 pool.workers.reqChan 是指向的同一个地址,但是后者为一个发送管道 所以,在这样来使用时安全的,数据是不会错误的 。
在前面我的run函数中,有两段代码还没说明意思,现在我就说明一下, 第一个就是这段,
(1)在我们定义reqChan管道的时候,我们定义的是一个 没有缓冲区的管道 ,所以在没有接受操作的情况下,我们向管道里面推送数据是会被 阻塞 住的。
(2)在go中select是在有IO操作的情况下会被触发,所以要是我们没有在Process函数中调用reqChan接收数据, 当前goroutine是会被阻塞住的 这样当前select内层的select也会被阻塞住。
然后我们在来看通过reqChan传递过来的值
上面讲到,channel是引用类型,所以它在传递的时候是传递的地址,而不是值,所以,我们接收到的 jobChan和retChan和传递过来指向的是同样的地址 ,这样我们就能实现共享通信了。我们可以输出里面两边的地址看看,这里我开了一个容量为2的pool,然后我调用pool里面的其中一个goroutine,我们看打印的地址
看。。。。没错吧。。。。。
3.Extra
有一个问题,就是当我们的pool有2个goroutinr的时候,但是我们有200个任务需要完成,也就是需要调用200测goroutine,Tunny是怎么样实现调度的呢,这个后面的文章补充吧,下班。。。。。。。。
看,就是这样的嘛。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Introduction to Programming in Java
Robert Sedgewick、Kevin Wayne / Addison-Wesley / 2007-7-27 / USD 89.00
By emphasizing the application of computer programming not only in success stories in the software industry but also in familiar scenarios in physical and biological science, engineering, and appli......一起来看看 《Introduction to Programming in Java》 这本书的介绍吧!