内容简介:将爬虫分为两部分:一、队列调度器:提供下载请求给二、
原文链接
并发版爬虫架构
go_spider.png
将爬虫分为两部分:
一、队列调度器:提供下载请求给 Process
二、 Process
:包括下载请求、解析下载的内容、返回新请求列表给队列调度器、输出下载内容。
具体实现:
Process Process
爬虫引擎
package spider import ( "downloader" "github.com/PuerkitoBio/goquery" "log" "pageprocess" "pipeline" "scheduler" "strconv" "time" ) // threadnum - 线程数量 // scheduler - 调度器 // downloader - 下载器 // pageprocess - 页面处理 // pipeline - 输出 type Spider struct { threadnum uint8 scheduler scheduler.Scheduler downloader downloader.DownLoader pageprocess pageprocess.PageProcess pipeline pipeline.PipeLine } // NewSpider 创建一个爬虫引擎 func NewSpider(threadnum int,path string) *Spider{ return &Spider{ scheduler:scheduler.NewQueueSCheduler(), downloader:downloader.NewHttpDownLoader(), pageprocess:pageprocess.NewPageProcess(), pipeline:pipeline.NewFilePipeLine(path), threadnum:uint8(threadnum), } } // Run 引擎运行 func (s *Spider) Run(){ // Process并发数量 rm := NewResourceManagerChan(s.threadnum) log.Println("[Spider] 爬虫运行 - 处理线程数:" + strconv.Itoa(rm.Cap())) for{ url,ok := s.scheduler.Pop() // 爬取队列为空 并且 没有Process线程在处理 认为爬虫结束 if ok == false && rm.Has() == 0{ log.Println("[Spider] 爬虫运行结束") break }else if ok == false{ // Process线程正在处理,可能还会有新的请求加入调度 log.Println("[Spider] 爬取队列为空 - 等待处理") time.Sleep(500 * time.Millisecond) continue } // 控制Process线程并发数量 rm.GetOne() go func(url string) { defer rm.FreeOne() s.Process(url) }(url) } } // 添加请求链接 func (s *Spider) AddUrl(url string) *Spider{ s.scheduler.Push(url) return s } func (s *Spider) AddUrls(urls []string) *Spider{ for _,url := range urls{ s.scheduler.Push(url) } return s } // 处理请求链接 func (s *Spider) Process(url string){ // 下载链接 resp := s.downloader.DownLoad(url) if resp == nil{ /*下载失败重新加入调度队列中*/ if !s.downloader.Visited(url){ s.scheduler.Push(url) } return } // 页面处理 - 使用goquery包简单处理 doc,err := goquery.NewDocumentFromReader(resp.Body) if err != nil{ log.Println("[Process] 解析错误") s.scheduler.Push(url) return } // 将新请求链接加入到调度器中 links := s.pageprocess.Process(doc) for _,url := range links{ if !s.downloader.Visited(url){ s.scheduler.Push(url) } } // 输出文档 go s.pipeline.Process(doc) }
// 控制线程并发数 package spider type ResourceManager struct { tc chan uint8 } func NewResourceManagerChan(num uint8) *ResourceManager{ tc := make(chan uint8,num) return &ResourceManager{tc:tc} } func (r *ResourceManager) GetOne(){ r.tc <- 1 } func (r *ResourceManager) FreeOne(){ <- r.tc } func (r *ResourceManager) Cap() int{ return cap(r.tc) } func (r *ResourceManager) Has() int{ return len(r.tc) } func (r *ResourceManager) Left() int{ return cap(r.tc) - len(r.tc) }
队列调度器
队列调度器实现获取以及储存请求。
请求的重复性交给下载器来判断(考虑只有下载成功的请求才不需要访问)。
简化的请求为 string
类型的 url
链接。
package scheduler import ( "container/list" "crypto/md5" "sync" ) type QueueScheduler struct { queue *list.List locker *sync.Mutex listkey map[[md5.Size]byte] *list.Element } func NewQueueSCheduler() *QueueScheduler{ queue := list.New() locker := new(sync.Mutex) listkey := make(map[[md5.Size]byte] *list.Element) return &QueueScheduler{ queue:queue, locker:locker, listkey:listkey} } // Pop - 从队列中获取一个链接 func (s *QueueScheduler) Pop() (string,bool){ s.locker.Lock() if s.queue.Len() <= 0{ s.locker.Unlock() return "",false } e := s.queue.Front() ret := e.Value.(string) // 清除listkey中该元素,加入到访问队列中 key := md5.Sum([]byte(ret)) delete(s.listkey,key) s.queue.Remove(e) s.locker.Unlock() return ret,true } // Push - 将链接放入队列中 func (s *QueueScheduler) Push(url string){ s.locker.Lock() key := md5.Sum([]byte(url)) // 链接已存在 if _,ok := s.listkey[key]; ok{ s.locker.Unlock() return } e := s.queue.PushBack(url) s.listkey[key] = e s.locker.Unlock() }
下载器
下载器提供接口下载请求,并返回下载得到的内容。
下载器提供接口判断请求是否已经被处理过。
若下载失败则标记当前请求访问失败,反之标记当前请求访问成功,使用 map
储存。
简化的下载器仅使用的 http
包中的 Get
方法。
package downloader import ( "crypto/md5" "log" "net/http" "sync" ) type HttpDownLoader struct { locker *sync.Mutex downloaded map[[md5.Size]byte] bool } func NewHttpDownLoader() *HttpDownLoader{ locker := new(sync.Mutex) downloaded := make(map[[md5.Size]byte]bool) return &HttpDownLoader{ locker:locker, downloaded:downloaded, } } // 下载链接 func (h *HttpDownLoader) DownLoad(url string) *http.Response{ key := md5.Sum([]byte(url)) resp,err := http.Get(url) h.locker.Lock() // 已经被访问过了,不需要访问。 if ok,has := h.downloaded[key]; has && ok{ h.locker.Unlock() return nil } // 访问失败 if err != nil || resp.StatusCode != http.StatusOK{ log.Println("[DownLoader] 下载链接失败:" + url) h.downloaded[key] = false h.locker.Unlock() return nil } h.downloaded[key] = true h.locker.Unlock() log.Println("[DownLoader] 下载链接成功:" + url) return resp } // 链接是否被访问 func (h *HttpDownLoader) Visited(url string) bool{ key := md5.Sum([]byte(url)) var ret bool h.locker.Lock() if ok,has := h.downloaded[key]; has && ok{ ret = true }else{ ret = false } h.locker.Unlock() return ret }
页面处理
页面处理需要返回链接请求集合,这里简化为 []string
类型。
页面处理需要返回文档,这里直接简化为 goquery
包中的 document
。
package pageprocess import ( "github.com/PuerkitoBio/goquery" ) type PageProcess struct { } func NewPageProcess() PageProcess{ return PageProcess{} } // 返回链接函数 func (p *PageProcess) Process(d *goquery.Document) []string{ var links []string // 获取链接的处理代码 return links }
输出
package pipeline import ( "github.com/PuerkitoBio/goquery" "log" "os" ) type FilePipeLine struct { dir string } func NewFilePipeLine(dir string) *FilePipeLine{ return &FilePipeLine{dir:dir} } func (p *FilePipeLine) Process(doc *goquery.Document){ // 文件写入实现 }
参考
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Go语言项目实战:并发爬虫
- golang练手小项目系列(2)-并发爬虫
- Golang 并发爬虫 爬取某著名游戏媒体
- Golang实现简单爬虫框架(3)——简单并发版
- 50行代码实现一个并发的 Python 爬虫程序
- Golang实现简单爬虫框架(4)——队列实现并发任务调度
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
未来世界的幸存者
阮一峰 / 人民邮电出版社 / 2018-6-1 / 39.00 元
本书为阮一峰博客文集,主要收录的是作者对技术变革的影响的一些思考,希望能够藉此书让读者意识到世界正在剧烈变化,洪水就在不远处,从而早早准备出路。本书适合所有乐于思考的读者。一起来看看 《未来世界的幸存者》 这本书的介绍吧!