内容简介:Collector是Colly的核心结构体,其中包含了用户对框架行为的定义。一般情况下,我们可以使用NewCollector方法构建一个它的指针第4行调用了Init方法初始化了Collector的一些成员。然后遍历并调用不定长参数,这些参数都是函数类型——func(*Collector)。我们看个例子AllowedDomains和CacheDir都返回一个匿名函数,其实现就是将Collector对象中对应的成员设置为指定的值
Collector是Colly的核心结构体,其中包含了用户对框架行为的定义。一般情况下,我们可以使用NewCollector方法构建一个它的指针
// NewCollector creates a new Collector instance with default configuration func NewCollector(options ...func(*Collector)) *Collector { c := &Collector{} c.Init() for _, f := range options { f(c) } c.parseSettingsFromEnv() return c }
第4行调用了Init方法初始化了Collector的一些成员。然后遍历并调用不定长参数,这些参数都是函数类型——func(*Collector)。我们看个例子
c := colly.NewCollector( // Visit only domains: coursera.org, www.coursera.org colly.AllowedDomains("coursera.org", "www.coursera.org"), // Cache responses to prevent multiple download of pages // even if the collector is restarted colly.CacheDir("./coursera_cache"), )
AllowedDomains和CacheDir都返回一个匿名函数,其实现就是将Collector对象中对应的成员设置为指定的值
// AllowedDomains sets the domain whitelist used by the Collector. func AllowedDomains(domains ...string) func(*Collector) { return func(c *Collector) { c.AllowedDomains = domains } }
Collector中绝大部分成员均有对应的方法,而且它们的名称(函数名和成员名)也一致。但是其中只有3个方法——ParseHTTPErrorResponse、AllowURLRevisit和IgnoreRobotsTxt比较特殊,因为它们没有参数。如果被调用,则对应的Collector成员会被设置为true
// AllowURLRevisit instructs the Collector to allow multiple downloads of the same URL func AllowURLRevisit() func(*Collector) { return func(c *Collector) { c.AllowURLRevisit = true } }
再回到NewCollector函数,其最后一个逻辑是调用parseSettingsFromEnv方法。从名称我们可以看出它是用于解析环境变量的。将它放在最后是可以理解的,因为后面执行的逻辑可以覆盖前面的逻辑。这样我们可以让环境变量对应的设置生效。
func (c *Collector) parseSettingsFromEnv() { for _, e := range os.Environ() { if !strings.HasPrefix(e, "COLLY_") { continue } pair := strings.SplitN(e[6:], "=", 2) if f, ok := envMap[pair[0]]; ok { f(c, pair[1]) } else { log.Println("Unknown environment variable:", pair[0]) } } }
它从os.Environ()中获取系统环境变量,然后遍历它们。对于以COLLY_开头的变量,找到其在envMap中的对应方法,并调用之以覆盖之前设置的Collector成员变量值。envMap是一个<string,func>的映射,它是包内全局的。
var envMap = map[string]func(*Collector, string){ "ALLOWED_DOMAINS": func(c *Collector, val string) { c.AllowedDomains = strings.Split(val, ",") }, "CACHE_DIR": func(c *Collector, val string) { c.CacheDir = val }, ……
初始化完Collector,我们就可以让其发送请求。目前Colly公开了5个方法,其中3个是和Post相关的:Post、PostRaw和PostMultipart。一个Get请求方法:Visit。以及一个用户可以高度定制的方法:Request。这些方法底层都调用了scrape方法。比如Visit的实现是
func (c *Collector) Visit(URL string) error { return c.scrape(URL, "GET", 1, nil, nil, nil, true) }
scrape方法是需要我们展开分析的。因为它是Colly库中两个最重要的方法之一。
// scrape method func (c *Collector) scrape(u, method string, depth int, requestData io.Reader, ctx *Context, hdr http.Header, checkRevisit bool) error { if err := c.requestCheck(u, method, depth, checkRevisit); err != nil { return err }
首先requestCheck方法检测一些和递归深度以及URL相关的信息
func (c *Collector) requestCheck(u, method string, depth int, checkRevisit bool) error { if u == "" { return ErrMissingURL } if c.MaxDepth > 0 && c.MaxDepth < depth { return ErrMaxDepth }
Collector的MaxDepth默认设置为0,即不用比较深度。如果它被设置值,则递归深度不可以超过它。
然后检测URL是否在被禁止的URL过滤器中。如果在,则返回错误。
if len(c.DisallowedURLFilters) > 0 { if isMatchingFilter(c.DisallowedURLFilters, []byte(u)) { return ErrForbiddenURL } }
之后检测URL是否在准入的URL过滤器中。如果不在,则返回错误
if len(c.URLFilters) > 0 { if !isMatchingFilter(c.URLFilters, []byte(u)) { return ErrNoURLFiltersMatch } }
最后针对GET请求,检查其是否被请求过。
if checkRevisit && !c.AllowURLRevisit && method == "GET" { h := fnv.New64a() h.Write([]byte(u)) uHash := h.Sum64() visited, err := c.store.IsVisited(uHash) if err != nil { return err } if visited { return ErrAlreadyVisited } return c.store.Visited(uHash) } return nil }
通过这些检测后,scrape会对URL组成进行分析补齐
// scrape method parsedURL, err := url.Parse(u) if err != nil { return err } if parsedURL.Scheme == "" { parsedURL.Scheme = "http" }
然后针对host进行精确匹配(在requestCheck中,是对URL使用正则进行匹配)。先检测host是否在被禁止的列表中,然后检测其是否在准入的列表中。
// scrape method if !c.isDomainAllowed(parsedURL.Host) { return ErrForbiddenDomain }
func (c *Collector) isDomainAllowed(domain string) bool { for _, d2 := range c.DisallowedDomains { if d2 == domain { return false } } if c.AllowedDomains == nil || len(c.AllowedDomains) == 0 { return true } for _, d2 := range c.AllowedDomains { if d2 == domain { return true } } return false }
通过上面检测,还需要检查是否需要遵从Robots协议
// scrape method if !c.IgnoreRobotsTxt { if err = c.checkRobots(parsedURL); err != nil { return err } }
所有检测通过后,就需要填充请求了
// scrape method if hdr == nil { hdr = http.Header{"User-Agent": []string{c.UserAgent}} } rc, ok := requestData.(io.ReadCloser) if !ok && requestData != nil { rc = ioutil.NopCloser(requestData) } req := &http.Request{ Method: method, URL: parsedURL, Proto: "HTTP/1.1", ProtoMajor: 1, ProtoMinor: 1, Header: hdr, Body: rc, Host: parsedURL.Host, } setRequestBody(req, requestData)
第5~8行,使用类型断言等方法,将请求的数据(requestData)转换成io.ReadCloser接口数据。setRequestBody方法则是根据数据(requestData)的原始类型,设置Request结构中的GetBody方法
func setRequestBody(req *http.Request, body io.Reader) { if body != nil { switch v := body.(type) { case *bytes.Buffer: req.ContentLength = int64(v.Len()) buf := v.Bytes() req.GetBody = func() (io.ReadCloser, error) { r := bytes.NewReader(buf) return ioutil.NopCloser(r), nil } case *bytes.Reader: req.ContentLength = int64(v.Len()) snapshot := *v req.GetBody = func() (io.ReadCloser, error) { r := snapshot return ioutil.NopCloser(&r), nil } case *strings.Reader: req.ContentLength = int64(v.Len()) snapshot := *v req.GetBody = func() (io.ReadCloser, error) { r := snapshot return ioutil.NopCloser(&r), nil } } if req.GetBody != nil && req.ContentLength == 0 { req.Body = http.NoBody req.GetBody = func() (io.ReadCloser, error) { return http.NoBody, nil } } } }
这种抽象方式,使得 不同类型的requestData都可以通过统一的GetBody方法获取内容 。目前Colly中发送数据有3种复合结构,分别是:map[string]string、requestData []byte和map[string][]byte。对于普通的Post传送map[string]string数据,Colly会使用createFormReader方法将其转换成Reader结构指针
func createFormReader(data map[string]string) io.Reader { form := url.Values{} for k, v := range data { form.Add(k, v) } return strings.NewReader(form.Encode()) }
如果是一个二进制切片,则使用bytes.NewReader直接将其转换为Reader结构指针
如果是map[string][]byte,则是Post数据的Multipart结构,使用createMultipartReader方法将其转换成Buffer结构指针。
func createMultipartReader(boundary string, data map[string][]byte) io.Reader { dashBoundary := "--" + boundary body := []byte{} buffer := bytes.NewBuffer(body) buffer.WriteString("Content-type: multipart/form-data; boundary=" + boundary + "\n\n") for contentType, content := range data { buffer.WriteString(dashBoundary + "\n") buffer.WriteString("Content-Disposition: form-data; name=" + contentType + "\n") buffer.WriteString(fmt.Sprintf("Content-Length: %d \n\n", len(content))) buffer.Write(content) buffer.WriteString("\n") } buffer.WriteString(dashBoundary + "--\n\n") return buffer }
回到scrape方法中,数据准备结束,开始正式获取数据
// scrape method u = parsedURL.String() c.wg.Add(1) if c.Async { go c.fetch(u, method, depth, requestData, ctx, hdr, req) return nil } return c.fetch(u, method, depth, requestData, ctx, hdr, req) }
通过第4行我们可以看到,可以通过Async参数决定是否异步的获取数据。
在解析fetch方法前,我们要先介绍Collector的几个回调函数
htmlCallbacks []*htmlCallbackContainer xmlCallbacks []*xmlCallbackContainer requestCallbacks []RequestCallback responseCallbacks []ResponseCallback errorCallbacks []ErrorCallback scrapedCallbacks []ScrapedCallback
以requestCallbacks为例,Colly提供了OnRequest方法用于注册回调。由于这些回调函数通过切片保存,所以可以多次调用注册方法。(即不是覆盖之前的注册回调)
// OnRequest registers a function. Function will be executed on every // request made by the Collector func (c *Collector) OnRequest(f RequestCallback) { c.lock.Lock() if c.requestCallbacks == nil { c.requestCallbacks = make([]RequestCallback, 0, 4) } c.requestCallbacks = append(c.requestCallbacks, f) c.lock.Unlock() }
用户则可以使用下面方法进行注册
// Before making a request print "Visiting ..." c.OnRequest(func(r *colly.Request) { fmt.Println("Visiting", r.URL.String()) })
这些回调会被在handleOnXXXX类型的函数中被调用。调用的顺序和注册的顺序一致。
func (c *Collector) handleOnResponse(r *Response) { if c.debugger != nil { c.debugger.Event(createEvent("response", r.Request.ID, c.ID, map[string]string{ "url": r.Request.URL.String(), "status": http.StatusText(r.StatusCode), })) } for _, f := range c.responseCallbacks { f(r) } }
每次调用fetch方法都会构建一个全新Request结构。
// fetch method func (c *Collector) fetch(u, method string, depth int, requestData io.Reader, ctx *Context, hdr http.Header, req *http.Request) error { defer c.wg.Done() if ctx == nil { ctx = NewContext() } request := &Request{ URL: req.URL, Headers: &req.Header, Ctx: ctx, Depth: depth, Method: method, Body: requestData, collector: c, ID: atomic.AddUint32(&c.requestCount, 1), }
这儿注意一下3~5行ctx(上下文)的构建逻辑。如果传入的ctx为nil,则构建一个新的,否则使用老的。这就意味着 Request结构体(以及之后出现的Response结构体)中的ctx可以是每次调用fetch时全新产生的,也可以是各个Request公用的。 我们回溯下ctx的调用栈,发现只有func (c *Collector) Request(……)方法使用的不是nil
func (c *Collector) Request(method, URL string, requestData io.Reader, ctx *Context, hdr http.Header) error { return c.scrape(URL, method, 1, requestData, ctx, hdr, true) }
这也就意味着,调用Visit、Post、PostRaw和PostMultipart方法在每次调用fetch时都会产生一个新的上下文。
由于Context存在被多个goroutine共享访问的可能性,所以其定义了读写锁进行保护
type Context struct { contextMap map[string]interface{} lock *sync.RWMutex }
再回到fetch方法。数据填充完毕后,就提供了一次给用户干预之后流程的机会
// fetch method c.handleOnRequest(request) if request.abort { return nil }
之前我们讲解过,handleOnRequest调用的是用户通过OnRequest注册个所有回调函数。如果用户在该回调中调用了下面方法,则之后的流程都不走了。
// Abort cancels the HTTP request when called in an OnRequest callback func (r *Request) Abort() { r.abort = true }
如果用户没用终止执行,则开始发送请求
// fetch method if method == "POST" && req.Header.Get("Content-Type") == "" { req.Header.Add("Content-Type", "application/x-www-form-urlencoded") } if req.Header.Get("Accept") == "" { req.Header.Set("Accept", "*/*") } origURL := req.URL response, err := c.backend.Cache(req, c.MaxBodySize, c.CacheDir)
对于这次请求,不管是否出错都会触发用户定义的Error回调
// fetch method if err := c.handleOnError(response, err, request, ctx); err != nil { return err }
在handleOnError函数中,回调函数会接收到err原因,所以用户自定义的错误处理函数需要通过该值来做区分。
for _, f := range c.errorCallbacks { f(response, err) } return err
正常请求后,fetch会使用ctx和修复后的request填充到response中
// fetch method if req.URL != origURL { request.URL = req.URL request.Headers = &req.Header } if proxyURL, ok := req.Context().Value(ProxyURLKey).(string); ok { request.ProxyURL = proxyURL } atomic.AddUint32(&c.responseCount, 1) response.Ctx = ctx response.Request = request err = response.fixCharset(c.DetectCharset, request.ResponseCharacterEncoding) if err != nil { return err }
最后在一系列调用用户回调中结束fetch
// fetch method c.handleOnResponse(response) err = c.handleOnHTML(response) if err != nil { c.handleOnError(response, err, request, ctx) } err = c.handleOnXML(response) if err != nil { c.handleOnError(response, err, request, ctx) } c.handleOnScraped(response) return err }
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。