记fastHTTP协程池的实现

栏目: Go · 发布时间: 5年前

内容简介:golang的net/http包实现了处理连接时比较简单粗暴,相比之下有性能更好的http库,项目里用到过的net包和fasthttp最大的不同可能就是server在处理连接的时候使用了协程池。在并发量大的时候,goroutine数量巨大,runtime层的上下文切换成本对性能有影响。而fasthttp用协程池规避了这个问题,去年在做AOS的时候,项目中后期也引入了workerpool。workerpool的数据结构中WorkerFunc就是

golang的net/http包实现了处理连接时比较简单粗暴,相比之下有性能更好的http库,项目里用到过的 fasthttp 就是一个很好的实现, Go开发HTTP的另一个选择fasthttp 中总结了它跟标准库实现的几点不同:

  • net/http 的实现是一个连接新建一个 goroutine, fasthttp 是利用一个 worker pool做了协程池,复用 goroutine,减轻 runtime 调度 goroutine 的压力
  • net/http 解析的请求数据很多放在http.Header或者http.Request.Form中,数据结构 map[string]stringmap[string][]string 涉及不必要的 []byte 到 string 的转换,是可以规避的
  • net/http 解析 HTTP 请求每次生成新的 *http.Requesthttp.ResponseWriterfasthttp 解析 HTTP 数据到 *fasthttp.RequestCtx ,然后使用 sync.Pool 复用结构实例,减少对象的数量
  • fasthttp 会延迟解析 HTTP 请求中的数据,尤其是 Body 部分。这样节省了很多不直接操作 Body 的情况的消耗

workerpool

net包和fasthttp最大的不同可能就是server在处理连接的时候使用了协程池。在并发量大的时候,goroutine数量巨大,runtime层的上下文切换成本对性能有影响。而fasthttp用协程池规避了这个问题,去年在做AOS的时候,项目中后期也引入了workerpool。

func (s *Server) Serve(ln net.Listener) error {
    // default concurrency set to 256*1024
    maxWorkersCount := s.getConcurrency()
    s.concurrencyCh = make(chan struct{}, maxWorkersCount)
    wp := &workerPool{
        WorkerFunc:      s.serveConn,
        MaxWorkersCount: maxWorkersCount,
        LogAllErrors:    s.LogAllErrors,
        Logger:          s.logger(),
    }
    wp.Start()
    for {
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
            wp.Stop()
            if err == io.EOF {
                return nil
            }
            return err
        }
        if !wp.Serve(c) {
            s.writeFastError(c, StatusServiceUnavailable,
            "The connection cannot be served for Server.Concurrency limit exceeded")
            c.Close()
            time.Sleep(100 * time.Millisecond)
        }
        c = nil
    }
}

workerpool的数据结构中WorkerFunc就是 s.serveConn ,即每条net.conn的处理函数。workerChanPool是个对象池,MaxIdleWorkerDuration是worker的最大空闲时间,ready是可用的worker列表,也就是说所有goroutine worker是存放在一个数组里面的。

这个数组模拟一个类似栈的FILO队列,也就是说我们每次使用的worker都从队列的尾部开始取。

type workerPool struct {
    WorkerFunc func(c net.Conn) error
    MaxWorkersCount int
    LogAllErrors bool
    MaxIdleWorkerDuration time.Duration
    Logger Logger
  
    lock         sync.Mutex
    workersCount int
    mustStop     bool
    ready []*workerChan
    stopCh chan struct{}
    workerChanPool sync.Pool
}

type workerChan struct {
    lastUseTime time.Time
    ch          chan net.Conn
}

start & stop

wp.start开启了一个goroutine,定时清理workerpool中未使用时间超过maxIdleWorkerDuration的goroutine。

func (wp *workerPool) Start() {
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    go func() {
        var scratch []*workerChan
        for {
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

stop停止了ready里所有ch清空,并清空ready。 资源清理时chan要置nil

func (wp *workerPool) Stop() {
    close(wp.stopCh)
    wp.stopCh = nil
    wp.lock.Lock()
    ready := wp.ready
    for i, ch := range ready {
        ch.ch <- nil
        ready[i] = nil
    }
    wp.ready = ready[:0]
    wp.mustStop = true
    wp.lock.Unlock()
}

serve

实现中还涉及到如果wp已经stop,那worker退出后channel对象通过临时对象池管理等细节,这里就跳过了。总结 wp.serve到s.serveconn的过程大概如下

  1. 当ready这个可用worker列表中没有ch可用时,创建一个新ch绑定 wp.Workfunc 的goroutine。即新建了一个协程worker,这个协程从绑定的ch中获取待处理net.Conn。
  2. wp.Serve把accept的conn发到这个ch上,供绑定的协程worker处理。
  3. worker处理完后 release 这个绑定的ch到ready栈里。下一次有连接来时getCh优先从ready栈里找ch,也就是找worker。对ready的读取 FILO ,类似栈。
func (wp *workerPool) Serve(c net.Conn) bool {
    ch := wp.getCh()
    if ch == nil {
        return false
    }
    ch.ch <- c
    return true
}

getCh的实现可以理解为一个用来执行workFunc的goroutine都绑定了一个workerChan。把要处理的conn发到这个workerChan,这个goroutine就开始执行。没有要执行的conn则goroutine阻塞,直到下次workerChan有连接发来。

func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        ch = ready[n]
        ready[n] = nil
        wp.ready = ready[:n]
    }
    wp.lock.Unlock()

    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get()
        if vch == nil {
            vch = &workerChan{
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        ch = vch.(*workerChan)
        go func() {
            wp.workerFunc(ch)
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
}

worker处理完一个连接后,将release这个连接到ready这个可用worker栈。即表示这时worker阻塞,可以交给它任务啦。 同时处理完net.Conn后要置nil。 正常情况下worker是不退出的,除非wp.Stop。

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn
    var err error
    for c = range ch.ch {
        if c == nil {
            break
        }
        if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
        }
        c = nil
        if !wp.release(ch) {
            break
        }
    }
    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}

func (wp *workerPool) release(ch *workerChan) bool {
    ch.lastUseTime = CoarseTimeNow()
    wp.lock.Lock()
    if wp.mustStop {
        wp.lock.Unlock()
        return false
    }
    wp.ready = append(wp.ready, ch)
    wp.lock.Unlock()
    return true
}

clean

最后看下start中开启的clean定时任务。之所以清理过程只从前遍历清理前面部分,是因为ready是FILO先进后出的,所以ready中越往后的空闲时间最短。

func (wp *workerPool) clean(scratch *[]*workerChan) {
    maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
    currentTime := time.Now()
    wp.lock.Lock()
    ready := wp.ready
    n := len(ready)
    i := 0
    for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
        i++
    }
    *scratch = append((*scratch)[:0], ready[:i]...)
    if i > 0 {
        m := copy(ready, ready[i:])
        for i = m; i < n; i++ {
            ready[i] = nil
        }
        wp.ready = ready[:m]
    }
    wp.lock.Unlock()
    tmp := *scratch
    for i, ch := range tmp {
        ch.ch <- nil
        tmp[i] = nil
    }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

程序员的修炼

程序员的修炼

Jeff Atwood / 陆其明、杨溢 / 人民邮电出版社 / 2014-4 / 45.00元

《程序员的修炼——从优秀到卓越》是《高效能程序员的修炼》的姊妹篇,包含了Coding Horror博客中的精华文章。全书分为8章,涵盖了时间管理、编程方法、Web设计、测试、用户需求、互联网、游戏编程以及技术阅读等方面的话题。作者选取的话题,无一不是程序员职业生涯中的痛点。很多文章在博客和网络上的点击率和回帖率居高不下。 Jeff Atwood于2004年创办Coding Horror博客(......一起来看看 《程序员的修炼》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

html转js在线工具
html转js在线工具

html转js在线工具