GoLang - 并发版爬虫

栏目: 编程工具 · 发布时间: 5年前

内容简介:将爬虫分为两部分:一、队列调度器:提供下载请求给二、

原文链接

并发版爬虫架构

GoLang - 并发版爬虫

go_spider.png

将爬虫分为两部分:

一、队列调度器:提供下载请求给 Process

二、 Process :包括下载请求、解析下载的内容、返回新请求列表给队列调度器、输出下载内容。

具体实现:

Process
Process

爬虫引擎

package spider

import (
    "downloader"
    "github.com/PuerkitoBio/goquery"
    "log"
    "pageprocess"
    "pipeline"
    "scheduler"
    "strconv"
    "time"
)

// threadnum   - 线程数量
// scheduler   - 调度器
// downloader  - 下载器
// pageprocess - 页面处理  
// pipeline    - 输出      
type Spider struct {
    threadnum uint8
    scheduler scheduler.Scheduler
    downloader downloader.DownLoader
    pageprocess pageprocess.PageProcess
    pipeline pipeline.PipeLine
}
// NewSpider 创建一个爬虫引擎
func NewSpider(threadnum int,path string) *Spider{
    return &Spider{
        scheduler:scheduler.NewQueueSCheduler(),
        downloader:downloader.NewHttpDownLoader(),
        pageprocess:pageprocess.NewPageProcess(),
        pipeline:pipeline.NewFilePipeLine(path),
        threadnum:uint8(threadnum),
    }
}
// Run 引擎运行
func (s *Spider) Run(){
  // Process并发数量
    rm := NewResourceManagerChan(s.threadnum)
    log.Println("[Spider] 爬虫运行 - 处理线程数:" + strconv.Itoa(rm.Cap()))
    for{
        url,ok := s.scheduler.Pop()
        // 爬取队列为空 并且 没有Process线程在处理 认为爬虫结束
        if ok == false && rm.Has() == 0{
            log.Println("[Spider] 爬虫运行结束")
            break
        }else if ok == false{ // Process线程正在处理,可能还会有新的请求加入调度
            log.Println("[Spider] 爬取队列为空 - 等待处理")
            time.Sleep(500 * time.Millisecond)
            continue
        }
        // 控制Process线程并发数量
        rm.GetOne()
        go func(url string) {
            defer rm.FreeOne()
            s.Process(url)
        }(url)
    }
}
// 添加请求链接
func (s *Spider) AddUrl(url string) *Spider{
    s.scheduler.Push(url)
    return s
}
func (s *Spider) AddUrls(urls []string) *Spider{
    for _,url := range urls{
        s.scheduler.Push(url)
    }
    return s
}
// 处理请求链接
func (s *Spider) Process(url string){
  // 下载链接
    resp := s.downloader.DownLoad(url)
    if resp == nil{
        /*下载失败重新加入调度队列中*/
        if !s.downloader.Visited(url){
            s.scheduler.Push(url)
        }
        return
    }
  // 页面处理 - 使用goquery包简单处理
    doc,err := goquery.NewDocumentFromReader(resp.Body)
    if err != nil{
        log.Println("[Process] 解析错误")
        s.scheduler.Push(url)
        return
    }
    // 将新请求链接加入到调度器中
    links := s.pageprocess.Process(doc)
    for _,url := range links{
        if !s.downloader.Visited(url){
            s.scheduler.Push(url)
        }
    }
    // 输出文档
    go s.pipeline.Process(doc)
}
// 控制线程并发数
package spider

type ResourceManager struct {
    tc chan uint8
}

func NewResourceManagerChan(num uint8) *ResourceManager{
    tc := make(chan uint8,num)
    return &ResourceManager{tc:tc}
}

func (r *ResourceManager) GetOne(){
    r.tc <- 1
}

func (r *ResourceManager) FreeOne(){
    <- r.tc
}

func (r *ResourceManager) Cap() int{
    return cap(r.tc)
}

func (r *ResourceManager) Has() int{
    return len(r.tc)
}

func (r *ResourceManager) Left() int{
    return cap(r.tc) - len(r.tc)
}

队列调度器

队列调度器实现获取以及储存请求。

请求的重复性交给下载器来判断(考虑只有下载成功的请求才不需要访问)。

简化的请求为 string 类型的 url 链接。

package scheduler

import (
    "container/list"
    "crypto/md5"
    "sync"
)

type QueueScheduler struct {
    queue    *list.List
    locker   *sync.Mutex
    listkey  map[[md5.Size]byte] *list.Element
}

func NewQueueSCheduler() *QueueScheduler{
    queue   := list.New()
    locker  := new(sync.Mutex)
    listkey := make(map[[md5.Size]byte] *list.Element)

    return &QueueScheduler{
        queue:queue,
        locker:locker,
        listkey:listkey}
}

// Pop - 从队列中获取一个链接
func (s *QueueScheduler) Pop() (string,bool){
    s.locker.Lock()
    if s.queue.Len() <= 0{
        s.locker.Unlock()
        return "",false
    }
    e := s.queue.Front()
    ret := e.Value.(string)
    // 清除listkey中该元素,加入到访问队列中
    key := md5.Sum([]byte(ret))
    delete(s.listkey,key)
    s.queue.Remove(e)
    s.locker.Unlock()
    return ret,true
}

// Push - 将链接放入队列中
func (s *QueueScheduler) Push(url string){
    s.locker.Lock()
    key := md5.Sum([]byte(url))
    // 链接已存在
    if _,ok := s.listkey[key]; ok{
        s.locker.Unlock()
        return
    }
    e := s.queue.PushBack(url)
    s.listkey[key] = e
    s.locker.Unlock()
}

下载器

下载器提供接口下载请求,并返回下载得到的内容。

下载器提供接口判断请求是否已经被处理过。

若下载失败则标记当前请求访问失败,反之标记当前请求访问成功,使用 map 储存。

简化的下载器仅使用的 http 包中的 Get 方法。

package downloader

import (
   "crypto/md5"
   "log"
   "net/http"
   "sync"
)

type HttpDownLoader struct {
   locker *sync.Mutex
   downloaded map[[md5.Size]byte] bool
}

func NewHttpDownLoader() *HttpDownLoader{
   locker := new(sync.Mutex)
   downloaded := make(map[[md5.Size]byte]bool)
   return &HttpDownLoader{
      locker:locker,
      downloaded:downloaded,
   }
}

// 下载链接
func (h *HttpDownLoader) DownLoad(url string) *http.Response{
   key := md5.Sum([]byte(url))
   resp,err := http.Get(url)
   h.locker.Lock()
   // 已经被访问过了,不需要访问。
   if ok,has := h.downloaded[key]; has && ok{
      h.locker.Unlock()
      return nil
   }
   // 访问失败
   if err != nil || resp.StatusCode != http.StatusOK{
      log.Println("[DownLoader] 下载链接失败:" + url)
      h.downloaded[key] = false
      h.locker.Unlock()
      return nil
   }
   h.downloaded[key] = true
   h.locker.Unlock()
   log.Println("[DownLoader] 下载链接成功:" + url)
   return resp
}

// 链接是否被访问
func (h *HttpDownLoader) Visited(url string) bool{
   key := md5.Sum([]byte(url))
   var ret bool
   h.locker.Lock()
   if ok,has := h.downloaded[key]; has && ok{
      ret = true
   }else{
      ret = false
   }
   h.locker.Unlock()
   return ret
}

页面处理

页面处理需要返回链接请求集合,这里简化为 []string 类型。

页面处理需要返回文档,这里直接简化为 goquery 包中的 document

package pageprocess

import (
    "github.com/PuerkitoBio/goquery"
)

type PageProcess struct {
}

func NewPageProcess() PageProcess{
    return PageProcess{}
}

// 返回链接函数
func (p *PageProcess) Process(d *goquery.Document) []string{
    var links []string
  // 获取链接的处理代码
    return links
}

输出

package pipeline

import (
    "github.com/PuerkitoBio/goquery"
    "log"
    "os"
)

type FilePipeLine struct {
    dir string
}

func NewFilePipeLine(dir string) *FilePipeLine{
    return &FilePipeLine{dir:dir}
}

func (p *FilePipeLine) Process(doc *goquery.Document){
    // 文件写入实现
}

参考


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

计算统计

计算统计

Geof H.Givens、Jennifer A.Hoeting / 王兆军、刘民千、邹长亮、杨建峰 / 人民邮电出版社 / 2009-09-01 / 59.00元

随着计算机的快速发展, 数理统计中许多涉及大计算量的有效方法也得到了广泛应用与迅猛发展, 可以说, 计算统计已是统计中一个很重要的研究方向. 本书既包含一些经典的统计计算方法, 如求解非线性方程组的牛顿方法、传统的随机模拟方法等, 又全面地介绍了近些年来发展起来的某些新方法, 如模拟退火算法、基因算法、EM算法、MCMC方法、Bootstrap方法等, 并通过某些实例, 对这些方法的应用进行......一起来看看 《计算统计》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具