内容简介:golang的net/http包实现了处理连接时比较简单粗暴,相比之下有性能更好的http库,项目里用到过的net包和fasthttp最大的不同可能就是server在处理连接的时候使用了协程池。在并发量大的时候,goroutine数量巨大,runtime层的上下文切换成本对性能有影响。而fasthttp用协程池规避了这个问题,去年在做AOS的时候,项目中后期也引入了workerpool。workerpool的数据结构中WorkerFunc就是
golang的net/http包实现了处理连接时比较简单粗暴,相比之下有性能更好的http库,项目里用到过的 fasthttp 就是一个很好的实现, Go开发HTTP的另一个选择fasthttp 中总结了它跟标准库实现的几点不同:
-
net/http的实现是一个连接新建一个 goroutine,fasthttp是利用一个 worker pool做了协程池,复用 goroutine,减轻 runtime 调度 goroutine 的压力 -
net/http解析的请求数据很多放在http.Header或者http.Request.Form中,数据结构map[string]string或map[string][]string涉及不必要的 []byte 到 string 的转换,是可以规避的 -
net/http解析 HTTP 请求每次生成新的*http.Request和http.ResponseWriter,fasthttp解析 HTTP 数据到*fasthttp.RequestCtx,然后使用sync.Pool复用结构实例,减少对象的数量 -
fasthttp会延迟解析 HTTP 请求中的数据,尤其是 Body 部分。这样节省了很多不直接操作 Body 的情况的消耗
workerpool
net包和fasthttp最大的不同可能就是server在处理连接的时候使用了协程池。在并发量大的时候,goroutine数量巨大,runtime层的上下文切换成本对性能有影响。而fasthttp用协程池规避了这个问题,去年在做AOS的时候,项目中后期也引入了workerpool。
func (s *Server) Serve(ln net.Listener) error {
// default concurrency set to 256*1024
maxWorkersCount := s.getConcurrency()
s.concurrencyCh = make(chan struct{}, maxWorkersCount)
wp := &workerPool{
WorkerFunc: s.serveConn,
MaxWorkersCount: maxWorkersCount,
LogAllErrors: s.LogAllErrors,
Logger: s.logger(),
}
wp.Start()
for {
if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
wp.Stop()
if err == io.EOF {
return nil
}
return err
}
if !wp.Serve(c) {
s.writeFastError(c, StatusServiceUnavailable,
"The connection cannot be served for Server.Concurrency limit exceeded")
c.Close()
time.Sleep(100 * time.Millisecond)
}
c = nil
}
}
workerpool的数据结构中WorkerFunc就是 s.serveConn
,即每条net.conn的处理函数。workerChanPool是个对象池,MaxIdleWorkerDuration是worker的最大空闲时间,ready是可用的worker列表,也就是说所有goroutine worker是存放在一个数组里面的。
这个数组模拟一个类似栈的FILO队列,也就是说我们每次使用的worker都从队列的尾部开始取。
type workerPool struct {
WorkerFunc func(c net.Conn) error
MaxWorkersCount int
LogAllErrors bool
MaxIdleWorkerDuration time.Duration
Logger Logger
lock sync.Mutex
workersCount int
mustStop bool
ready []*workerChan
stopCh chan struct{}
workerChanPool sync.Pool
}
type workerChan struct {
lastUseTime time.Time
ch chan net.Conn
}
start & stop
wp.start开启了一个goroutine,定时清理workerpool中未使用时间超过maxIdleWorkerDuration的goroutine。
func (wp *workerPool) Start() {
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
go func() {
var scratch []*workerChan
for {
wp.clean(&scratch)
select {
case <-stopCh:
return
default:
time.Sleep(wp.getMaxIdleWorkerDuration())
}
}
}()
}
stop停止了ready里所有ch清空,并清空ready。 资源清理时chan要置nil 。
func (wp *workerPool) Stop() {
close(wp.stopCh)
wp.stopCh = nil
wp.lock.Lock()
ready := wp.ready
for i, ch := range ready {
ch.ch <- nil
ready[i] = nil
}
wp.ready = ready[:0]
wp.mustStop = true
wp.lock.Unlock()
}
serve
实现中还涉及到如果wp已经stop,那worker退出后channel对象通过临时对象池管理等细节,这里就跳过了。总结 wp.serve到s.serveconn的过程大概如下
-
当ready这个可用worker列表中没有ch可用时,创建一个新ch绑定
wp.Workfunc的goroutine。即新建了一个协程worker,这个协程从绑定的ch中获取待处理net.Conn。 - wp.Serve把accept的conn发到这个ch上,供绑定的协程worker处理。
-
worker处理完后
release这个绑定的ch到ready栈里。下一次有连接来时getCh优先从ready栈里找ch,也就是找worker。对ready的读取 FILO ,类似栈。
func (wp *workerPool) Serve(c net.Conn) bool {
ch := wp.getCh()
if ch == nil {
return false
}
ch.ch <- c
return true
}
getCh的实现可以理解为一个用来执行workFunc的goroutine都绑定了一个workerChan。把要处理的conn发到这个workerChan,这个goroutine就开始执行。没有要执行的conn则goroutine阻塞,直到下次workerChan有连接发来。
func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false
wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()
if ch == nil {
if !createWorker {
return nil
}
vch := wp.workerChanPool.Get()
if vch == nil {
vch = &workerChan{
ch: make(chan net.Conn, workerChanCap),
}
}
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return ch
}
worker处理完一个连接后,将release这个连接到ready这个可用worker栈。即表示这时worker阻塞,可以交给它任务啦。 同时处理完net.Conn后要置nil。 正常情况下worker是不退出的,除非wp.Stop。
func (wp *workerPool) workerFunc(ch *workerChan) {
var c net.Conn
var err error
for c = range ch.ch {
if c == nil {
break
}
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
}
c = nil
if !wp.release(ch) {
break
}
}
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}
func (wp *workerPool) release(ch *workerChan) bool {
ch.lastUseTime = CoarseTimeNow()
wp.lock.Lock()
if wp.mustStop {
wp.lock.Unlock()
return false
}
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()
return true
}
clean
最后看下start中开启的clean定时任务。之所以清理过程只从前遍历清理前面部分,是因为ready是FILO先进后出的,所以ready中越往后的空闲时间最短。
func (wp *workerPool) clean(scratch *[]*workerChan) {
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
currentTime := time.Now()
wp.lock.Lock()
ready := wp.ready
n := len(ready)
i := 0
for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
i++
}
*scratch = append((*scratch)[:0], ready[:i]...)
if i > 0 {
m := copy(ready, ready[i:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.ready = ready[:m]
}
wp.lock.Unlock()
tmp := *scratch
for i, ch := range tmp {
ch.ch <- nil
tmp[i] = nil
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- php如何实现session,自己实现session,laravel如何实现session
- AOP如何实现及实现原理
- webpack 实现 HMR 及其实现原理
- Docker实现原理之 - OverlayFS实现原理
- 为什么实现 .NET 的 ICollection 集合时需要实现 SyncRoot 属性?如何正确实现这个属性?
- 自己实现集合框架(十):顺序栈的实现
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
机器与人:埃森哲论新人工智能
【美】保罗•多尔蒂 詹姆斯•威尔逊 / 赵亚男 / 中信出版社 / 2018-10-1 / 49.00元
自人工智能问世以来,人们普遍持有人机对立的观点,且无时无刻不在害怕自己的工作会被人工智能取代。作者认为,是时候抛开这些无谓的担忧了,因为人类社会正走向一个与机器共融共生的时代。 未来的新型工作模式是什么?未来有哪些工作不会被人工智能取代?人工智能时代重要的生存技能是什么?本书围绕这三大核心问题做了透彻的分析。作者带我们见识了置于业务流程背景之下的人工智能,阐述了其在不同职能部门中起到的推动作......一起来看看 《机器与人:埃森哲论新人工智能》 这本书的介绍吧!