内容简介:在上一篇文章注意:本次并发是在上一篇文章简单并发实现的基础上修改,所以没有贴出全部代码,只是贴出部分修改部分,要查看完整项目代码,可以查看上篇文章,或者从github下载通过分析我们发现,两种不同调度的区别是每个
前言
在上一篇文章 《Golang实现简单爬虫框架(4)——队列实现并发任务调度》 中,我们使用用队列实现了任务调度,接下来首先对两种并发方式做一个同构,使代码统一。然后添加数据存储模块。
注意:本次并发是在上一篇文章简单并发实现的基础上修改,所以没有贴出全部代码,只是贴出部分修改部分,要查看完整项目代码,可以查看上篇文章,或者从github下载 项目源代码查看
1、项目重构
(1)并发引擎
通过分析我们发现,两种不同调度的区别是每个 worker
一个 channel
还是 所有 worker
共用一个 channel
,所以我们在接口中定义一个函数 WorkerChan()
,用来决定这件事,即 worker
一个 channel
还是 所有 worker
共用一个 channel
。此时 ConfigMasterWorkerChan
就不再需要了。
在项目文件concurrent.go中我们定义一个任务调度器Scheduler,如下:
// 任务调度器 type Scheduler interface { Submit(request Request) // 提交任务 ConfigMasterWorkerChan(chan Request) WorkerReady(w chan Request) Run() }
但是在简单并发中我们只实现了 Submit
和 ConfigMasterWorkerChan
接口,而使用队列调度中却实现了接口的所有方法,所有我们同构一下使 concurrent.go
文件可以适用于两种不同的调度器。
因为在 createworker
函数中要使用 WorkerReady
函数,所以要传入一个 Scheduler
,但是这样显得比较重,我们可以利用接口组合,新建一个接口 ReadyNotifier
,这样在 createworker
函数中传入 ReadyNotifier
即可。
修改后的任务调度如下:
type Scheduler interface { ReadyNotifier Submit(request Request) // 提交任务 WorkerChan() chan Request Run() } type ReadyNotifier interface { WorkerReady(chan Request) }
此时创建goroutine修改如下:
// 创建 goroutine for i := 0; i < e.WorkerCount; i++ { //任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由WorkerChan 来决定 createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler) }
修改后的concurrent.go文件如下:
package engine import ( "log" ) // 并发引擎 type ConcurrendEngine struct { Scheduler Scheduler WorkerCount int } // 任务调度器 type Scheduler interface { ReadyNotifier Submit(request Request) // 提交任务 WorkerChan() chan Request Run() } type ReadyNotifier interface { WorkerReady(chan Request) } func (e *ConcurrendEngine) Run(seeds ...Request) { out := make(chan ParseResult) e.Scheduler.Run() // 创建 goruntine for i := 0; i < e.WorkerCount; i++ { // 任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由WorkerChan 来决定 createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler) } // engine把请求任务提交给 Scheduler for _, request := range seeds { e.Scheduler.Submit(request) } itemCount := 0 for { // 接受 Worker 的解析结果 result := <-out for _, item := range result.Items { log.Printf("Got item: #%d: %v\n", itemCount, item) itemCount++ } // 然后把 Worker 解析出的 Request 送给 Scheduler for _, request := range result.Requests { e.Scheduler.Submit(request) } } } func createWorker(in chan Request, out chan ParseResult, ready ReadyNotifier) { go func() { for { ready.WorkerReady(in) // 告诉调度器任务空闲 request := <-in result, err := worker(request) if err != nil { continue } out <- result } }() }
(2)简单并发调度器
scheduler/simple.go
package scheduler import "crawler/engine" type SimpleScheduler struct { workerChan chan engine.Request } func (s *SimpleScheduler) WorkerChan() chan engine.Request { // 此时所有 worker 共用同一个 channel,直接返回即可 return s.workerChan } func (s *SimpleScheduler) WorkerReady(w chan engine.Request) { } func (s *SimpleScheduler) Run() { // 创建出 workchannel s.workerChan = make(chan engine.Request) } func (s *SimpleScheduler) Submit(request engine.Request) { // send request down to worker chan go func() { s.workerChan <- request }() }
(3)队列实现调度器
scheduler/queued.go
添加 WorkerChan()
的实现即可
package scheduler import "crawler/engine" // 使用队列来调度任务 type QueuedScheduler struct { requestChan chan engine.Request workerChan chan chan engine.Request } func (s *QueuedScheduler) WorkerChan() chan engine.Request { // 对于队列实现来讲,每个 worker 共用一个 channel return make(chan engine.Request) } // 提交请求任务到 requestChan func (s *QueuedScheduler) Submit(request engine.Request) { s.requestChan <- request } // 告诉外界有一个 worker 可以接收 request func (s *QueuedScheduler) WorkerReady(w chan engine.Request) { s.workerChan <- w } func (s *QueuedScheduler) Run() { s.workerChan = make(chan chan engine.Request) s.requestChan = make(chan engine.Request) go func() { // 创建请求队列和工作队列 var requestQ []engine.Request var workerQ []chan engine.Request for { var activeWorker chan engine.Request var activeRequest engine.Request if len(requestQ) > 0 && len(workerQ) > 0 { activeWorker = workerQ[0] activeRequest = requestQ[0] } select { case r := <-s.requestChan: // 当 requestChan 收到数据 requestQ = append(requestQ, r) case w := <-s.workerChan: // 当 workerChan 收到数据 workerQ = append(workerQ, w) case activeWorker <- activeRequest: // 当请求队列和认读队列都不为空时,给任务队列分配任务 requestQ = requestQ[1:] workerQ = workerQ[1:] } } }() }
(4)main函数
经过上述同构,在main函数中如需切换不同调度器,只需要相应的配置即可。
package main import ( "crawler/engine" "crawler/scheduler" "crawler/zhenai/parser" ) func main() { e := engine.ConcurrendEngine{ //Scheduler: &scheduler.QueuedScheduler{}, // 队列实现调度器 Scheduler: &scheduler.SimpleScheduler{}, // 简单并发调度 WorkerCount: 50, } e.Run(engine.Request{ Url: "http://www.zhenai.com/zhenghun", ParseFunc: parser.ParseCityList, }) }
2、数据存储
(1)Mgo的介绍安装
爬取到的数据不能仅仅在控制台打印出来,所以我们还要给爬虫添加数据存储模块。我们本次选择使用 mongodb 来存储我们的数据。
mgo(音mango)是 MongoDB 的 Go语言 驱动,它用基于 Go 语法的简单API实现了丰富的特性,并经过良好测试。
官方网址: http://labix.org/mgo
文档: API docs for mgo
首先我们要安装mgo,打开终端,输入下面代码完成安装
go get gopkg.in/mgo.v2
mgo基本操作都很简单,有数据库操作经验都可以很快上手。
(2)爬虫引擎与数据格式
首先,爬虫引擎获取到数据要把数据发送给数据存储模块,而数据的传递用要用到 channel
,所以打开 concurrent.go
文件,在引擎添加 ItemChan
属性,如下所示:
爬取到数据需要把数据发送到数据存储模块,
package engine // 并发引擎 type ConcurrendEngine struct { Scheduler Scheduler // 任务调度器 WorkerCount int // 并发任务数量 ItemChan chan Item // 数据保存 channel } // ... for { // 接受 Worker 的解析结果 result := <-out for _, item := range result.Items { // 当抓取一组数据后,进行保存 go func(item2 Item) { e.ItemChan <- item2 }(item) } // ... } // ...
在 engine/types.go
中定义Item类型:
package engine // 请求结构 type Request struct { Url string // 请求地址 ParseFunc func([]byte) ParseResult } // 解析结果结构 type ParseResult struct { Requests []Request // 解析出的请求 Items []Item // 解析出的内容 } // 解析出的用户数据格式 type Item struct { Url string // 个人信息Url地址 Type string // table Id string // Id Payload interface{} // 详细信息 } func NilParseFun([]byte) ParseResult { return ParseResult{} }
(3)存储模块的实现
在根目录下创建persist文件夹,然后创建itemsaver.go文件
// persist/itemsaver.go package persist import ( "context" "crawler/engine" "errors" "gopkg.in/mgo.v2" "gopkg.in/olivere/elastic.v5" "log" ) func ItemSaver(index string) (chan engine.Item, error) { // mongodb connect session, err := mgo.Dial("localhost:27017") if err != nil { panic(err) } out := make(chan engine.Item) go func() { itemCount := 0 for { // 接收到发送的 item item := <-out log.Printf("Item Saver: got item #%d: %v\n", itemCount, item) itemCount++ // Save data in mongodb err := mongo_save(session, index, item) if err != nil { // if have err, ignore it log.Printf("Item Saver: error, saving item %v: %v", item, err) } } }() return out, nil } // 使用 MongoDB 保存数据 func mongo_save(session *mgo.Session, dbName string, item engine.Item) error { if item.Type == "" { return errors.New("must supply Type") } c := session.DB(dbName).C(item.Type) // 选择要操作的数据库与集合 err := c.Insert(item) // 插入数据 if err != nil { log.Fatal(err) } return nil }
(4)存储测试文件
我们把一条数据存入mongodb,然后再取出来,比对读出的数据和写入的数据是否相同
// persist/itemsaver_test.gp package persist import ( "crawler/engine" "crawler/model" "encoding/json" "fmt" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "log" "testing" ) func TestMongoSave(t *testing.T) { // mongodb connect session, err := mgo.Dial("localhost:27017") if err != nil { panic(err) } expected := engine.Item{ Url: "http://album.zhenai.com/u/1946858930", Type: "zhenai", Id: "1946858930", Payload: model.Profile{ Name: "為你垨候", Gender: "女士", Age: 40, Height: 163, Weight: 54, Income: "5-8千", Marriage: "未婚", Address: "佛山顺德区", }, } // 保存数据 err = mongo_save(session, "crawler", expected) if err != nil { panic(err) } c := session.DB("crawler").C("zhenai") var result engine.Item // 查询数据 err = c.Find(bson.M{"id": "1946858930"}).One(&result) // result 为 Json 类型 if err != nil { log.Fatal(err) } fmt.Printf("%s, %s, %v\n", result.Url, result.Id, result.Payload) }
(5)parser模块
我们要在 parse/profile.go
文件中组装好需要保存到数据库的数据格式
// ... result := engine.ParseResult{ Items: []engine.Item{ { Url: url, Type: "zhenai", Id: extractString([]byte(url), idUrlRe), Payload: profile, }, }, } // ...
(6)main函数
package main import ( "crawler/engine" "crawler/persist" "crawler/scheduler" "crawler/zhenai/parser" ) func main() { itemChan, err := persist.ItemSaver() if err != nil { panic(err) } e := engine.ConcurrendEngine{ //Scheduler: &scheduler.QueuedScheduler{}, Scheduler: &scheduler.SimpleScheduler{}, WorkerCount: 100, ItemChan: itemChan, } e.Run(engine.Request{ Url: "http://www.zhenai.com/zhenghun", ParseFunc: parser.ParseCityList, }) }
运行项目,打开mongodb可视化工具,可以看到爬取了54410条数据
3、总结
我们首先把两种并发方式做一个同构,使代码统一,直接在main函数中使用不同的配置就可以切换调度器,简单方便。然后使用Mgo驱动操作数据,添加到mongodb中。内容有点多,很多代码没有完整的展示出来,希望大家可以下载 项目源代码 ,回滚到对应提交记录查看,效果会更好。 别无所求,只求随手给个star
下篇博客中我们会再当前博客的基础上添加数据展示功能
如果想获取 Google工程师深度讲解go语言 视频资源的,可以在评论区留下邮箱。
如果觉得文章还可以,劳烦大人随手点个赞。。。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。