golang[59]-分布式爬虫-jsonrpc抽离存储数据引擎

栏目: 服务器 · 发布时间: 5年前

package main

import (
	"io/ioutil"
	"golang.org/x/net/html/charset"
	"golang.org/x/text/encoding"
	"bufio"
	"golang.org/x/text/transform"
	"golang.org/x/text/encoding/unicode"
	"log"
	"regexp"
	"strconv"
	"net/http"
	"fmt"
	"time"
	"gopkg.in/olivere/elastic.v5"
	 "context"
	"net/rpc/jsonrpc"
	"net/rpc"
	"net"
)


var rateLimiter = time.Tick(10 * time.Millisecond)
/* start Fetch.go*/
func Fetch(url string)([]byte ,error){
	//resp,err:= http.Get(url)
	//
	//if err!=nil{
	//	return nil,err
	//}
	//
	//defer resp.Body.Close()
	//if resp.StatusCode != http.StatusOK{
	//	return nil,fmt.Errorf("Error: status code:%d",resp.StatusCode)
	//}

	client := &http.Client{}
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		fmt.Printf("get url:%s error",url )
		return nil,fmt.Errorf("get url:%d error",url)


	}
	req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36")

	resp, err := client.Do(req)
	if err != nil {

		//log.Fatalln(err)
		fmt.Printf("get url:%s error",url )
		return nil,fmt.Errorf("get url:%d error",url)
	}

	defer resp.Body.Close()

	bodyReader:= bufio.NewReader(resp.Body)
	e:= determineEncoding(bodyReader)
	utf8reader:= transform.NewReader(bodyReader,e.NewDecoder())

	return ioutil.ReadAll(utf8reader)
}


func determineEncoding(r *bufio.Reader) encoding.Encoding{

	bytes,err := bufio.NewReader(r).Peek(1024)
	if err !=nil{
		log.Printf("Fetcher error:%v",err)
		return unicode.UTF8
	}
	e,_,_:= charset.DetermineEncoding(bytes,"")
	return e
}
/* end  Fetch.go*/
/* start Type.go*/
type Request struct{
	Url string
	ParserFunc func([]byte) ParseResult
}

type ParseResult struct{
	Requests []Request
	Items []Item
}

type Item struct{
	Url string
	Id string
	Type string
	Payload interface{}
}


func NilParser([]byte) ParseResult{
	return ParseResult{}
}

/* end Type.go*/


/* start parser/city.go  爬取城市下每一个用户和网址*/


var cityRe = regexp.MustCompile(`<a href="(http://album.zhenai.com/u/[\d]+)" target="_blank">([^<]+)</a>`)
var cityUrlRe = regexp.MustCompile(`<a href="(http://www.zhenai.com/zhenghun/[^"]+)"`)


func ParseCity(contents []byte) ParseResult{

	matches:= cityRe.FindAllSubmatch(contents,-1)

	result := ParseResult{}
	for _,m:= range matches{

		url:=string(m[1])
		name:=string(m[2])
		//println(string(m[1]))
		//不用用户名了
		//result.Items = append(result.Items,"User:"+string(m[2]))
		result.Requests = append(result.Requests,Request{
			Url:string(m[1]),
			ParserFunc:func(c []byte) ParseResult{
				return PaesrProfile(
					c,url,name)
			},
		})
		}

	//查找城市页面下的城市链接
	matches=  cityUrlRe.FindAllSubmatch(contents,-1)

	for _,m:= range matches{
		result.Requests = append(result.Requests,Request{
			Url:string(m[1]),
			ParserFunc:ParseCity,
			})
		}


	return result

}



/* end parser/city.go */

/* start parser/citylist.go */

const cityListRe = `(http://www.zhenai.com/zhenghun/[0-9a-z]+)"[^>]*>([^<]+)</a>`
func ParseCityList(contents []byte) ParseResult{
	re:=regexp.MustCompile(cityListRe)

	matches:= re.FindAllSubmatch(contents,-1)
	result:=ParseResult{}

	for _,m :=range matches{
			//result.Items = append(result.Items,string(m[2]))
			result.Requests  = append(
				result.Requests,Request{
					Url:string(m[1]),
					ParserFunc:ParseCity,
				})


	}






	return result
}

/* end parser/citylist.go */

/* start profile.go */
type Profile struct {
	Name string
	Age int
	Marry string
	Constellation string
	Height int
	Weight int
	Salary string
}


func (p Profile) String() string{
	return  p.Name +" " + p.Marry + strconv.Itoa(p.Age) +"olds "+   strconv.Itoa(p.Height) + "cm " +  strconv.Itoa(p.Weight)+ "kg "
}
/* end profile.go */

/* start parser/profile.go */


var ageRe = regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>([\d]+)岁</div>`)
var marry =   regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>(已婚)</div>`)
var constellation =   regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>(天秤座)</div>`)
var height  =regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>([\d]+)cm</div>`)
var weight =regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>([\d]+)kg</div>`)
var salary = 	regexp.MustCompile(`<div class="m-btn purple" data-v-bff6f798>月收入:([^<]+)</div>`)

var idRe =  regexp.MustCompile(`http://album.zhenai.com/u/([\d]+)`)

//解析器 解析用户
//name为上一级传递过来的
func PaesrProfile(contents []byte,url string,name string) ParseResult{

	//ioutil.WriteFile("test.html",contents,0x777)
	//用户结构体
	profile:=Profile{}
	profile.Name = name

	//年龄   string转换为int
		age,err:= strconv.Atoi(extractString(contents,ageRe))
		if err==nil{
			profile.Age = age
		}
		//身高
	height,err:= strconv.Atoi(extractString(contents,height))
	if err==nil{
		profile.Height = height
	}
	//体重
	weight,err:= strconv.Atoi(extractString(contents,weight))
	if err==nil{
		profile.Weight = weight
	}

		//薪水
	profile.Salary = extractString(contents,salary)

	//星座
	profile.Constellation = extractString(contents,constellation)
	if extractString(contents,marry)== ""{
		profile.Marry ="未婚"
	}else{
		profile.Marry ="已婚"
	}


	result:=ParseResult{
		Items:[]Item{
			{
				Url:url,
				Type:"zhenai",
				Id:extractString([]byte(url),idRe),
				Payload:profile,
			},

		},
	}

	return result
}

//封装 正则表达式匹配
func extractString(contents []byte,re *regexp.Regexp) string{
	match:=re.FindSubmatch(contents)

	if len(match)>=2{

		return string(match[1])
	}else{
		return ""
	}
}

/* end parser/profile.go */

/* start engine.go   单任务版引擎*/
func Run(seeds ...Request){
	var requests []Request

	for _,r := range seeds{
		requests = append(requests,r)
	}

	for len(requests) >0{
		r:=requests[0]

		requests = requests[1:]
		//fmt.Printf("Fetching %s",r.Url)
		body,err:= Fetch(r.Url)

		if err!=nil{
			log.Printf("Fetcher:error "+ "fetching url %s, : %v",r.Url,err)
			continue
		}

		parseResult:= r.ParserFunc(body)

		requests = append(requests,parseResult.Requests...)

		for _,item:= range parseResult.Items{
			fmt.Printf("Got item %s\n",item)
		}
	}
}


//具体的工作  传递一个request,通过解析器对url进行解析
func worker(r Request)(ParseResult,error){

	body,err:= Fetch(r.Url)

	if err!=nil{
		log.Printf("Fetcher:error "+ "fetching url %s, : %v",r.Url,err)
		return ParseResult{},err
	}
	return 	r.ParserFunc(body),nil
}


/* start itemchan.go   存储服务器*/
func ItemSaver() (chan Item,error){

	client,err := elastic.NewClient(
		elastic.SetSniff(false))
	if err !=nil{
		return nil ,err
	}

	out:=make(chan Item)

	go func(){
		itemCount:=0

		for{
			item:=<-out

			//fmt.Printf("Item Saver: error saveing item %d,%v",itemCount,item)
			log.Printf("Item Saversaveing item %d,%v",itemCount,item)
			itemCount++
			//存储到elastiicsearch
			err:= Save(client,item)

			if err!=nil{
				log.Print("Item Saver: error" + "saveing item %v,%v",item,err)
			}
		}
	}()

	return out,nil
}


func Save(client * elastic.Client,item Item)  error{



	indexServer:= client.Index().Index("dating_profile").Type(item.Type).BodyJson(item)

	if item.Id!=""{
		indexServer.Id(item.Id)
	}
	_,err := indexServer.Do(context.Background())

	if err !=nil{
		panic(err)
	}

	return nil

}
/* start itemchan.go   存储服务器*/


// 并发版爬虫引擎  包含了调度器 与 工人数
type ConcurrentEngine struct{
	Scheduler Scheduler
	WorkerCount int
	//增加存储通道,有item需要存储,即发送消息到此通道,触发存储操作。
	ItemChan chan Item
}

//调度器是一个接口,扩展性
type Scheduler interface {

	//提交Request进行执行
	Submit(Request)
	WorkerChan() chan Request

	WorkerReady(chan Request)
	Run()
}


//并发版爬虫引擎
func (e *ConcurrentEngine) Run(seeds ...Request){



	out:= make(chan ParseResult)
	//配置调度器通道
	e.Scheduler.Run()

	//开启WorkerCount个工作
	for i:=0;i<e.WorkerCount;i++{
		createWorker(e.Scheduler.WorkerChan(),out,e.Scheduler)
	}
		//种子首先运行
	for _,r:=range seeds{
		e.Scheduler.Submit(r)
	}

	for{
		//out等待接受ParseResult
		result:=<-out

		//打印出接收到的数据,以及个数。
		for _,item:= range result.Items{
			go func(){e.ItemChan <-item}()
		}

		//分配任务
		for _,request:= range result.Requests{

			if isDuplicate(request.Url){
				continue
			}


			e.Scheduler.Submit(request)
		}
	}
}

//存储URL、实行去掉重复URL的操作
var URLstore = make(map[string]bool)
func isDuplicate(url string) bool{
	if URLstore[url]{
		return true
	}
	URLstore[url] = true
	return false
}

//工作函数,逻辑是 in通道接收到request,即会调用worker函数爬每一个request中的网址,用对应的解析器。 解析完成后,将ParseResult返回给通道out
func createWorker(in chan Request,out chan ParseResult,s Scheduler) {

	go func(){
		for{
			//传递到调度器,提示可以开始工作
			s.WorkerReady(in)
			//有任务到工作中
			request := <-in
			//开始工作
			result,err:= worker(request)
			if err!=nil{
				continue
			}
			//工作结果返回
			out <-result
		}
	}()
}



/* end engine.go */

/* start scheduler.go 简单版调度器,用于分配工作任务 */
type SimpleScheduler struct{
	//通道
	workerChan chan Request
}

func (s *SimpleScheduler) WorkerChan() chan Request {
	return s.workerChan
}

func (s *SimpleScheduler) WorkerReady(chan Request) {

}

func (s *SimpleScheduler) Run() {
	s.workerChan = make(chan Request)
}

func ( s *SimpleScheduler) Submit( r Request) {
	//为了防止死锁,在调度器中建立 go 的协程  分配任务到通道中。
	go func(){s.workerChan <- r}()
}



/* end scheduler.go */

/* start Queuescheduler.go 队列调度器,用于分配工作任务 */

type QueuedScheduler struct {
	requestChan chan Request
	workerChan chan chan Request
}

func (s *QueuedScheduler) WorkerChan() chan Request {
	return make(chan Request)
}

//提交任务到通道,说明需要完成任务
func (s *QueuedScheduler) Submit(r Request) {
	s.requestChan <-r
}

//提交工作到通道,说明准备好工作了
func (s *QueuedScheduler) WorkerReady(w chan Request){
	s.workerChan <- w
}



func (s * QueuedScheduler) Run(){

	s.workerChan =make(chan chan Request)
	s.requestChan = make(chan Request)

	go func(){
		//任务队列
		var requestQ []Request
		//工作队列
		var workQ []chan Request
		for{

			var activeRequest Request
			var activework  chan Request
			//即有工作又有任务,开始工作
			if len(requestQ)>0 && len(workQ) >0{
				activework = workQ[0]
				activeRequest = requestQ[0]
			}

			select {
			//任务增加,添加到队列中
			case r:=<-s.requestChan:
				requestQ = append(requestQ,r)
				//工作增加,添加到队列中
			case w:= <-s.workerChan:
				workQ = append(workQ,w)
				//有工作又有任务,让工作去做任务
			case activework <- activeRequest:
				workQ = workQ[1:]
				requestQ=requestQ[1:]
			}

		}

	}()

}


//分布式存储服务器
func ItemSaver2(host string) (chan Item,error){

	client,err:=Newclient(host)
	out:=make(chan Item)

	if err !=nil{
		return nil,err
	}
	go func(){
		itemCount:=0

		for{
			item:=<-out

			//fmt.Printf("Item Saver: error saveing item %d,%v",itemCount,item)
			log.Printf("Item Saversaveing item %d,%v",itemCount,item)
			itemCount++

			result:=""
			//Call RPC to save item
			client.Call("ItemSaverService.Save",item,&result)

			if err!=nil{
				log.Print("Item Saver: error" + "saveing item %v,%v",item,err)
			}
		}
	}()

	return out,nil
}





//抽象出的jsonrpc server
func  ServerRpc(host string,service interface{}) error{


	rpc.Register(service)

	listener,err:=net.Listen("tcp",host)

	if err!=nil{
		return err
	}

	for{
		conn,err:=listener.Accept()
		if err!=nil{
			log.Printf("accept error:%v",err)
			continue
		}
		go jsonrpc.ServeConn(conn)
	}

	return nil

}

//抽象出的jsonrpc client
func Newclient(host string)(*rpc.Client,error){
	conn,err:=net.Dial("tcp",host)

	if err!=nil{
		return nil,err
	}

	client:= jsonrpc.NewClient(conn)
	return client,nil
}

type ItemSaverService struct{
	Client *elastic.Client
}

func (s*ItemSaverService) Save(item Item,result*string) error{
	err:= Save(s.Client,item)
	log.Printf("Item %v saved.",item)
	if err==nil{
		*result = "ok"
	}else{
		log.Printf("Error save Item %v %v.",item,err)
	}
	return err

}





/* end Queuescheduler.go 队列调度器,用于分配工作任务 */
func main(){

	//并发版爬虫+
	itemchan,err := ItemSaver2(":1234")

	if err!=nil{
		log.Panic(err)
	}
	e:= ConcurrentEngine{
		Scheduler:&QueuedScheduler{},
		WorkerCount:100,
		//增加存储通道
		ItemChan:itemchan,
	}

	e.Run(Request{
		Url:"http://www.zhenai.com/zhenghun",
		ParserFunc:ParseCityList,
	})



	//并发调度版爬虫
	//e:= ConcurrentEngine{
	//	Scheduler:&QueuedScheduler{},
	//	WorkerCount:100,
	//}
	//
	//e.Run(Request{
	//	Url:"http://www.zhenai.com/zhenghun",
	//	ParserFunc:ParseCityList,
	//})
	//


	////并发版爬虫
	//e:= ConcurrentEngine{
	//	Scheduler:&SimpleScheduler{},
	//	WorkerCount:100,
	//}
	//
	//e.Run(Request{
	//	Url:"http://www.zhenai.com/zhenghun",
	//	ParserFunc:ParseCityList,
	//})

	//单任务版爬虫
	//Run(Request{
	//	Url:"http://www.zhenai.com/zhenghun",
	//	ParserFunc:ParseCityList,
	//})
	//paseTest()
}

RPC服务器

func main() {
	client, err := elastic.NewClient(elastic.SetSniff(false))

	if err != nil {
		panic(err)
	}

	ServerRpc(":1234", &ItemSaverService{
		Client: client,
	})

}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

后谷歌时代:大数据的衰落及区块链经济的崛起

后谷歌时代:大数据的衰落及区块链经济的崛起

乔治·吉尔德 / 现代出版社 / 2018-9-5 / 68

以大数据和机器智能为基础的谷歌时代(信息互联网时代)是一个令人敬畏的时代。但它即将终结。 《后谷歌时代》一书的作者乔治•吉尔德是一位颇具远见卓识的智者。他在技术和文化领域具有无与伦比的视野和见地。他向读者描述了谷歌所面临信任与安全危机,并勇敢地预测了即将到来的后谷歌时代。 谷歌用其惊人的“搜索和排序”能力吸引了整个世界。功能强大的搜索引擎,看似免费小应用,诸如视频、地图、电子邮箱等,让......一起来看看 《后谷歌时代:大数据的衰落及区块链经济的崛起》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具