Go channel实现原理

栏目: Go · 发布时间: 6年前

内容简介:go通道基于go的并发调度实现,本身并不复杂,go并发调度请看我的这篇文章:go并发调度原理学习1.channel数据结构2.创建channel实现

go通道基于 go 的并发调度实现,本身并不复杂,go并发调度请看我的这篇文章:go并发调度原理学习

1.channel数据结构

type hchan struct {
   qcount   uint               // 缓冲区中已有元素个数
   dataqsiz uint               //循环队列容量大小
   buf      unsafe.Pointer // 缓冲区指针
   elemsize uint16          //元素大小
   closed   uint32           //关闭标记,0没关闭,1关闭
   elemtype *_type         //数据项类型
   sendx    uint               //发送索引
   recvx    uint                //接收索引
   recvq    waitq             //等待接收排队链表
   sendq    waitq            //等待发送排队链表
   lock mutex                 //锁
}
type waitq struct {
   first *sudog
   last  *sudog
}

2.创建channel实现

创建channel实例:

ch := make(chan int, 4)

实现函数:

func makechan(t *chantype, size int64) *hchan

大致实现:

执行上面这行代码会new一个hchan结构,同时创建一个dataqsiz=4的int类型的循环队列,其实就是一个容纳4个元素的数组,就是按顺序往里面写数据,写满之后又从0开始写,这个顺序索引就是hchan.sendx

3.发送数据

发送数据实例:

ch <- 100

发送数据实现函数:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

ep指向要发送数据的首地址

 
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   lock(&c.lock)
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }
 
   if sg := c.recvq.dequeue(); sg != nil {
      //缓冲区就是一个固定长度的循环列表
      //发送队列是一个双向链表,接在缓冲区的后面,整体是一个队列,保证先进先出
      //有接收者,并不是将当前要发送的数据直接发出,而是将缓冲区的第一个元素发送给接收者,同时将发送队列的第一个元素加入缓冲区刚出队列的位置
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }
 
   if c.qcount < c.dataqsiz {
      //缓冲区没有满,直接将要发送的数据复制到缓冲区,直接返回,
      qp := chanbuf(c, c.sendx)
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }
 
   if !block {
      unlock(&c.lock)
      return false
   }
   //以上都是同步非阻塞的,ch <- 100直接返回
   
   //以下是同步阻塞
   //缓冲区满了,也没有接收者,通道将被阻塞,其实就是不执行当前G了,将状态改成等待状态
   gp := getg()
   mysg := acquireSudog()
   c.sendq.enqueue(mysg)
   goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
 
   //当G被唤醒,状态改成可执行状态,从这里开始继续执行
   releaseSudog(mysg)
   return true
}

大致实现:

1:接收队列不为空,从接收队列中取出第一个接收者*sudog,将数据复制到sudog.elem,复制函数为memmove用汇编实现,通知接收方数据给你了,将接收方协程由等待状态改成可运行状态,将当前协程加入协程队列,等待被调度。

2:没有接收者,有缓冲区且没有满,直接将数据复制到缓冲中,写入缓冲区的位置为hchan.buf[sendx++],如果缓冲区已满sendx=0,就是循环队列的实现,往sendx指定的位置写数据,hchan.qcount++

3:没有接收者,没有缓冲区或是满了,则从当前协程对应的P的sudog队列中取一个struct sudog,将数据复制到sudog.elem,将sudog加入sendq队列中,通知接收方,当前流程阻塞,等待被唤醒,接收方收到通知后(被唤醒),继续往下执行,接收数据完成后会通知发送方,即将发送方协程状态由等待状态改成可运行状态,加入协程可运行队列,等着被执行

不会阻塞的情况:

1:通道缓冲区没有满之前,因为只是将要发送的数据复制到缓冲区就返回了

2:有接收者的情况,有数据复制到接收方的数据结构中(不是最终接收数据的变量,在执行接收函数的时候会拷贝到最终接收数据的变量),唤醒接收协程

会阻塞的情况:

自然就是缓冲区满了,也没有接收方,这个时候会将数据打包放到发送队列,当前协程被设置成等待状态,这个状态不会被调度,当有接收方收到数据后,才会被唤醒

通知函数:

goready(gp *g, traceskip int)

4.接收数据

接收数据实例:

val := <- ch

接收数据实现函数:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   lock(&c.lock)
   if sg := c.sendq.dequeue(); sg != nil {
      // Found a waiting sender. If buffer is size 0, receive value
      // directly from sender. Otherwise, receive from head of queue
      // and add sender's value to the tail of the queue (both map to
      // the same buffer slot because the queue is full).
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }
 
   if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }
 
   if !block {
      unlock(&c.lock)
      return false, false
   }
   //以上同步非阻塞
 
   //以下同步阻塞
   gp := getg()
   mysg := acquireSudog()
   c.recvq.enqueue(mysg)
   //将当前G状态改成等待状态,停止调度
   goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
 
   //当前G被唤醒从这里继续执行 
   mysg.c = nil
   releaseSudog(mysg)
   return true, !closed
}

大致实现:

1.发送队列不为空(说明缓冲区已满),从发送队列中取出第一个发送者*sudog

1.1.没有缓冲区,直接将发送队列中的数据sudog.elem复制出来,存到接收数据的变量val中,通知发送方我处理完了,你可以继续执行

1.2.有缓冲区,复制出缓冲区hchan.buf[recvx]对应的元素到val,在将发送方sudog.elem复制到hchan.buf[recvx],发送方按顺序写,接收方按顺序读,典型的FIFO,为了保证是先进先出,所以先复制出,再将队列首元素复制到对应的缓冲区中,其实就是发送队列连接在缓冲区后面,缓冲区满了,就写队列,接收的时候先从缓冲区中拿数据,拿掉之后空出来的位置从发送队列中取第一个填满,并唤醒对应的G,只要发送队列不为空,缓冲区肯定会被填满

2.发送队列为空,缓冲区不为空,复制出缓冲区hchan.buf[recvx]对应的元素到val,hchan.qcount--

3.发送队列为空,缓冲区也为空,那就是没有任何待接收的数据,接收流程就只能等了,将接收信息打包成sudog,加入接收队列recvq,当前执行流程阻塞,等有发送数据后会被唤醒继续

5.channel FIFO在解释一次

5.1:缓冲区没满,发送数据就是进缓冲队列,接收数据就是出缓冲队列,比较好理解

5.2:缓冲区已满,发送数据就是进等待队列,接收数据先出缓冲队列,即为要接收的数据,等待队列出列,将数据存在缓冲队列刚出列的位置,刚出列的位置相当于缓冲队列的末尾,也就是说等待队列的列头连在缓冲队列的末尾,将等待队列的列头加入缓存队列的列尾,保证了缓冲队列是满的,减少的是缓冲队列中的数据,保证先进先出

5.3:接收数据,缓冲队列或等待队列有数据,拿走第一个,保证等待队列是接在缓冲区末尾,即缓冲区末尾有空缺,就让等待队列出列,并填充至缓冲区末尾,否则将自己打包加入接收队列,当前G进入等待状态,有数据发送自然会通知你

总结:Go channel基于go的并发调度实现阻塞和非阻塞两种通讯方式


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Pro Git (Second Edition)

Pro Git (Second Edition)

Scott Chacon、Ben Straub / Apress / 2014-11-9 / USD 59.99

Scott Chacon is a cofounder and the CIO of GitHub and is also the maintainer of the Git homepage ( git-scm.com ) . Scott has presented at dozens of conferences around the world on Git, GitHub and the ......一起来看看 《Pro Git (Second Edition)》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具