内容简介:以下例子截取于c是Collector指针,它的Visit方法给scrape传递的“深度”值是1。由于NewCollector构造的Collector.MaxDepth为0,而在scrape方法内部调用的requestCheck中,如果此值为0,则不会去做深度检测
递归深度
以下例子截取于 Basic
c := colly.NewCollector( // Visit only domains: hackerspaces.org, wiki.hackerspaces.org colly.AllowedDomains("hackerspaces.org", "wiki.hackerspaces.org"), ) // On every a element which has href attribute call callback c.OnHTML("a[href]", func(e *colly.HTMLElement) { link := e.Attr("href") // Print link fmt.Printf("Link found: %q -> %s\n", e.Text, link) // Visit link found on page // Only those links are visited which are in AllowedDomains c.Visit(e.Request.AbsoluteURL(link)) })
c是Collector指针,它的Visit方法给scrape传递的“深度”值是1。
func (c *Collector) Visit(URL string) error { return c.scrape(URL, "GET", 1, nil, nil, nil, true) }
由于NewCollector构造的Collector.MaxDepth为0,而在scrape方法内部调用的requestCheck中,如果此值为0,则不会去做深度检测
// requestCheck method if c.MaxDepth > 0 && c.MaxDepth < depth { return ErrMaxDepth }
如果希望通过MaxDepth控制深度,则可以参见 Max depth 例子
c := colly.NewCollector( // MaxDepth is 1, so only the links on the scraped page // is visited, and no further links are followed colly.MaxDepth(1), ) // On every a element which has href attribute call callback c.OnHTML("a[href]", func(e *colly.HTMLElement) { link := e.Attr("href") // Print link fmt.Println(link) // Visit link found on page e.Request.Visit(link) })
第4行将深度设置为1,这样理论上只能访问第一层的URL。
如果OnHTML中的代码和Basic例子一样,即使用Collector的Visit访问URL,则由于其depth一直传1,而导致requestCheck的深度检测一直不满足条件,从而会访问超过1层的URL。
所以第13行,调用的是HTMLElement的Visit方法
func (r *Request) Visit(URL string) error { return r.collector.scrape(r.AbsoluteURL(URL), "GET", r.Depth+1, nil, r.Ctx, nil, true) }
相较于Collector的Visit,HTMLElement的Visit方法将Depth增加了1,并且传递了请求的上下文(ctx)。由于depth有变化,所以之后的深度检测会返回错误,从而只会访问1层URL。
规则
Collector的Limit方法用于设置各种规则。这些规则最终在Collector的httpBackend成员中执行。
一个Collector只有一个httpBackend结构体指针,而一个httpBackend结构体可以有一组规则
type httpBackend struct { LimitRules []*LimitRule Client *http.Client lock *sync.RWMutex }
规则针对Domain来区分,我们可以通过设定不同的匹配规则,让每组URL执行相应的操作。这些操作包括:
- 访问并行数
- 访问间隔延迟
参见 Parallel 例子。只截取其中关键一段
// Limit the maximum parallelism to 2 // This is necessary if the goroutines are dynamically // created to control the limit of simultaneous requests. // // Parallelism can be controlled also by spawning fixed // number of go routines. c.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 2})
Collector的Limit最终会调用到httpBackend的Limit,它将规则加入到规则组后初始化该规则。
// Init initializes the private members of LimitRule func (r *LimitRule) Init() error { waitChanSize := 1 if r.Parallelism > 1 { waitChanSize = r.Parallelism } r.waitChan = make(chan bool, waitChanSize) hasPattern := false if r.DomainRegexp != "" { c, err := regexp.Compile(r.DomainRegexp) if err != nil { return err } r.compiledRegexp = c hasPattern = true } if r.DomainGlob != "" { c, err := glob.Compile(r.DomainGlob) if err != nil { return err } r.compiledGlob = c hasPattern = true } if !hasPattern { return ErrNoPattern } return nil }
第7行创建了一个可以承载waitChanSize个元素的channel。可以看到,如果我们在规则中没有设置并行数,也会创建只有1个元素的channel。这个channel会被用于调节并行执行的任务数量。所以这也就意味着,一旦调用了Limit方法而没设置Parallelism值,该Collector中针对符合规则的请求就会变成串行的。
第10和18行分别针对不同规则初始化一个编译器。因为这个操作比较重,所以在初始化时执行,之后只是简单使用这些编译器即可。
当发起请求时,流程最终会走到httpBackend的Do方法
func (h *httpBackend) Do(request *http.Request, bodySize int) (*Response, error) { r := h.GetMatchingRule(request.URL.Host) if r != nil { r.waitChan <- true defer func(r *LimitRule) { randomDelay := time.Duration(0) if r.RandomDelay != 0 { randomDelay = time.Duration(rand.Int63n(int64(r.RandomDelay))) } time.Sleep(r.Delay + randomDelay) <-r.waitChan }(r) }
第2行通过域名查找对应的规则,如果找到,则在第4行尝试往channel中加入元素。这个操作相当于上锁。如果channel此时是满的,则该流程会被挂起。否则就执行之后的流程。在Do函数结束,命中规则的会执行上面的匿名函数,它在休眠规则配置的时间后,尝试从channel中获取数据。这个操作相当于释放锁。
Colly就是通过channel的特性实现了并行控制。
并行
在“规则”一节,我们讲到可以通过Parallelism控制并行goroutine的数量。httpBackend的Do方法最终将被Collector的fetch方法调用,而该方法可以被异步执行,即是一个goroutine。这就意味着承载Do逻辑的goroutine执行完毕后就会退出。而一种类似线程的技术在Colly也被支持,它更像一个生产者消费者模型。消费者线程执行完一个任务后不会退出,而在生产者生产出的物料池中取出未处理的任务加以处理。
以下代码截取于 Queue
q, _ := queue.New( 2, // Number of consumer threads &queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage ) …… for i := 0; i < 5; i++ { // Add URLs to the queue q.AddURL(fmt.Sprintf("%s?n=%d", url, i)) } // Consume URLs q.Run(c)
这次没有调用Collector的Visit等函数,而是调用了Queue的Run。
第2行创建了一个具有2个消费者(goroutine)的Queue。第10行预先给这个Queue加入5个需要访问的URL。
// AddURL adds a new URL to the queue func (q *Queue) AddURL(URL string) error { u, err := url.Parse(URL) if err != nil { return err } r := &colly.Request{ URL: u, Method: "GET", } d, err := r.Marshal() if err != nil { return err } return q.storage.AddRequest(d) }
AddUrl的第11行将请求序列化,在第15行将该序列化数据保存到“仓库”中。
在Run方法中,Colly将启动2个goroutine。注意它是使用for循环组织的,这意味着如果for内无break,它会一直循环执行下去——不退出。
func (q *Queue) Run(c *colly.Collector) error { wg := &sync.WaitGroup{} for i := 0; i < q.Threads; i++ { wg.Add(1) go func(c *colly.Collector, wg *sync.WaitGroup) { defer wg.Done() for {
如果队列中没有需要处理的request,则会尝试退出
if q.IsEmpty() { if q.activeThreadCount == 0 { break } ch := make(chan bool) q.lock.Lock() q.threadChans = append(q.threadChans, ch) q.lock.Unlock() action := <-ch if action == stop && q.IsEmpty() { break } }
activeThreadCount表示当前运行中的消费者goroutine数量。如果已经没有消费者了,则直接跳出for循环,整个goroutine结束。
如果还有消费者,则创建一个channel,并将其加入到q.threadChans的channel切片中。然后在第9行等待该channel被写入值。如果写入的是true并且此时没有需要处理的request,则退出goroutine。可以看到这段逻辑检测了两次是否有request,这个我们之后再讨论。
如果还有request要处理,则递增消费者数量(在finish中会递减以抵消)。然后从“仓库”中取出一个任务,在通过Request的Do方法发起请求,最后调用finish方法善后。
q.lock.Lock() atomic.AddInt32(&q.activeThreadCount, 1) q.lock.Unlock() rb, err := q.storage.GetRequest() if err != nil || rb == nil { q.finish() continue } r, err := c.UnmarshalRequest(rb) if err != nil || r == nil { q.finish() continue } r.Do() q.finish() } }(c, wg) } wg.Wait() return nil }
finish方法干了三件事:
- 递减消费者数量,以抵消Run方法中的递增。
- 将Queue的各个等待中的,其他goroutine创建的channel传入true值,即告知他们可以退出了。
- 给Queue创建一个空的channel切片
func (q *Queue) finish() { q.lock.Lock() q.activeThreadCount-- for _, c := range q.threadChans { c <- stop } q.threadChans = make([]chan bool, 0, q.Threads) q.lock.Unlock() }
我们再看下怎么在请求的过程中给Queue增加任务
// AddRequest adds a new Request to the queue func (q *Queue) AddRequest(r *colly.Request) error { d, err := r.Marshal() if err != nil { return err } if err := q.storage.AddRequest(d); err != nil { return err } q.lock.Lock() for _, c := range q.threadChans { c <- !stop } q.threadChans = make([]chan bool, 0, q.Threads) q.lock.Unlock() return nil }
第3~9行,会将请求序列化后保存到“仓库”中。
第10~15行,会将其他goroutine创建的channel传入false,告知它们不要退出。然后再创建一个空的channel切片。
finish和AddRequest都使用锁锁住了所有的逻辑,而且它们都会把其他goroutine创建的channel传入值,然后将Queue的channel切片清空。这样就保证这些channel只可能收到一种状态。由于它自己创建的channel是在finish调用完之后才有机会创建出来,所以不会造成死锁。
再回来看goroutine退出的逻辑
if q.IsEmpty() { if q.activeThreadCount == 0 { break } ch := make(chan bool) q.lock.Lock() q.threadChans = append(q.threadChans, ch) q.lock.Unlock() action := <-ch if action == stop && q.IsEmpty() { break } }
如果finish方法中递减的activeThreadCount为0,这说明这是最后一个goroutine了,而且当前也没request,所以退出。当然此时存在一种可能:在1行执行结束后,其他 非消费者goroutine 调用AddRequest新增了若干request。而执行第2行时,goroutine将退出,从而导致存在request没有处理的可能。
如果还存在其他goroutine,则本goroutine将在第5行创建一个channel,并将这个channel加入到Queue的channel切片中。供其他goroutine调用finish往channel中传入true,或者AddRequest传入false,调控是否需要退出本过程。在第9行等待channel传出数据前,可能存在如下几种情况:
- 执行了finish
- 执行了AddRequest
- 执行了finish后执行了AddRequest
- 执行了AddRequest后执行了finish
如果是第1和4种,action将是false。第2和3种,action是true。但是这个情况下不能单纯的通过action决定是否退出。因为第9和10行执行需要时间,这段时间其他goroutine可能还会执行AddRequest新增任务,或者GetRequest删除任务。所以还要在第10行检测下IsEmpty。
这段是我阅读Colly中思考的最多的代码,因为有goroutine和channel,导致整个逻辑比较复杂。也感慨下,虽然goroutine很方便,但是真的能把它写对也是不容易的。
分布式
在Queue例子中,我们看到“仓库”这个概念。回顾下 Queue 的例子,“仓库”是InMemoryQueueStorage。顾名思义,它是一个内存型的仓库,所以不存在分布式基础。
// create a request queue with 2 consumer threads q, _ := queue.New( 2, // Number of consumer threads &queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage )
一个分布式的例子是 Redis backend ,截取一段
// create the redis storage storage := &redisstorage.Storage{ Address: "127.0.0.1:6379", Password: "", DB: 0, Prefix: "httpbin_test", } // add storage to the collector err := c.SetStorage(storage) if err != nil { panic(err) } // delete previous data from storage if err := storage.Clear(); err != nil { log.Fatal(err) } // close redis client defer storage.Client.Close() // create a new request queue with redis storage backend q, _ := queue.New(2, storage)
这儿创建了一个 redis 型的仓库。不仅Collector的Storage是它,Queue的Storage也是它。这样一个集群上的服务都往这个仓库里存入和取出数据,从而实现分布式架构。
redisstorage库引自 github.com/gocolly/redisstorage 。我们查看其源码,其实现了Collector的storage需要的接口
type Storage interface { // Init initializes the storage Init() error // Visited receives and stores a request ID that is visited by the Collector Visited(requestID uint64) error // IsVisited returns true if the request was visited before IsVisited // is called IsVisited(requestID uint64) (bool, error) // Cookies retrieves stored cookies for a given host Cookies(u *url.URL) string // SetCookies stores cookies for a given host SetCookies(u *url.URL, cookies string) }
以及Queue的storage需要的
// Storage is the interface of the queue's storage backend type Storage interface { // Init initializes the storage Init() error // AddRequest adds a serialized request to the queue AddRequest([]byte) error // GetRequest pops the next request from the queue // or returns error if the queue is empty GetRequest() ([]byte, error) // QueueSize returns with the size of the queue QueueSize() (int, error) }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 深度解读 ReentrantLock 底层源码
- 浅谈 Java 集合 | 底层源码解析
- 笔记-runtime源码解析之让你彻底了解底层源码
- 直面底层:“吹上天”的协程,带你深入源码分析
- 「Go」- golang源码分析 - channel的底层实现
- Java8线程池ThreadPoolExecutor底层原理及其源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。