内容简介:golang的net/http包实现了处理连接时比较简单粗暴,相比之下有性能更好的http库,项目里用到过的net包和fasthttp最大的不同可能就是server在处理连接的时候使用了协程池。在并发量大的时候,goroutine数量巨大,runtime层的上下文切换成本对性能有影响。而fasthttp用协程池规避了这个问题,去年在做AOS的时候,项目中后期也引入了workerpool。workerpool的数据结构中WorkerFunc就是
golang的net/http包实现了处理连接时比较简单粗暴,相比之下有性能更好的http库,项目里用到过的 fasthttp 就是一个很好的实现, Go开发HTTP的另一个选择fasthttp 中总结了它跟标准库实现的几点不同:
-
net/http
的实现是一个连接新建一个 goroutine,fasthttp
是利用一个 worker pool做了协程池,复用 goroutine,减轻 runtime 调度 goroutine 的压力 -
net/http
解析的请求数据很多放在http.Header或者http.Request.Form中,数据结构map[string]string
或map[string][]string
涉及不必要的 []byte 到 string 的转换,是可以规避的 -
net/http
解析 HTTP 请求每次生成新的*http.Request
和http.ResponseWriter
,fasthttp
解析 HTTP 数据到*fasthttp.RequestCtx
,然后使用sync.Pool
复用结构实例,减少对象的数量 -
fasthttp
会延迟解析 HTTP 请求中的数据,尤其是 Body 部分。这样节省了很多不直接操作 Body 的情况的消耗
workerpool
net包和fasthttp最大的不同可能就是server在处理连接的时候使用了协程池。在并发量大的时候,goroutine数量巨大,runtime层的上下文切换成本对性能有影响。而fasthttp用协程池规避了这个问题,去年在做AOS的时候,项目中后期也引入了workerpool。
func (s *Server) Serve(ln net.Listener) error { // default concurrency set to 256*1024 maxWorkersCount := s.getConcurrency() s.concurrencyCh = make(chan struct{}, maxWorkersCount) wp := &workerPool{ WorkerFunc: s.serveConn, MaxWorkersCount: maxWorkersCount, LogAllErrors: s.LogAllErrors, Logger: s.logger(), } wp.Start() for { if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil { wp.Stop() if err == io.EOF { return nil } return err } if !wp.Serve(c) { s.writeFastError(c, StatusServiceUnavailable, "The connection cannot be served for Server.Concurrency limit exceeded") c.Close() time.Sleep(100 * time.Millisecond) } c = nil } }
workerpool的数据结构中WorkerFunc就是 s.serveConn
,即每条net.conn的处理函数。workerChanPool是个对象池,MaxIdleWorkerDuration是worker的最大空闲时间,ready是可用的worker列表,也就是说所有goroutine worker是存放在一个数组里面的。
这个数组模拟一个类似栈的FILO队列,也就是说我们每次使用的worker都从队列的尾部开始取。
type workerPool struct { WorkerFunc func(c net.Conn) error MaxWorkersCount int LogAllErrors bool MaxIdleWorkerDuration time.Duration Logger Logger lock sync.Mutex workersCount int mustStop bool ready []*workerChan stopCh chan struct{} workerChanPool sync.Pool } type workerChan struct { lastUseTime time.Time ch chan net.Conn }
start & stop
wp.start开启了一个goroutine,定时清理workerpool中未使用时间超过maxIdleWorkerDuration的goroutine。
func (wp *workerPool) Start() { wp.stopCh = make(chan struct{}) stopCh := wp.stopCh go func() { var scratch []*workerChan for { wp.clean(&scratch) select { case <-stopCh: return default: time.Sleep(wp.getMaxIdleWorkerDuration()) } } }() }
stop停止了ready里所有ch清空,并清空ready。 资源清理时chan要置nil 。
func (wp *workerPool) Stop() { close(wp.stopCh) wp.stopCh = nil wp.lock.Lock() ready := wp.ready for i, ch := range ready { ch.ch <- nil ready[i] = nil } wp.ready = ready[:0] wp.mustStop = true wp.lock.Unlock() }
serve
实现中还涉及到如果wp已经stop,那worker退出后channel对象通过临时对象池管理等细节,这里就跳过了。总结 wp.serve到s.serveconn的过程大概如下
-
当ready这个可用worker列表中没有ch可用时,创建一个新ch绑定
wp.Workfunc
的goroutine。即新建了一个协程worker,这个协程从绑定的ch中获取待处理net.Conn。 - wp.Serve把accept的conn发到这个ch上,供绑定的协程worker处理。
-
worker处理完后
release
这个绑定的ch到ready栈里。下一次有连接来时getCh优先从ready栈里找ch,也就是找worker。对ready的读取 FILO ,类似栈。
func (wp *workerPool) Serve(c net.Conn) bool { ch := wp.getCh() if ch == nil { return false } ch.ch <- c return true }
getCh的实现可以理解为一个用来执行workFunc的goroutine都绑定了一个workerChan。把要处理的conn发到这个workerChan,这个goroutine就开始执行。没有要执行的conn则goroutine阻塞,直到下次workerChan有连接发来。
func (wp *workerPool) getCh() *workerChan { var ch *workerChan createWorker := false wp.lock.Lock() ready := wp.ready n := len(ready) - 1 if n < 0 { if wp.workersCount < wp.MaxWorkersCount { createWorker = true wp.workersCount++ } } else { ch = ready[n] ready[n] = nil wp.ready = ready[:n] } wp.lock.Unlock() if ch == nil { if !createWorker { return nil } vch := wp.workerChanPool.Get() if vch == nil { vch = &workerChan{ ch: make(chan net.Conn, workerChanCap), } } ch = vch.(*workerChan) go func() { wp.workerFunc(ch) wp.workerChanPool.Put(vch) }() } return ch }
worker处理完一个连接后,将release这个连接到ready这个可用worker栈。即表示这时worker阻塞,可以交给它任务啦。 同时处理完net.Conn后要置nil。 正常情况下worker是不退出的,除非wp.Stop。
func (wp *workerPool) workerFunc(ch *workerChan) { var c net.Conn var err error for c = range ch.ch { if c == nil { break } if err = wp.WorkerFunc(c); err != nil && err != errHijacked { } c = nil if !wp.release(ch) { break } } wp.lock.Lock() wp.workersCount-- wp.lock.Unlock() } func (wp *workerPool) release(ch *workerChan) bool { ch.lastUseTime = CoarseTimeNow() wp.lock.Lock() if wp.mustStop { wp.lock.Unlock() return false } wp.ready = append(wp.ready, ch) wp.lock.Unlock() return true }
clean
最后看下start中开启的clean定时任务。之所以清理过程只从前遍历清理前面部分,是因为ready是FILO先进后出的,所以ready中越往后的空闲时间最短。
func (wp *workerPool) clean(scratch *[]*workerChan) { maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() currentTime := time.Now() wp.lock.Lock() ready := wp.ready n := len(ready) i := 0 for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration { i++ } *scratch = append((*scratch)[:0], ready[:i]...) if i > 0 { m := copy(ready, ready[i:]) for i = m; i < n; i++ { ready[i] = nil } wp.ready = ready[:m] } wp.lock.Unlock() tmp := *scratch for i, ch := range tmp { ch.ch <- nil tmp[i] = nil } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- php如何实现session,自己实现session,laravel如何实现session
- AOP如何实现及实现原理
- webpack 实现 HMR 及其实现原理
- Docker实现原理之 - OverlayFS实现原理
- 为什么实现 .NET 的 ICollection 集合时需要实现 SyncRoot 属性?如何正确实现这个属性?
- 自己实现集合框架(十):顺序栈的实现
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。