golang源码阅读---tunny协程池的基本实现原理

栏目: Go · 发布时间: 6年前

内容简介:前段时间因为一个爬虫项目,最开始做的时候是无脑的一个下载任务就开一个协程,但是后期出现了比较大的内存问题,并且下载的效果也不是那么的好,后面发现是因为协程开的太多了,并且下行的带宽就只有那么的大,所以并不能和想象中的那样。哎,还是因为too young,too simple,sometimes naive.因为本人是小菜,加上时间仓促,所以要是有什么问题的话希望大佬指正。tunny地址:

前段时间因为一个爬虫项目,最开始做的时候是无脑的一个下载任务就开一个协程,但是后期出现了比较大的内存问题,并且下载的效果也不是那么的好,后面发现是因为协程开的太多了,并且下行的带宽就只有那么的大,所以并不能和想象中的那样。哎,还是因为too young,too simple,sometimes naive.

这篇主要是讲解的tunny是如何是如何实现并保持一个goroutine pool的。

因为本人是小菜,加上时间仓促,所以要是有什么问题的话希望大佬指正。

1.简介

tunny地址: https://github.com/Jeffail/tunny

这是一个goroutine pool包,可以设置或者动态改变goroutine pool中goroutine的数量,生成一个固定的数量的pool,实现goroutine的重复使用,并且能在一定程度上控制goroutine

2.源码

1.基本的数据类型

通过tunny的源码包文件数量并不多,只有3个文件,tonny.go和worker.go,没有那么多的文件层次结构,所以阅读起来特别的方便。这也是我比较喜欢阅读 go 语言代码的原因。

tunny.go中

Pool结构

主要是用于对整个pool的管理,其中包括pool

type Pool struct {
    ctor    func() Worker //goroutine中用户的业务逻辑函数
    workers []*workerWrapper //目前已经存在的goroutine信息,workerWrapper结构定义在worker.go的中,
    reqChan chan workRequest //任务调度管道,主要是用户管理当前goroutine是否执行任务,它和workerWrapper中的reqChan 其实是一个,但是workerWrapper的reqChan只是一个发送管道,这个后面会继续讲解

    workerMut  sync.Mutex //锁
    queuedJobs int64 计数,表示当前已经在运行的任务
}

worker接口主要用户包装用户的业务逻辑的func

type Worker interface {
    // Process will synchronously perform a job and return the result.
    //
    Process(interface{}) interface{}

    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()

    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()

    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()
}

closureWorker 顾明思议,主要是用于包装用户的业务逻辑,

并且是Worker的完全接收者

type closureWorker struct {
    processor func(interface{}) interface{}
}

在worker.go中

type workerWrapper struct {
    worker        Worker  //用户存放用户定义的业务逻辑函数
    interruptChan chan struct{} //用于外部干预,使当前goroutine提前终止

    // reqChan is NOT owned by this type, it is used to send requests for work.
    reqChan chan<- workRequest //这个和pool.go中Pool类型中的reqChan是一个,只不过当前这个是一个发送管道

    // closeChan can be closed in order to cleanly shutdown this worker.
    closeChan chan struct{}  //这个是用于传递关闭当前goroutine的消息

    // closedChan is closed by the run() goroutine when it exits.
    closedChan chan struct{} //这个我感觉并没有太大的实际意义
}

这个主要是用于传递任务参数。以及返回任务执行结果的类型

type workRequest struct {
    // jobChan is used to send the payload to this worker.
    jobChan chan<- interface{}

    // retChan is used to read the result from this worker.
    retChan <-chan interface{}

    // interruptFunc can be called to cancel a running job. When called it is no
    // longer necessary to read from retChan.
    interruptFunc func()
}

2.如何创建一个goroutine pool

根据代码的调用步骤,

首先是实例化一个Pool类型的数据,并将用户用户的业务func包装成closureWorker类型并存储在Pool类型实例中的ctor字段中

使用外部调用创建一个Pool对象:

golang源码阅读---tunny协程池的基本实现原理

包中创建一个Pool的逻辑

golang源码阅读---tunny协程池的基本实现原理

golang源码阅读---tunny协程池的基本实现原理

逻辑很简单,一眼就能看明白。

那么在哪里启动一个goroutine,请看下面

golang源码阅读---tunny协程池的基本实现原理

注意这里的参数传递,这里传递了一个channel类型的参数,众所周知,在go中,分为两种类型,一种是值类型,一种是引用类型(map,slice,channel),说这么多有什么用呢,怎么扯到引用类型上面去了呢,但这个很重要

我们接下我们看在newWorkerWrapper中的逻辑

golang源码阅读---tunny协程池的基本实现原理

上面说到,我们传递过去了两个参数,其中一个是一个channel类型的, 因为channel引用类型,所以他的传递是地址,所以在最后newWorkerWrapper中赋值的时候workerWrapper.reqChan和pool.reqChan实际指向的是同一个地址 ,区别就是workerWrapper.reqChan是一个发送管道罢了

我们可以输出看看

golang源码阅读---tunny协程池的基本实现原理

下面是run函数中的代码

golang源码阅读---tunny协程池的基本实现原理

run函数中的代码算是是整个包中最重要的代码了。

他的实现原理是比较简单的,就是采用的是一个for+select+channel来实现的,并且select采用是嵌套的形式,但是其中还是有些比较难以理解的 (当然对我小白我来说哈,2333333)

我感觉主要是这两段

golang源码阅读---tunny协程池的基本实现原理

这两段的代码,需要结合到下一个小姐来说,请看下一个。

2.调用goroutine pool

这里调用很简单,只需要ret := pool.Process(参数)就ok了

我们来看看Process中是怎么样的

golang源码阅读---tunny协程池的基本实现原理

Process中逻辑很简单,上一个小姐我们知道, pool的reqChan 和 pool.workers.reqChan 是指向的同一个地址,但是后者为一个发送管道 所以,在这样来使用时安全的,数据是不会错误的 。

在前面我的run函数中,有两段代码还没说明意思,现在我就说明一下, 第一个就是这段,

golang源码阅读---tunny协程池的基本实现原理

(1)在我们定义reqChan管道的时候,我们定义的是一个 没有缓冲区的管道 ,所以在没有接受操作的情况下,我们向管道里面推送数据是会被 阻塞 住的。

(2)在go中select是在有IO操作的情况下会被触发,所以要是我们没有在Process函数中调用reqChan接收数据, 当前goroutine是会被阻塞住的 这样当前select内层的select也会被阻塞住。

golang源码阅读---tunny协程池的基本实现原理

然后我们在来看通过reqChan传递过来的值

golang源码阅读---tunny协程池的基本实现原理

上面讲到,channel是引用类型,所以它在传递的时候是传递的地址,而不是值,所以,我们接收到的 jobChan和retChan和传递过来指向的是同样的地址 ,这样我们就能实现共享通信了。我们可以输出里面两边的地址看看,这里我开了一个容量为2的pool,然后我调用pool里面的其中一个goroutine,我们看打印的地址

golang源码阅读---tunny协程池的基本实现原理

看。。。。没错吧。。。。。

3.Extra

有一个问题,就是当我们的pool有2个goroutinr的时候,但是我们有200个任务需要完成,也就是需要调用200测goroutine,Tunny是怎么样实现调度的呢,这个后面的文章补充吧,下班。。。。。。。。

golang源码阅读---tunny协程池的基本实现原理

看,就是这样的嘛。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Spark

Spark

Bill Chambers、Matei Zaharia / O′Reilly / 2017-10-31 / GBP 39.99

Learn how to use, deploy, and maintain Apache Spark with this comprehensive guide, written by the creators of the open-source cluster-computing framework. With an emphasis on improvements and new feat......一起来看看 《Spark》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具