GoLang - 并发版爬虫

栏目: 编程工具 · 发布时间: 6年前

内容简介:将爬虫分为两部分:一、队列调度器:提供下载请求给二、

原文链接

并发版爬虫架构

GoLang - 并发版爬虫

go_spider.png

将爬虫分为两部分:

一、队列调度器:提供下载请求给 Process

二、 Process :包括下载请求、解析下载的内容、返回新请求列表给队列调度器、输出下载内容。

具体实现:

Process
Process

爬虫引擎

package spider

import (
    "downloader"
    "github.com/PuerkitoBio/goquery"
    "log"
    "pageprocess"
    "pipeline"
    "scheduler"
    "strconv"
    "time"
)

// threadnum   - 线程数量
// scheduler   - 调度器
// downloader  - 下载器
// pageprocess - 页面处理  
// pipeline    - 输出      
type Spider struct {
    threadnum uint8
    scheduler scheduler.Scheduler
    downloader downloader.DownLoader
    pageprocess pageprocess.PageProcess
    pipeline pipeline.PipeLine
}
// NewSpider 创建一个爬虫引擎
func NewSpider(threadnum int,path string) *Spider{
    return &Spider{
        scheduler:scheduler.NewQueueSCheduler(),
        downloader:downloader.NewHttpDownLoader(),
        pageprocess:pageprocess.NewPageProcess(),
        pipeline:pipeline.NewFilePipeLine(path),
        threadnum:uint8(threadnum),
    }
}
// Run 引擎运行
func (s *Spider) Run(){
  // Process并发数量
    rm := NewResourceManagerChan(s.threadnum)
    log.Println("[Spider] 爬虫运行 - 处理线程数:" + strconv.Itoa(rm.Cap()))
    for{
        url,ok := s.scheduler.Pop()
        // 爬取队列为空 并且 没有Process线程在处理 认为爬虫结束
        if ok == false && rm.Has() == 0{
            log.Println("[Spider] 爬虫运行结束")
            break
        }else if ok == false{ // Process线程正在处理,可能还会有新的请求加入调度
            log.Println("[Spider] 爬取队列为空 - 等待处理")
            time.Sleep(500 * time.Millisecond)
            continue
        }
        // 控制Process线程并发数量
        rm.GetOne()
        go func(url string) {
            defer rm.FreeOne()
            s.Process(url)
        }(url)
    }
}
// 添加请求链接
func (s *Spider) AddUrl(url string) *Spider{
    s.scheduler.Push(url)
    return s
}
func (s *Spider) AddUrls(urls []string) *Spider{
    for _,url := range urls{
        s.scheduler.Push(url)
    }
    return s
}
// 处理请求链接
func (s *Spider) Process(url string){
  // 下载链接
    resp := s.downloader.DownLoad(url)
    if resp == nil{
        /*下载失败重新加入调度队列中*/
        if !s.downloader.Visited(url){
            s.scheduler.Push(url)
        }
        return
    }
  // 页面处理 - 使用goquery包简单处理
    doc,err := goquery.NewDocumentFromReader(resp.Body)
    if err != nil{
        log.Println("[Process] 解析错误")
        s.scheduler.Push(url)
        return
    }
    // 将新请求链接加入到调度器中
    links := s.pageprocess.Process(doc)
    for _,url := range links{
        if !s.downloader.Visited(url){
            s.scheduler.Push(url)
        }
    }
    // 输出文档
    go s.pipeline.Process(doc)
}
// 控制线程并发数
package spider

type ResourceManager struct {
    tc chan uint8
}

func NewResourceManagerChan(num uint8) *ResourceManager{
    tc := make(chan uint8,num)
    return &ResourceManager{tc:tc}
}

func (r *ResourceManager) GetOne(){
    r.tc <- 1
}

func (r *ResourceManager) FreeOne(){
    <- r.tc
}

func (r *ResourceManager) Cap() int{
    return cap(r.tc)
}

func (r *ResourceManager) Has() int{
    return len(r.tc)
}

func (r *ResourceManager) Left() int{
    return cap(r.tc) - len(r.tc)
}

队列调度器

队列调度器实现获取以及储存请求。

请求的重复性交给下载器来判断(考虑只有下载成功的请求才不需要访问)。

简化的请求为 string 类型的 url 链接。

package scheduler

import (
    "container/list"
    "crypto/md5"
    "sync"
)

type QueueScheduler struct {
    queue    *list.List
    locker   *sync.Mutex
    listkey  map[[md5.Size]byte] *list.Element
}

func NewQueueSCheduler() *QueueScheduler{
    queue   := list.New()
    locker  := new(sync.Mutex)
    listkey := make(map[[md5.Size]byte] *list.Element)

    return &QueueScheduler{
        queue:queue,
        locker:locker,
        listkey:listkey}
}

// Pop - 从队列中获取一个链接
func (s *QueueScheduler) Pop() (string,bool){
    s.locker.Lock()
    if s.queue.Len() <= 0{
        s.locker.Unlock()
        return "",false
    }
    e := s.queue.Front()
    ret := e.Value.(string)
    // 清除listkey中该元素,加入到访问队列中
    key := md5.Sum([]byte(ret))
    delete(s.listkey,key)
    s.queue.Remove(e)
    s.locker.Unlock()
    return ret,true
}

// Push - 将链接放入队列中
func (s *QueueScheduler) Push(url string){
    s.locker.Lock()
    key := md5.Sum([]byte(url))
    // 链接已存在
    if _,ok := s.listkey[key]; ok{
        s.locker.Unlock()
        return
    }
    e := s.queue.PushBack(url)
    s.listkey[key] = e
    s.locker.Unlock()
}

下载器

下载器提供接口下载请求,并返回下载得到的内容。

下载器提供接口判断请求是否已经被处理过。

若下载失败则标记当前请求访问失败,反之标记当前请求访问成功,使用 map 储存。

简化的下载器仅使用的 http 包中的 Get 方法。

package downloader

import (
   "crypto/md5"
   "log"
   "net/http"
   "sync"
)

type HttpDownLoader struct {
   locker *sync.Mutex
   downloaded map[[md5.Size]byte] bool
}

func NewHttpDownLoader() *HttpDownLoader{
   locker := new(sync.Mutex)
   downloaded := make(map[[md5.Size]byte]bool)
   return &HttpDownLoader{
      locker:locker,
      downloaded:downloaded,
   }
}

// 下载链接
func (h *HttpDownLoader) DownLoad(url string) *http.Response{
   key := md5.Sum([]byte(url))
   resp,err := http.Get(url)
   h.locker.Lock()
   // 已经被访问过了,不需要访问。
   if ok,has := h.downloaded[key]; has && ok{
      h.locker.Unlock()
      return nil
   }
   // 访问失败
   if err != nil || resp.StatusCode != http.StatusOK{
      log.Println("[DownLoader] 下载链接失败:" + url)
      h.downloaded[key] = false
      h.locker.Unlock()
      return nil
   }
   h.downloaded[key] = true
   h.locker.Unlock()
   log.Println("[DownLoader] 下载链接成功:" + url)
   return resp
}

// 链接是否被访问
func (h *HttpDownLoader) Visited(url string) bool{
   key := md5.Sum([]byte(url))
   var ret bool
   h.locker.Lock()
   if ok,has := h.downloaded[key]; has && ok{
      ret = true
   }else{
      ret = false
   }
   h.locker.Unlock()
   return ret
}

页面处理

页面处理需要返回链接请求集合,这里简化为 []string 类型。

页面处理需要返回文档,这里直接简化为 goquery 包中的 document

package pageprocess

import (
    "github.com/PuerkitoBio/goquery"
)

type PageProcess struct {
}

func NewPageProcess() PageProcess{
    return PageProcess{}
}

// 返回链接函数
func (p *PageProcess) Process(d *goquery.Document) []string{
    var links []string
  // 获取链接的处理代码
    return links
}

输出

package pipeline

import (
    "github.com/PuerkitoBio/goquery"
    "log"
    "os"
)

type FilePipeLine struct {
    dir string
}

func NewFilePipeLine(dir string) *FilePipeLine{
    return &FilePipeLine{dir:dir}
}

func (p *FilePipeLine) Process(doc *goquery.Document){
    // 文件写入实现
}

参考


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Essential PHP Security

Essential PHP Security

Chris Shiflett / O'Reilly Media / 2005-10-13 / USD 29.95

Being highly flexible in building dynamic, database-driven web applications makes the PHP programming language one of the most popular web development tools in use today. It also works beautifully wit......一起来看看 《Essential PHP Security》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具