golang[59]-分布式爬虫-jsonrpc抽离存储数据引擎

栏目: 服务器 · 发布时间: 5年前

package main

import (
	"io/ioutil"
	"golang.org/x/net/html/charset"
	"golang.org/x/text/encoding"
	"bufio"
	"golang.org/x/text/transform"
	"golang.org/x/text/encoding/unicode"
	"log"
	"regexp"
	"strconv"
	"net/http"
	"fmt"
	"time"
	"gopkg.in/olivere/elastic.v5"
	 "context"
	"net/rpc/jsonrpc"
	"net/rpc"
	"net"
)


// rateLimiter ticks every 10ms. NOTE(review): declared but never consumed
// anywhere in this file — presumably intended to throttle Fetch; confirm
// before removing.
var rateLimiter = time.Tick(10 * time.Millisecond)
/* start Fetch.go*/
// Fetch downloads url with a browser-like User-Agent and returns the body
// transcoded to UTF-8. Non-200 responses are reported as errors.
func Fetch(url string) ([]byte, error) {
	// A timeout keeps a stalled site from wedging a worker forever.
	client := &http.Client{Timeout: 30 * time.Second}

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		// Fixed: the original used %d for the string url and omitted err.
		return nil, fmt.Errorf("build request for url %s: %v", url, err)
	}
	// Some sites reject the default Go user agent.
	req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36")

	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("get url %s: %v", url, err)
	}
	defer resp.Body.Close()

	// Without this check an error page would be fed to the parsers as if it
	// were real content.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("get url %s: status code %d", url, resp.StatusCode)
	}

	// Sniff the charset from the buffered body and decode to UTF-8.
	bodyReader := bufio.NewReader(resp.Body)
	e := determineEncoding(bodyReader)
	utf8Reader := transform.NewReader(bodyReader, e.NewDecoder())

	return ioutil.ReadAll(utf8Reader)
}


// determineEncoding sniffs the character encoding of the page buffered in r
// from its first 1KB, falling back to UTF-8 when nothing can be read.
func determineEncoding(r *bufio.Reader) encoding.Encoding {
	// Peek directly on r — the original wrapped r in another bufio.NewReader,
	// which is a no-op for an already-buffered reader. Also: pages shorter
	// than 1KB make Peek return io.EOF *together with* the bytes it did get,
	// so only give up when nothing at all was read.
	peeked, err := r.Peek(1024)
	if err != nil && len(peeked) == 0 {
		log.Printf("Fetcher error:%v", err)
		return unicode.UTF8
	}
	e, _, _ := charset.DetermineEncoding(peeked, "")
	return e
}
/* end  Fetch.go*/
/* start Type.go*/
// Request pairs a URL to crawl with the parser to run on its fetched body.
type Request struct{
	Url string
	ParserFunc func([]byte) ParseResult
}

// ParseResult is what a parser produces: follow-up requests to crawl plus
// the items extracted from this page.
type ParseResult struct{
	Requests []Request
	Items []Item
}

// Item is one extracted record, ready to be stored in elasticsearch.
type Item struct{
	Url string
	Id string // document id (numeric zhenai user id extracted from Url)
	Type string // elasticsearch document type
	Payload interface{}
}


// NilParser discards its input and yields an empty ParseResult: no follow-up
// requests, no items.
func NilParser(_ []byte) ParseResult {
	var empty ParseResult
	return empty
}

/* end Type.go*/


/* start parser/city.go  爬取城市下每一个用户和网址*/


// cityRe matches a member profile link plus the member's display name on a city page.
var cityRe = regexp.MustCompile(`<a href="(http://album.zhenai.com/u/[\d]+)" target="_blank">([^<]+)</a>`)
// cityUrlRe matches links from one city page to another city page.
var cityUrlRe = regexp.MustCompile(`<a href="(http://www.zhenai.com/zhenghun/[^"]+)"`)


// ParseCity extracts every member profile link (with the member's name) from
// a city page and also queues any sibling city pages it links to.
func ParseCity(contents []byte) ParseResult {
	var result ParseResult

	// Profile links: the member name is forwarded to the profile parser via
	// closure; both locals are declared per iteration, so each closure sees
	// its own values.
	for _, match := range cityRe.FindAllSubmatch(contents, -1) {
		profileURL := string(match[1])
		memberName := string(match[2])
		result.Requests = append(result.Requests, Request{
			Url: profileURL,
			ParserFunc: func(c []byte) ParseResult {
				return PaesrProfile(c, profileURL, memberName)
			},
		})
	}

	// Other city pages found on this page are parsed with ParseCity again.
	for _, match := range cityUrlRe.FindAllSubmatch(contents, -1) {
		result.Requests = append(result.Requests, Request{
			Url:        string(match[1]),
			ParserFunc: ParseCity,
		})
	}

	return result
}



/* end parser/city.go */

/* start parser/citylist.go */

// cityListRe matches a city page URL and the city name on the city-list page.
const cityListRe = `(http://www.zhenai.com/zhenghun/[0-9a-z]+)"[^>]*>([^<]+)</a>`

// cityListPattern is compiled once at package init; the original recompiled
// the regexp on every ParseCityList call.
var cityListPattern = regexp.MustCompile(cityListRe)

// ParseCityList turns the city-list page into one ParseCity request per city.
func ParseCityList(contents []byte) ParseResult {
	result := ParseResult{}
	for _, m := range cityListPattern.FindAllSubmatch(contents, -1) {
		result.Requests = append(result.Requests, Request{
			Url:        string(m[1]),
			ParserFunc: ParseCity,
		})
	}
	return result
}

/* end parser/citylist.go */

/* start profile.go */
// Profile is the structured record extracted from one zhenai member page.
type Profile struct {
	Name string
	Age int // years; stays 0 when missing or unparsable
	Marry string // "已婚" or "未婚"
	Constellation string
	Height int // cm; stays 0 when missing
	Weight int // kg; stays 0 when missing
	Salary string
}


// String renders the profile as a single human-readable line, e.g.
// "Alice 已婚30olds 165cm 50kg ".
func (p Profile) String() string {
	return fmt.Sprintf("%s %s%dolds %dcm %dkg ", p.Name, p.Marry, p.Age, p.Height, p.Weight)
}
/* end profile.go */

/* start parser/profile.go */


// Field patterns for the profile page (zhenai markup includes a fixed
// data-v attribute on each tag).
// NOTE(review): marry only matches the literal "已婚" tag and constellation
// only the literal "天秤座" — other constellations are never captured;
// confirm whether that is intended.
var ageRe = regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>([\d]+)岁</div>`)
var marry =   regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>(已婚)</div>`)
var constellation =   regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>(天秤座)</div>`)
var height  =regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>([\d]+)cm</div>`)
var weight =regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>([\d]+)kg</div>`)
var salary = 	regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>月收入:([^<]+)</div>`)

// idRe extracts the numeric member id from a profile URL.
var idRe =  regexp.MustCompile(`http://album.zhenai.com/u/([\d]+)`)

//解析器 解析用户
//name为上一级传递过来的
// PaesrProfile parses one member profile page into a single Item whose
// Payload is a Profile. url and name come from the city page that linked
// here. NOTE(review): the "Paesr" typo is kept because ParseCity calls the
// function by this name.
func PaesrProfile(contents []byte, url string, name string) ParseResult {
	profile := Profile{}
	profile.Name = name

	// Numeric fields: a failed Atoi leaves the zero value in place. The
	// locals are named so they no longer shadow the package-level regexp
	// variables height and weight, which the original code did.
	if age, err := strconv.Atoi(extractString(contents, ageRe)); err == nil {
		profile.Age = age
	}
	if h, err := strconv.Atoi(extractString(contents, height)); err == nil {
		profile.Height = h
	}
	if w, err := strconv.Atoi(extractString(contents, weight)); err == nil {
		profile.Weight = w
	}

	profile.Salary = extractString(contents, salary)
	profile.Constellation = extractString(contents, constellation)

	// The page only shows a marriage tag when the member is married.
	if extractString(contents, marry) == "" {
		profile.Marry = "未婚"
	} else {
		profile.Marry = "已婚"
	}

	return ParseResult{
		Items: []Item{
			{
				Url:     url,
				Type:    "zhenai",
				Id:      extractString([]byte(url), idRe),
				Payload: profile,
			},
		},
	}
}

//封装 正则表达式匹配
func extractString(contents []byte,re *regexp.Regexp) string{
	match:=re.FindSubmatch(contents)

	if len(match)>=2{

		return string(match[1])
	}else{
		return ""
	}
}

/* end parser/profile.go */

/* start engine.go   单任务版引擎*/
// Run is the single-task engine: it processes the request queue
// breadth-first, printing every extracted item and appending every
// discovered request to the tail of the queue.
func Run(seeds ...Request) {
	queue := append([]Request{}, seeds...)

	for len(queue) > 0 {
		req := queue[0]
		queue = queue[1:]

		body, err := Fetch(req.Url)
		if err != nil {
			// Skip pages that fail to download; the crawl continues.
			log.Printf("Fetcher:error "+"fetching url %s, : %v", req.Url, err)
			continue
		}

		parsed := req.ParserFunc(body)
		queue = append(queue, parsed.Requests...)
		for _, item := range parsed.Items {
			fmt.Printf("Got item %s\n", item)
		}
	}
}


// worker performs one unit of work: fetch r.Url and run the request's parser
// over the downloaded body.
func worker(r Request) (ParseResult, error) {
	body, fetchErr := Fetch(r.Url)
	if fetchErr != nil {
		log.Printf("Fetcher:error "+"fetching url %s, : %v", r.Url, fetchErr)
		return ParseResult{}, fetchErr
	}
	return r.ParserFunc(body), nil
}


/* start itemchan.go   存储服务器*/
// ItemSaver connects to elasticsearch and starts a goroutine that indexes
// every Item received on the returned channel.
func ItemSaver() (chan Item, error) {
	// Sniffing disabled: talk only to the configured node.
	client, err := elastic.NewClient(
		elastic.SetSniff(false))
	if err != nil {
		return nil, err
	}

	out := make(chan Item)

	go func() {
		itemCount := 0
		for {
			item := <-out

			log.Printf("Item Saver: saving item %d, %v", itemCount, item)
			itemCount++

			// Index into elasticsearch; fixed: the original called log.Print
			// with Printf-style format verbs, which prints them literally.
			if err := Save(client, item); err != nil {
				log.Printf("Item Saver: error saving item %v, %v", item, err)
			}
		}
	}()

	return out, nil
}


// Save indexes item into elasticsearch index "dating_profile". When the item
// carries an Id it becomes the document id, so re-saving the same profile
// overwrites rather than duplicates.
func Save(client *elastic.Client, item Item) error {
	indexService := client.Index().
		Index("dating_profile").
		Type(item.Type).
		BodyJson(item)

	if item.Id != "" {
		indexService.Id(item.Id)
	}

	// Fixed: return the error instead of panicking — one bad document must
	// not kill the whole saver goroutine.
	_, err := indexService.Do(context.Background())
	return err
}
/* start itemchan.go   存储服务器*/


// 并发版爬虫引擎  包含了调度器 与 工人数
// ConcurrentEngine runs WorkerCount worker goroutines fed by Scheduler.
type ConcurrentEngine struct{
	Scheduler Scheduler
	WorkerCount int
	// ItemChan receives every parsed Item; a saver goroutine drains it.
	ItemChan chan Item
}

//调度器是一个接口,扩展性
// Scheduler abstracts how requests are distributed to workers, so the engine
// can swap SimpleScheduler and QueuedScheduler without code changes.
type Scheduler interface {

	// Submit queues a Request for execution.
	Submit(Request)
	// WorkerChan returns the channel a new worker should read requests from.
	WorkerChan() chan Request

	// WorkerReady reports the given worker channel as idle (may be a no-op).
	WorkerReady(chan Request)
	// Run initializes the scheduler's channels / dispatch loop.
	Run()
}


//并发版爬虫引擎
// Run starts the scheduler and WorkerCount workers, submits the seeds, then
// loops forever: parsed items are forwarded to ItemChan and newly discovered
// requests (deduplicated by URL) go back to the scheduler.
func (e *ConcurrentEngine) Run(seeds ...Request) {
	out := make(chan ParseResult)
	e.Scheduler.Run()

	for i := 0; i < e.WorkerCount; i++ {
		createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
	}

	for _, r := range seeds {
		e.Scheduler.Submit(r)
	}

	for {
		result := <-out

		for _, item := range result.Items {
			// Fixed: pass item as an argument — the original closure captured
			// the loop variable, which is reused each iteration, so several
			// goroutines could all send the last item.
			go func(it Item) { e.ItemChan <- it }(item)
		}

		for _, request := range result.Requests {
			if isDuplicate(request.Url) {
				continue
			}
			e.Scheduler.Submit(request)
		}
	}
}

//存储URL、实行去掉重复URL的操作
// URLstore records every URL ever submitted so the engine can skip repeats.
// Only touched from the engine's single Run loop, so no locking is needed.
var URLstore = make(map[string]bool)

// isDuplicate reports whether url was seen before, marking it as seen.
func isDuplicate(url string) bool {
	if seen := URLstore[url]; seen {
		return true
	}
	URLstore[url] = true
	return false
}

//工作函数,逻辑是 in通道接收到request,即会调用worker函数爬每一个request中的网址,用对应的解析器。 解析完成后,将ParseResult返回给通道out
// createWorker launches a goroutine that loops forever: announce readiness
// to the scheduler, take one request from in, run it through worker, and
// send the ParseResult to out. Failed fetches are skipped silently here
// (worker already logged them).
func createWorker(in chan Request, out chan ParseResult, s Scheduler) {
	go func() {
		for {
			s.WorkerReady(in)
			req := <-in
			res, err := worker(req)
			if err != nil {
				continue
			}
			out <- res
		}
	}()
}



/* end engine.go */

/* start scheduler.go 简单版调度器,用于分配工作任务 */
// SimpleScheduler shares a single channel among all workers: Submit pushes
// into it and whichever worker reads first gets the request.
type SimpleScheduler struct{
	// workerChan is the one shared request channel.
	workerChan chan Request
}

// WorkerChan returns the shared channel every worker reads requests from.
func (s *SimpleScheduler) WorkerChan() chan Request { return s.workerChan }

// WorkerReady is a no-op: the simple scheduler does not track idle workers.
func (s *SimpleScheduler) WorkerReady(chan Request) {}

// Run allocates the shared request channel.
func (s *SimpleScheduler) Run() { s.workerChan = make(chan Request) }

// Submit hands r to whichever worker reads the shared channel first. The
// send runs in its own goroutine so Submit never blocks (avoids deadlock
// when all workers are busy).
func (s *SimpleScheduler) Submit(r Request) {
	go func() { s.workerChan <- r }()
}



/* end scheduler.go */

/* start Queuescheduler.go 队列调度器,用于分配工作任务 */

// QueuedScheduler tracks pending requests and idle workers in explicit
// queues managed by its Run loop.
type QueuedScheduler struct {
	// requestChan receives submitted requests.
	requestChan chan Request
	// workerChan receives the per-worker channels of idle workers.
	workerChan chan chan Request
}

// WorkerChan returns a brand-new channel on every call: each worker gets a
// private request channel, which is what lets the dispatch loop hand a
// request to one specific idle worker.
func (s *QueuedScheduler) WorkerChan() chan Request {
	return make(chan Request)
}

// Submit queues a request; the dispatch loop in Run receives it.
func (s *QueuedScheduler) Submit(r Request) {
	s.requestChan <-r
}

// WorkerReady reports worker channel w as idle; the dispatch loop in Run
// receives it.
func (s *QueuedScheduler) WorkerReady(w chan Request){
	s.workerChan <- w
}



func (s * QueuedScheduler) Run(){

	s.workerChan =make(chan chan Request)
	s.requestChan = make(chan Request)

	go func(){
		//任务队列
		var requestQ []Request
		//工作队列
		var workQ []chan Request
		for{

			var activeRequest Request
			var activework  chan Request
			//即有工作又有任务,开始工作
			if len(requestQ)>0 && len(workQ) >0{
				activework = workQ[0]
				activeRequest = requestQ[0]
			}

			select {
			//任务增加,添加到队列中
			case r:=<-s.requestChan:
				requestQ = append(requestQ,r)
				//工作增加,添加到队列中
			case w:= <-s.workerChan:
				workQ = append(workQ,w)
				//有工作又有任务,让工作去做任务
			case activework <- activeRequest:
				workQ = workQ[1:]
				requestQ=requestQ[1:]
			}

		}

	}()

}


//分布式存储服务器
// ItemSaver2 is the distributed variant of ItemSaver: instead of talking to
// elasticsearch directly, every Item received on the returned channel is sent
// over jsonrpc to the ItemSaverService at host.
func ItemSaver2(host string) (chan Item, error) {
	client, err := Newclient(host)
	if err != nil {
		return nil, err
	}

	out := make(chan Item)
	go func() {
		itemCount := 0
		for {
			item := <-out

			log.Printf("Item Saver: saving item %d, %v", itemCount, item)
			itemCount++

			// Fixed: capture the RPC error — the original discarded
			// client.Call's return value and then re-checked the stale dial
			// error (always nil here), so RPC failures went unnoticed.
			result := ""
			if err := client.Call("ItemSaverService.Save", item, &result); err != nil {
				log.Printf("Item Saver: error saving item %v, %v", item, err)
			}
		}
	}()

	return out, nil
}





//抽象出的jsonrpc server
// ServerRpc registers service with the rpc package, listens on host, and
// serves each incoming connection over jsonrpc in its own goroutine. It only
// returns on a setup failure; the accept loop runs forever.
func ServerRpc(host string, service interface{}) error {
	// Fixed: the original ignored the Register error (e.g. a service with no
	// suitable exported methods).
	if err := rpc.Register(service); err != nil {
		return err
	}

	listener, err := net.Listen("tcp", host)
	if err != nil {
		return err
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			// A single failed accept should not bring the server down.
			log.Printf("accept error:%v", err)
			continue
		}
		go jsonrpc.ServeConn(conn)
	}
}

//抽象出的jsonrpc client
// Newclient dials host over TCP and wraps the connection in a jsonrpc
// client, matching the wire format served by ServerRpc.
func Newclient(host string) (*rpc.Client, error) {
	conn, dialErr := net.Dial("tcp", host)
	if dialErr != nil {
		return nil, dialErr
	}
	return jsonrpc.NewClient(conn), nil
}

// ItemSaverService is the jsonrpc service exposing Save; it wraps the
// elasticsearch client used for actual storage.
type ItemSaverService struct{
	Client *elastic.Client
}

// Save is the RPC endpoint: it indexes item into elasticsearch and writes
// "ok" into result on success. The error (if any) is returned to the caller.
func (s *ItemSaverService) Save(item Item, result *string) error {
	err := Save(s.Client, item)
	if err == nil {
		// Fixed: the original logged "saved" unconditionally, before knowing
		// whether the save succeeded.
		log.Printf("Item %v saved.", item)
		*result = "ok"
	} else {
		log.Printf("Error save Item %v %v.", item, err)
	}
	return err
}





/* end Queuescheduler.go 队列调度器,用于分配工作任务 */
// main wires the distributed crawler together: the concurrent engine crawls
// zhenai.com and sends every parsed Item over jsonrpc to the saver server
// listening on :1234 (a separate program — see the second main below).
func main(){

	// Concurrent crawler with remote (jsonrpc) item storage.
	itemchan,err := ItemSaver2(":1234")

	if err!=nil{
		log.Panic(err)
	}
	e:= ConcurrentEngine{
		Scheduler:&QueuedScheduler{},
		WorkerCount:100,
		// Storage channel: parsed items are sent here for saving.
		ItemChan:itemchan,
	}

	e.Run(Request{
		Url:"http://www.zhenai.com/zhenghun",
		ParserFunc:ParseCityList,
	})

	// Earlier variants, kept for reference:

	// Concurrent crawler with the queued scheduler (no storage):
	//e:= ConcurrentEngine{
	//	Scheduler:&QueuedScheduler{},
	//	WorkerCount:100,
	//}
	//
	//e.Run(Request{
	//	Url:"http://www.zhenai.com/zhenghun",
	//	ParserFunc:ParseCityList,
	//})
	//

	// Concurrent crawler with the simple scheduler:
	//e:= ConcurrentEngine{
	//	Scheduler:&SimpleScheduler{},
	//	WorkerCount:100,
	//}
	//
	//e.Run(Request{
	//	Url:"http://www.zhenai.com/zhenghun",
	//	ParserFunc:ParseCityList,
	//})

	// Single-task crawler:
	//Run(Request{
	//	Url:"http://www.zhenai.com/zhenghun",
	//	ParserFunc:ParseCityList,
	//})
	//paseTest()
}

RPC 服务器(注意:下面是第二个独立程序的 main 函数,它与上面爬虫的 main 不能编译进同一个包,需要放在单独的目录中分别构建和运行)

// main for the standalone RPC item-saver server. This is a SEPARATE program
// from the crawler main above — the two cannot be compiled into the same
// package; run each from its own directory.
func main() {
	client, err := elastic.NewClient(elastic.SetSniff(false))

	if err != nil {
		panic(err)
	}

	// Fixed: the original discarded ServerRpc's error, so a failed
	// register/listen exited silently with status 0.
	if err := ServerRpc(":1234", &ItemSaverService{
		Client: client,
	}); err != nil {
		log.Fatal(err)
	}
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Distributed Systems

Distributed Systems

Sukumar Ghosh / Chapman and Hall/CRC / 2014-7-14 / USD 119.95

Distributed Systems: An Algorithmic Approach, Second Edition provides a balanced and straightforward treatment of the underlying theory and practical applications of distributed computing. As in the p......一起来看看 《Distributed Systems》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

SHA 加密
SHA 加密

SHA 加密工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具