Colly源码解析——结合例子分析底层实现

栏目: Go · 发布时间: 5年前

内容简介:以下例子截取于c是Collector指针,它的Visit方法给scrape传递的“深度”值是1。由于NewCollector构造的Collector.MaxDepth为0,而在scrape方法内部调用的requestCheck中,如果此值为0,则不会去做深度检测

递归深度

以下例子截取于 Basic

c := colly.NewCollector(
		// Visit only domains: hackerspaces.org, wiki.hackerspaces.org
		colly.AllowedDomains("hackerspaces.org", "wiki.hackerspaces.org"),
	)

	// On every a element which has href attribute call callback
	c.OnHTML("a[href]", func(e *colly.HTMLElement) {
		link := e.Attr("href")
		// Print link
		fmt.Printf("Link found: %q -> %s\n", e.Text, link)
		// Visit link found on page
		// Only those links are visited which are in AllowedDomains
		c.Visit(e.Request.AbsoluteURL(link))
	})

c是Collector指针,它的Visit方法给scrape传递的“深度”值是1。

func (c *Collector) Visit(URL string) error {
	return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}

由于NewCollector构造的Collector.MaxDepth为0,而在scrape方法内部调用的requestCheck中,如果此值为0,则不会去做深度检测

// requestCheck method
	if c.MaxDepth > 0 && c.MaxDepth < depth {
		return ErrMaxDepth
	}

如果希望通过MaxDepth控制深度,则可以参见 Max depth 例子

c := colly.NewCollector(
		// MaxDepth is 1, so only the links on the scraped page
		// is visited, and no further links are followed
		colly.MaxDepth(1),
	)

	// On every a element which has href attribute call callback
	c.OnHTML("a[href]", func(e *colly.HTMLElement) {
		link := e.Attr("href")
		// Print link
		fmt.Println(link)
		// Visit link found on page
		e.Request.Visit(link)
	})

第4行将深度设置为1,这样理论上只能访问第一层的URL。

如果OnHTML中的代码和Basic例子一样,即使用Collector的Visit访问URL,则由于其depth一直传1,而导致requestCheck的深度检测一直不满足条件,从而会访问超过1层的URL。

所以第13行,调用的是HTMLElement的Visit方法

func (r *Request) Visit(URL string) error {
	return r.collector.scrape(r.AbsoluteURL(URL), "GET", r.Depth+1, nil, r.Ctx, nil, true)
}

相较于Collector的Visit,HTMLElement的Visit方法将Depth增加了1,并且传递了请求的上下文(ctx)。由于depth有变化,所以之后的深度检测会返回错误,从而只会访问1层URL。

规则

Collector的Limit方法用于设置各种规则。这些规则最终在Collector的httpBackend成员中执行。

一个Collector只有一个httpBackend结构体指针,而一个httpBackend结构体可以有一组规则

type httpBackend struct {
	LimitRules []*LimitRule
	Client     *http.Client
	lock       *sync.RWMutex
}

规则针对Domain来区分,我们可以通过设定不同的匹配规则,让每组URL执行相应的操作。这些操作包括:

  • 访问并行数
  • 访问间隔延迟

参见 Parallel 例子。只截取其中关键一段

// Limit the maximum parallelism to 2
	// This is necessary if the goroutines are dynamically
	// created to control the limit of simultaneous requests.
	//
	// Parallelism can be controlled also by spawning fixed
	// number of go routines.
	c.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 2})

Collector的Limit最终会调用到httpBackend的Limit,它将规则加入到规则组后初始化该规则。

// Init initializes the private members of LimitRule
func (r *LimitRule) Init() error {
	waitChanSize := 1
	if r.Parallelism > 1 {
		waitChanSize = r.Parallelism
	}
	r.waitChan = make(chan bool, waitChanSize)
	hasPattern := false
	if r.DomainRegexp != "" {
		c, err := regexp.Compile(r.DomainRegexp)
		if err != nil {
			return err
		}
		r.compiledRegexp = c
		hasPattern = true
	}
	if r.DomainGlob != "" {
		c, err := glob.Compile(r.DomainGlob)
		if err != nil {
			return err
		}
		r.compiledGlob = c
		hasPattern = true
	}
	if !hasPattern {
		return ErrNoPattern
	}
	return nil
}

第7行创建了一个可以承载waitChanSize个元素的channel。可以看到,如果我们在规则中没有设置并行数,也会创建只有1个元素的channel。这个channel会被用于调节并行执行的任务数量。所以这也就意味着,一旦调用了Limit方法而没设置Parallelism值,该Collector中针对符合规则的请求就会变成串行的。

第10和18行分别针对不同规则初始化一个编译器。因为这个操作比较重,所以在初始化时执行,之后只是简单使用这些编译器即可。

当发起请求时,流程最终会走到httpBackend的Do方法

func (h *httpBackend) Do(request *http.Request, bodySize int) (*Response, error) {
	r := h.GetMatchingRule(request.URL.Host)
	if r != nil {
		r.waitChan <- true
		defer func(r *LimitRule) {
			randomDelay := time.Duration(0)
			if r.RandomDelay != 0 {
				randomDelay = time.Duration(rand.Int63n(int64(r.RandomDelay)))
			}
			time.Sleep(r.Delay + randomDelay)
			<-r.waitChan
		}(r)
	}

第2行通过域名查找对应的规则,如果找到,则在第4行尝试往channel中加入元素。这个操作相当于上锁。如果channel此时是满的,则该流程会被挂起。否则就执行之后的流程。在Do函数结束,命中规则的会执行上面的匿名函数,它在休眠规则配置的时间后,尝试从channel中获取数据。这个操作相当于释放锁。

Colly就是通过channel的特性实现了并行控制。

并行

在“规则”一节,我们讲到可以通过Parallelism控制并行goroutine的数量。httpBackend的Do方法最终将被Collector的fetch方法调用,而该方法可以被异步执行,即是一个goroutine。这就意味着承载Do逻辑的goroutine执行完毕后就会退出。而一种类似线程的技术在Colly也被支持,它更像一个生产者消费者模型。消费者线程执行完一个任务后不会退出,而在生产者生产出的物料池中取出未处理的任务加以处理。

以下代码截取于 Queue

q, _ := queue.New(
		2, // Number of consumer threads
		&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
	)

	……

	for i := 0; i < 5; i++ {
		// Add URLs to the queue
		q.AddURL(fmt.Sprintf("%s?n=%d", url, i))
	}
	// Consume URLs
	q.Run(c)

这次没有调用Collector的Visit等函数,而是调用了Queue的Run。

第2行创建了一个具有2个消费者(goroutine)的Queue。第10行预先给这个Queue加入5个需要访问的URL。

// AddURL adds a new URL to the queue
func (q *Queue) AddURL(URL string) error {
	u, err := url.Parse(URL)
	if err != nil {
		return err
	}
	r := &colly.Request{
		URL:    u,
		Method: "GET",
	}
	d, err := r.Marshal()
	if err != nil {
		return err
	}
	return q.storage.AddRequest(d)
}

AddUrl的第11行将请求序列化,在第15行将该序列化数据保存到“仓库”中。

在Run方法中,Colly将启动2个goroutine。注意它是使用for循环组织的,这意味着如果for内无break,它会一直循环执行下去——不退出。

func (q *Queue) Run(c *colly.Collector) error {
	wg := &sync.WaitGroup{}
	for i := 0; i < q.Threads; i++ {
		wg.Add(1)
		go func(c *colly.Collector, wg *sync.WaitGroup) {
			defer wg.Done()
			for {

如果队列中没有需要处理的request,则会尝试退出

if q.IsEmpty() {
					if q.activeThreadCount == 0 {
						break
					}
					ch := make(chan bool)
					q.lock.Lock()
					q.threadChans = append(q.threadChans, ch)
					q.lock.Unlock()
					action := <-ch
					if action == stop && q.IsEmpty() {
						break
					}
				}

activeThreadCount表示当前运行中的消费者goroutine数量。如果已经没有消费者了,则直接跳出for循环,整个goroutine结束。

如果还有消费者,则创建一个channel,并将其加入到q.threadChans的channel切片中。然后在第9行等待该channel被写入值。如果写入的是true并且此时没有需要处理的request,则退出goroutine。可以看到这段逻辑检测了两次是否有request,这个我们之后再讨论。

如果还有request要处理,则递增消费者数量(在finish中会递减以抵消)。然后从“仓库”中取出一个任务,在通过Request的Do方法发起请求,最后调用finish方法善后。

q.lock.Lock()
				atomic.AddInt32(&q.activeThreadCount, 1)
				q.lock.Unlock()
				rb, err := q.storage.GetRequest()
				if err != nil || rb == nil {
					q.finish()
					continue
				}
				r, err := c.UnmarshalRequest(rb)
				if err != nil || r == nil {
					q.finish()
					continue
				}
				r.Do()
				q.finish()
			}
		}(c, wg)
	}
	wg.Wait()
	return nil
}

finish方法干了三件事:

  1. 递减消费者数量,以抵消Run方法中的递增。
  2. 将Queue的各个等待中的,其他goroutine创建的channel传入true值,即告知他们可以退出了。
  3. 给Queue创建一个空的channel切片
func (q *Queue) finish() {
	q.lock.Lock()
	q.activeThreadCount--
	for _, c := range q.threadChans {
		c <- stop
	}
	q.threadChans = make([]chan bool, 0, q.Threads)
	q.lock.Unlock()
}

我们再看下怎么在请求的过程中给Queue增加任务

// AddRequest adds a new Request to the queue
func (q *Queue) AddRequest(r *colly.Request) error {
	d, err := r.Marshal()
	if err != nil {
		return err
	}
	if err := q.storage.AddRequest(d); err != nil {
		return err
	}
	q.lock.Lock()
	for _, c := range q.threadChans {
		c <- !stop
	}
	q.threadChans = make([]chan bool, 0, q.Threads)
	q.lock.Unlock()
	return nil
}

第3~9行,会将请求序列化后保存到“仓库”中。

第10~15行,会将其他goroutine创建的channel传入false,告知它们不要退出。然后再创建一个空的channel切片。

finish和AddRequest都使用锁锁住了所有的逻辑,而且它们都会把其他goroutine创建的channel传入值,然后将Queue的channel切片清空。这样就保证这些channel只可能收到一种状态。由于它自己创建的channel是在finish调用完之后才有机会创建出来,所以不会造成死锁。

再回来看goroutine退出的逻辑

if q.IsEmpty() {
					if q.activeThreadCount == 0 {
						break
					}
					ch := make(chan bool)
					q.lock.Lock()
					q.threadChans = append(q.threadChans, ch)
					q.lock.Unlock()
					action := <-ch
					if action == stop && q.IsEmpty() {
						break
					}
				}

如果finish方法中递减的activeThreadCount为0,这说明这是最后一个goroutine了,而且当前也没request,所以退出。当然此时存在一种可能:在1行执行结束后,其他 非消费者goroutine 调用AddRequest新增了若干request。而执行第2行时,goroutine将退出,从而导致存在request没有处理的可能。

如果还存在其他goroutine,则本goroutine将在第5行创建一个channel,并将这个channel加入到Queue的channel切片中。供其他goroutine调用finish往channel中传入true,或者AddRequest传入false,调控是否需要退出本过程。在第9行等待channel传出数据前,可能存在如下几种情况:

  1. 执行了finish
  2. 执行了AddRequest
  3. 执行了finish后执行了AddRequest
  4. 执行了AddRequest后执行了finish

如果是第1和4种,action将是false。第2和3种,action是true。但是这个情况下不能单纯的通过action决定是否退出。因为第9和10行执行需要时间,这段时间其他goroutine可能还会执行AddRequest新增任务,或者GetRequest删除任务。所以还要在第10行检测下IsEmpty。

这段是我阅读Colly中思考的最多的代码,因为有goroutine和channel,导致整个逻辑比较复杂。也感慨下,虽然goroutine很方便,但是真的能把它写对也是不容易的。

分布式

在Queue例子中,我们看到“仓库”这个概念。回顾下 Queue 的例子,“仓库”是InMemoryQueueStorage。顾名思义,它是一个内存型的仓库,所以不存在分布式基础。

// create a request queue with 2 consumer threads
	q, _ := queue.New(
		2, // Number of consumer threads
		&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
	)

一个分布式的例子是 Redis backend ,截取一段

// create the redis storage
	storage := &redisstorage.Storage{
		Address:  "127.0.0.1:6379",
		Password: "",
		DB:       0,
		Prefix:   "httpbin_test",
	}

	// add storage to the collector
	err := c.SetStorage(storage)
	if err != nil {
		panic(err)
	}

	// delete previous data from storage
	if err := storage.Clear(); err != nil {
		log.Fatal(err)
	}

	// close redis client
	defer storage.Client.Close()

	// create a new request queue with redis storage backend
	q, _ := queue.New(2, storage)

这儿创建了一个 redis 型的仓库。不仅Collector的Storage是它,Queue的Storage也是它。这样一个集群上的服务都往这个仓库里存入和取出数据,从而实现分布式架构。

redisstorage库引自 github.com/gocolly/redisstorage 。我们查看其源码,其实现了Collector的storage需要的接口

type Storage interface {
	// Init initializes the storage
	Init() error
	// Visited receives and stores a request ID that is visited by the Collector
	Visited(requestID uint64) error
	// IsVisited returns true if the request was visited before IsVisited
	// is called
	IsVisited(requestID uint64) (bool, error)
	// Cookies retrieves stored cookies for a given host
	Cookies(u *url.URL) string
	// SetCookies stores cookies for a given host
	SetCookies(u *url.URL, cookies string)
}

以及Queue的storage需要的

// Storage is the interface of the queue's storage backend
type Storage interface {
	// Init initializes the storage
	Init() error
	// AddRequest adds a serialized request to the queue
	AddRequest([]byte) error
	// GetRequest pops the next request from the queue
	// or returns error if the queue is empty
	GetRequest() ([]byte, error)
	// QueueSize returns with the size of the queue
	QueueSize() (int, error)
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据结构

数据结构

严蔚敏、吴伟民 / 清华大学出版社 / 2007-3-1 / 30.0

《数据结构》(C语言版)是为“数据结构”课程编写的教材,也可作为学习数据结构及其算法的C程序设计的参数教材。 本书的前半部分从抽象数据类型的角度讨论各种基本类型的数据结构及其应用;后半部分主要讨论查找和排序的各种实现方法及其综合分析比较。其内容和章节编排1992年4月出版的《数据结构》(第二版)基本一致,但在本书中更突出了抽象数据类型的概念。全书采用类C语言作为数据结构和算法的描述语言。 ......一起来看看 《数据结构》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具