package main
import (
"io/ioutil"
"golang.org/x/net/html/charset"
"golang.org/x/text/encoding"
"bufio"
"golang.org/x/text/transform"
"golang.org/x/text/encoding/unicode"
"log"
"regexp"
"strconv"
"net/http"
"fmt"
"time"
"gopkg.in/olivere/elastic.v5"
"context"
"net/rpc/jsonrpc"
"net/rpc"
"net"
)
// rateLimiter ticks every 10ms, apparently intended to throttle outgoing
// requests. NOTE(review): it is never read anywhere in this file — either
// wire it into Fetch (e.g. <-rateLimiter before each request) or remove it.
var rateLimiter = time.Tick(10 * time.Millisecond)
/* start Fetch.go*/
// Fetch downloads url with a desktop-browser User-Agent, detects the page
// charset, and returns the body converted to UTF-8.
// It returns an error when the request cannot be built, the request fails,
// or the server answers with a non-200 status.
func Fetch(url string) ([]byte, error) {
	client := &http.Client{}
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		// was: fmt.Errorf("get url:%d error", url) — %d on a string is a
		// broken format verb, and the underlying error was dropped.
		return nil, fmt.Errorf("get url:%s error:%v", url, err)
	}
	// Some sites block Go's default user agent; pretend to be Chrome.
	req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36")
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("get url:%s error:%v", url, err)
	}
	defer resp.Body.Close()
	// Restore the status check the original commented out: parsing an error
	// page would only produce garbage requests.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("get url:%s error: status code:%d", url, resp.StatusCode)
	}
	bodyReader := bufio.NewReader(resp.Body)
	e := determineEncoding(bodyReader)
	utf8Reader := transform.NewReader(bodyReader, e.NewDecoder())
	return ioutil.ReadAll(utf8Reader)
}
// determineEncoding sniffs the first 1KB of r to guess the page's charset,
// falling back to UTF-8 when the peek fails.
func determineEncoding(r *bufio.Reader) encoding.Encoding {
	// Peek directly on r: Peek does not advance the reader, so the sniffed
	// bytes remain available to the caller.
	// was: bufio.NewReader(r).Peek(1024) — the wrapper reader pulls up to
	// its buffer size OUT of r, so the start of the body was consumed and
	// lost before the caller ever read it.
	bytes, err := r.Peek(1024)
	if err != nil {
		log.Printf("Fetcher error:%v", err)
		return unicode.UTF8
	}
	e, _, _ := charset.DetermineEncoding(bytes, "")
	return e
}
/* end Fetch.go*/
/* start Type.go*/
// Request is one unit of crawling work: a URL plus the parser to run on
// the fetched body.
type Request struct {
	Url        string
	ParserFunc func([]byte) ParseResult
}

// ParseResult is what a parser returns: follow-up requests to crawl and
// scraped items to store.
type ParseResult struct {
	Requests []Request
	Items    []Item
}

// Item is one scraped record; Type and Id map to the Elasticsearch
// document type and id used by Save.
type Item struct {
	Url     string
	Id      string
	Type    string
	Payload interface{}
}

// NilParser is a parser that produces nothing; useful for leaf pages.
func NilParser([]byte) ParseResult {
	return ParseResult{}
}
/* end Type.go*/
/* start parser/city.go — extract every user and city URL from a city page */
// cityRe matches a profile link: capture 1 = profile URL, capture 2 = user name.
var cityRe = regexp.MustCompile(`<a href="(http://album.zhenai.com/u/[\d]+)" target="_blank">([^<]+)</a>`)
// cityUrlRe matches links to other city pages under /zhenghun/.
var cityUrlRe = regexp.MustCompile(`<a href="(http://www.zhenai.com/zhenghun/[^"]+)"`)
// ParseCity parses a city page. Every profile link becomes a request
// handled by PaesrProfile (the user name from the link text is carried
// along); every further city link becomes a request handled by ParseCity
// itself.
func ParseCity(contents []byte) ParseResult {
	var result ParseResult
	for _, m := range cityRe.FindAllSubmatch(contents, -1) {
		// Bind per-iteration copies so each closure captures its own pair.
		profileURL := string(m[1])
		userName := string(m[2])
		result.Requests = append(result.Requests, Request{
			Url: profileURL,
			ParserFunc: func(c []byte) ParseResult {
				return PaesrProfile(c, profileURL, userName)
			},
		})
	}
	// Follow links to other city pages found on this page.
	for _, m := range cityUrlRe.FindAllSubmatch(contents, -1) {
		result.Requests = append(result.Requests, Request{
			Url:        string(m[1]),
			ParserFunc: ParseCity,
		})
	}
	return result
}
/* end parser/city.go */
/* start parser/citylist.go */
// cityListRe matches a city link and its display name on the city-list page.
// Compiled once at package scope — the original kept a const pattern and
// recompiled it on every ParseCityList call.
var cityListRe = regexp.MustCompile(`(http://www.zhenai.com/zhenghun/[0-9a-z]+)"[^>]*>([^<]+)</a>`)

// ParseCityList parses the city-list page, producing one ParseCity request
// per city link found.
func ParseCityList(contents []byte) ParseResult {
	matches := cityListRe.FindAllSubmatch(contents, -1)
	result := ParseResult{}
	for _, m := range matches {
		result.Requests = append(result.Requests, Request{
			Url:        string(m[1]),
			ParserFunc: ParseCity,
		})
	}
	return result
}
/* end parser/citylist.go */
/* start profile.go */
type Profile struct {
Name string
Age int
Marry string
Constellation string
Height int
Weight int
Salary string
}
func (p Profile) String() string{
return p.Name +" " + p.Marry + strconv.Itoa(p.Age) +"olds "+ strconv.Itoa(p.Height) + "cm " + strconv.Itoa(p.Weight)+ "kg "
}
/* end profile.go */
/* start parser/profile.go */
// Regexes for the profile page; the data-v-bff6f798 attribute pins the
// site's Vue build, and capture group 1 holds the value of interest.
var ageRe = regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>([\d]+)岁</div>`)
// NOTE(review): marry and constellation only match the literal strings
// 已婚 / 天秤座, so any other value never matches — presumably a tutorial
// shortcut; confirm before relying on these fields.
var marry = regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>(已婚)</div>`)
var constellation = regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>(天秤座)</div>`)
var height = regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>([\d]+)cm</div>`)
var weight = regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>([\d]+)kg</div>`)
var salary = regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>月收入:([^<]+)</div>`)
// idRe extracts the numeric user id from a profile URL.
var idRe = regexp.MustCompile(`http://album.zhenai.com/u/([\d]+)`)
// PaesrProfile parses one user's profile page. url is the profile URL and
// name is the display name collected on the city page. It always yields
// exactly one Item whose Payload is the Profile.
// (The misspelled name is kept: ParseCity's closures call it by this name.)
func PaesrProfile(contents []byte, url string, name string) ParseResult {
	profile := Profile{Name: name}
	// Numeric fields: leave the zero value when the page lacks the field.
	// Locals are named v/err so they do not shadow the package-level regex
	// vars height and weight used on the right-hand side.
	if v, err := strconv.Atoi(extractString(contents, ageRe)); err == nil {
		profile.Age = v
	}
	if v, err := strconv.Atoi(extractString(contents, height)); err == nil {
		profile.Height = v
	}
	if v, err := strconv.Atoi(extractString(contents, weight)); err == nil {
		profile.Weight = v
	}
	profile.Salary = extractString(contents, salary)
	profile.Constellation = extractString(contents, constellation)
	// The marry regex only matches 已婚; absence is treated as 未婚.
	if extractString(contents, marry) == "" {
		profile.Marry = "未婚"
	} else {
		profile.Marry = "已婚"
	}
	return ParseResult{
		Items: []Item{
			{
				Url:     url,
				Type:    "zhenai",
				Id:      extractString([]byte(url), idRe),
				Payload: profile,
			},
		},
	}
}
//封装 正则表达式匹配
func extractString(contents []byte,re *regexp.Regexp) string{
match:=re.FindSubmatch(contents)
if len(match)>=2{
return string(match[1])
}else{
return ""
}
}
/* end parser/profile.go */
/* start engine.go — single-task engine */
// Run is the single-goroutine engine: it crawls breadth-first, appending
// each parse's follow-up requests to the queue and printing scraped items.
func Run(seeds ...Request) {
	queue := append([]Request(nil), seeds...)
	for len(queue) > 0 {
		// Pop the head of the queue.
		r := queue[0]
		queue = queue[1:]
		body, err := Fetch(r.Url)
		if err != nil {
			log.Printf("Fetcher:error "+"fetching url %s, : %v", r.Url, err)
			continue
		}
		parseResult := r.ParserFunc(body)
		queue = append(queue, parseResult.Requests...)
		for _, item := range parseResult.Items {
			fmt.Printf("Got item %s\n", item)
		}
	}
}
// worker does one unit of work: fetch r's URL and run r's parser on the
// body. On fetch failure it logs and returns a zero ParseResult plus the
// error.
func worker(r Request) (ParseResult, error) {
	body, err := Fetch(r.Url)
	if err != nil {
		log.Printf("Fetcher:error "+"fetching url %s, : %v", r.Url, err)
		return ParseResult{}, err
	}
	return r.ParserFunc(body), nil
}
/* start itemchan.go — storage server */
// ItemSaver connects to a local Elasticsearch and returns a channel; a
// background goroutine saves every Item sent on it. The goroutine runs for
// the life of the process.
func ItemSaver() (chan Item, error) {
	client, err := elastic.NewClient(
		elastic.SetSniff(false))
	if err != nil {
		return nil, err
	}
	out := make(chan Item)
	go func() {
		itemCount := 0
		for {
			item := <-out
			log.Printf("Item Saver: saving item %d, %v", itemCount, item)
			itemCount++
			// Store into Elasticsearch; a failed save is logged but does
			// not stop the saver loop.
			// was: log.Print with %v directives — vet flags it and the
			// verbs were printed literally; must be log.Printf.
			if err := Save(client, item); err != nil {
				log.Printf("Item Saver: error saving item %v: %v", item, err)
			}
		}
	}()
	return out, nil
}
// Save indexes item into the "dating_profile" index under item.Type, using
// item.Id as the document id when present (otherwise Elasticsearch assigns
// one). The indexing error, if any, is returned to the caller.
func Save(client *elastic.Client, item Item) error {
	indexService := client.Index().Index("dating_profile").Type(item.Type).BodyJson(item)
	if item.Id != "" {
		indexService.Id(item.Id)
	}
	// was: panic(err) — a single failed save must not crash the whole
	// crawler; return the error so callers can log and continue (both
	// call sites already check it).
	_, err := indexService.Do(context.Background())
	return err
}
/* end itemchan.go — storage server */
// ConcurrentEngine is the concurrent crawler engine: a Scheduler hands
// requests to WorkerCount workers.
type ConcurrentEngine struct {
	Scheduler   Scheduler
	WorkerCount int
	// ItemChan receives every scraped Item; a saver goroutine (ItemSaver /
	// ItemSaver2) drains it and persists the items.
	ItemChan chan Item
}

// Scheduler abstracts request scheduling so engines can swap strategies
// (see SimpleScheduler and QueuedScheduler).
type Scheduler interface {
	// Submit queues a Request for execution.
	Submit(Request)
	// WorkerChan returns the channel a worker should receive work from.
	WorkerChan() chan Request
	// WorkerReady announces that the given worker channel is idle.
	WorkerReady(chan Request)
	// Run initializes the scheduler's internal channels/loop.
	Run()
}
// Run starts the concurrent engine: it boots the scheduler, spawns
// WorkerCount workers, submits the seeds, then loops forever collecting
// ParseResults — forwarding items to ItemChan and scheduling new,
// non-duplicate requests.
func (e *ConcurrentEngine) Run(seeds ...Request) {
	out := make(chan ParseResult)
	e.Scheduler.Run()
	for i := 0; i < e.WorkerCount; i++ {
		createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
	}
	for _, r := range seeds {
		e.Scheduler.Submit(r)
	}
	for {
		result := <-out
		for _, item := range result.Items {
			// Pass item as an argument. was: go func(){e.ItemChan <- item}()
			// — the closure captured the shared loop variable (pre-Go 1.22
			// semantics), so slow goroutines could all send the same, last
			// item of the batch.
			go func(item Item) { e.ItemChan <- item }(item)
		}
		// Schedule follow-up requests, skipping URLs already seen.
		for _, request := range result.Requests {
			if isDuplicate(request.Url) {
				continue
			}
			e.Scheduler.Submit(request)
		}
	}
}
// URLstore remembers every URL ever submitted, so each page is crawled at
// most once.
var URLstore = make(map[string]bool)

// isDuplicate reports whether url has been seen before, marking it as seen
// as a side effect. In this file it is called only from the engine's single
// collection loop, so the map needs no locking.
func isDuplicate(url string) bool {
	if seen := URLstore[url]; seen {
		return true
	}
	URLstore[url] = true
	return false
}
// createWorker starts a goroutine that loops forever: it announces its
// channel to the scheduler (WorkerReady), blocks until a Request arrives
// on in, crawls it via worker, and sends the ParseResult to out. Errors
// are skipped here — worker has already logged them.
func createWorker(in chan Request, out chan ParseResult, s Scheduler) {
	go func() {
		for {
			// Tell the scheduler this worker is idle and ready for a task.
			s.WorkerReady(in)
			// Block until a task is dispatched to this worker.
			request := <-in
			result, err := worker(request)
			if err != nil {
				continue
			}
			// Hand the result back to the engine's collection loop.
			out <- result
		}
	}()
}
/* end engine.go */
/* start scheduler.go — simple scheduler that distributes work */
// SimpleScheduler pushes every request into one channel shared by all
// workers; whichever worker receives first gets the task.
type SimpleScheduler struct {
	workerChan chan Request
}

// WorkerChan returns the single shared work channel.
func (s *SimpleScheduler) WorkerChan() chan Request {
	return s.workerChan
}

// WorkerReady is a no-op: with one shared channel there is no per-worker
// readiness to track.
func (s *SimpleScheduler) WorkerReady(chan Request) {
}

// Run creates the shared channel.
func (s *SimpleScheduler) Run() {
	s.workerChan = make(chan Request)
}

// Submit hands r to some worker. The send runs in its own goroutine so
// the engine never deadlocks when all workers are busy.
func (s *SimpleScheduler) Submit(r Request) {
	go func() { s.workerChan <- r }()
}
/* end scheduler.go */
/* start Queuescheduler.go — queued scheduler that distributes work */
// QueuedScheduler keeps explicit queues of pending requests and idle
// workers inside Run's goroutine; each worker has its own request channel.
type QueuedScheduler struct {
	requestChan chan Request
	workerChan  chan chan Request
}

// WorkerChan returns a fresh channel per call, giving each worker its own
// channel so tasks can be dispatched to one specific idle worker.
func (s *QueuedScheduler) WorkerChan() chan Request {
	return make(chan Request)
}

// Submit queues r for execution.
func (s *QueuedScheduler) Submit(r Request) {
	s.requestChan <- r
}

// WorkerReady announces that worker channel w is idle and can take a task.
func (s *QueuedScheduler) WorkerReady(w chan Request) {
	s.workerChan <- w
}
// Run starts the scheduling loop in its own goroutine. The loop keeps a
// queue of pending requests and a queue of idle worker channels, and
// matches them up one pair per iteration.
func (s *QueuedScheduler) Run() {
	s.workerChan = make(chan chan Request)
	s.requestChan = make(chan Request)
	go func() {
		// Pending tasks.
		var requestQ []Request
		// Idle workers.
		var workQ []chan Request
		for {
			var activeRequest Request
			var activework chan Request
			// Arm the dispatch case only when both a task and an idle
			// worker exist; otherwise activework stays nil and a send on a
			// nil channel can never be selected.
			if len(requestQ) > 0 && len(workQ) > 0 {
				activework = workQ[0]
				activeRequest = requestQ[0]
			}
			select {
			// A new task arrived: append it to the queue.
			case r := <-s.requestChan:
				requestQ = append(requestQ, r)
			// A worker became idle: append its channel to the queue.
			case w := <-s.workerChan:
				workQ = append(workQ, w)
			// Dispatch the head task to the head worker, then pop both.
			case activework <- activeRequest:
				workQ = workQ[1:]
				requestQ = requestQ[1:]
			}
		}
	}()
}
// ItemSaver2 is the distributed item saver: it connects to the json-rpc
// server at host and returns a channel; a background goroutine forwards
// every Item sent on it to the remote ItemSaverService.Save.
func ItemSaver2(host string) (chan Item, error) {
	client, err := Newclient(host)
	if err != nil {
		return nil, err
	}
	out := make(chan Item)
	go func() {
		itemCount := 0
		for {
			item := <-out
			log.Printf("Item Saver: saving item %d, %v", itemCount, item)
			itemCount++
			result := ""
			// was: client.Call's error was discarded and the stale err
			// from Newclient (always nil here) was checked instead, so RPC
			// failures were silently lost. Also log.Print -> log.Printf.
			if err := client.Call("ItemSaverService.Save", item, &result); err != nil {
				log.Printf("Item Saver: error saving item %v: %v", item, err)
			}
		}
	}()
	return out, nil
}
// ServerRpc registers service with the default RPC server and serves
// json-rpc connections on host forever. It returns only when registration
// or listening fails.
func ServerRpc(host string, service interface{}) error {
	// was: the rpc.Register error was silently dropped; a service with no
	// suitable exported methods would "serve" while every call failed.
	if err := rpc.Register(service); err != nil {
		return err
	}
	listener, err := net.Listen("tcp", host)
	if err != nil {
		return err
	}
	// Accept loop: one goroutine per connection; transient accept errors
	// are logged and the loop continues.
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Printf("accept error:%v", err)
			continue
		}
		go jsonrpc.ServeConn(conn)
	}
}
// Newclient dials host over TCP and wraps the connection in a json-rpc
// client. The dial error, if any, is returned unchanged.
func Newclient(host string) (*rpc.Client, error) {
	conn, err := net.Dial("tcp", host)
	if err != nil {
		return nil, err
	}
	return jsonrpc.NewClient(conn), nil
}
// ItemSaverService exposes item saving over RPC; it persists items to
// Elasticsearch via the package-level Save.
type ItemSaverService struct {
	Client *elastic.Client
}

// Save stores item and sets *result to "ok" on success; the storage error
// is returned to the RPC caller.
func (s *ItemSaverService) Save(item Item, result *string) error {
	err := Save(s.Client, item)
	if err == nil {
		// was: "Item %v saved." was logged unconditionally, before err was
		// checked — failed saves were reported as saved.
		log.Printf("Item %v saved.", item)
		*result = "ok"
	} else {
		log.Printf("Error save Item %v %v.", item, err)
	}
	return err
}
/* end Queuescheduler.go 队列调度器,用于分配工作任务 */
// main wires the distributed crawler together: an RPC-backed item saver
// on :1234 feeds a 100-worker concurrent engine seeded with the city-list
// page. The commented variants below are earlier stages of the tutorial.
func main() {
	// Concurrent crawler with distributed storage.
	itemchan, err := ItemSaver2(":1234")
	if err != nil {
		log.Panic(err)
	}
	e := ConcurrentEngine{
		Scheduler:   &QueuedScheduler{},
		WorkerCount: 100,
		// Storage channel: scraped items are sent here to be saved.
		ItemChan: itemchan,
	}
	e.Run(Request{
		Url:        "http://www.zhenai.com/zhenghun",
		ParserFunc: ParseCityList,
	})
	// Concurrent crawler with queued scheduler (no remote storage):
	//e:= ConcurrentEngine{
	//	Scheduler:&QueuedScheduler{},
	//	WorkerCount:100,
	//}
	//
	//e.Run(Request{
	//	Url:"http://www.zhenai.com/zhenghun",
	//	ParserFunc:ParseCityList,
	//})
	//
	//// Concurrent crawler with simple scheduler:
	//e:= ConcurrentEngine{
	//	Scheduler:&SimpleScheduler{},
	//	WorkerCount:100,
	//}
	//
	//e.Run(Request{
	//	Url:"http://www.zhenai.com/zhenghun",
	//	ParserFunc:ParseCityList,
	//})
	// Single-task crawler:
	//Run(Request{
	//	Url:"http://www.zhenai.com/zhenghun",
	//	ParserFunc:ParseCityList,
	//})
	//paseTest()
}
/* RPC server — the following main is the entry point of a SEPARATE binary (the storage server listing from the article) */
// NOTE(review): this second main duplicates the crawler's main above and
// cannot compile in the same package — it is the article's separate
// storage-server binary. Move it to its own directory before building.
// It connects to Elasticsearch and serves ItemSaverService over json-rpc
// on :1234.
func main() {
	client, err := elastic.NewClient(elastic.SetSniff(false))
	if err != nil {
		panic(err)
	}
	ServerRpc(":1234", &ItemSaverService{
		Client: client,
	})
}
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- python爬虫 | 一文搞懂分布式进程爬虫
- 装个虚拟机,然后拿来玩爬虫!也是极好的!Scrapy分布式爬虫!
- 分布式爬虫对新站的协助
- 分布式通用爬虫框架Crawlab
- 如何构建一个分布式爬虫:基础篇
- 基于redis的分布式爬虫实现方案
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Scalable Internet Architectures
Theo Schlossnagle / Sams Publishing / 2006-7-31 / USD 49.99
As a developer, you are aware of the increasing concern amongst developers and site architects that websites be able to handle the vast number of visitors that flood the Internet on a daily basis. Sc......一起来看看 《Scalable Internet Architectures》 这本书的介绍吧!