内容简介:channel是Golang中一个非常重要的特性,也是Golang CSP并发模型的一个重要体现。简单来说就是,goroutine之间可以通过channel进行通信。channel在Golang如此重要,在代码中使用频率非常高,以至于不得不好奇其内部实现。本文将基于在正式分析channel的实现之前,我们先看下channel的最基本用法,代码如下:
channel是Golang中一个非常重要的特性,也是Golang CSP并发模型的一个重要体现。简单来说就是,goroutine之间可以通过channel进行通信。
channel在Golang如此重要,在代码中使用频率非常高,以至于不得不好奇其内部实现。本文将基于 go 1.13的源码 ,分析channel的内部实现原理。
channel的基本使用
在正式分析channel的实现之前,我们先看下channel的最基本用法,代码如下:
package main
import "fmt"
func main() {
c := make(chan int)
go func() {
c <- 1 // send to channel
}()
x := <-c // recv from channel
fmt.Println(x)
}
在以上代码中,我们通过 make(chan int) 来创建了一个类型为int的channel。
在一个goroutine中使用 c <- 1 将数据发送到channel中。在主goroutine中通过 x := <- c 从channel中读取数据并赋值给x。
以上代码对应了channel的两种基本操作:send操作 c <- 1 和 recv操作 x := <- c , 分别表示发送数据到channel和从channel中接收数据。
此外,channel还分为有缓存channel和无缓存channel。上述代码中,我们使用的是无缓冲的channel。对于无缓冲的channel,如果当前没有其他goroutine正在接收channel数据,则发送方会阻塞在发送语句处。
我们可以在channel初始化时指定缓冲区大小,例如, make(chan int, 2) 则指定缓冲区大小为2。在缓冲区未满之前,发送方无阻塞地可以往channel发送数据,无需等待接收方准备好。而如果缓冲区已满,则发送方依然会阻塞。
channel对应的底层实现函数
在探究channel源码之前,我们至少需要先找到channel在Golang的具体实现在哪。因为我们在使用channel时,用的是 <- 符号,并不能直接在 go 源码中找到其实现。但是Golang的编译器必然会将 <- 符号翻译成底层对应的实现。
我们可以使用Go自带的命令: go tool compile -N -l -S hello.go , 将代码翻译成对应的汇编指令。
或者,直接可以使用 Compiler Explorer 这个在线工具。对于上述示例代码可以直接在这个链接看其汇编结果: go.godbolt.org/z/3xw5Cj
通过仔细查看以上示例代码对应的汇编指令,可以发现以下的对应关系:
- channel的构造语句
make(chan int), 对应的是runtime.makechan函数 - 发送语句
c <- 1, 对应的是runtime.chansend1函数 - 接收语句
x := <- c, 对应的是runtime.chanrecv1函数
以上几个函数的实现都位于go源码中的 runtime/chan.go 代码文件中。我们接下来针对这几个函数,探究下channel的实现。
channel的构造
channel的构造语句 make(chan int) ,将会被golang编译器翻译为 runtime.makechan 函数, 其函数签名如下:
func makechan(t *chantype, size int) *hchan
其中, t *chantype 即构造channel时传入的元素类型。 size int 即用户指定的channel缓冲区大小,不指定则为0。该函数的返回值是 *hchan 。hchan则是channel在golang中的内部实现。其定义如下:
type hchan struct {
qcount uint // buffer中已放入的元素个数
dataqsiz uint // 用户构造channel时指定的buf大小
buf unsafe.Pointer // buffer
elemsize uint16 // buffer中每个元素的大小
closed uint32 // channel是否关闭,== 0代表未closed
elemtype *_type // channel元素的类型信息
sendx uint // buffer中已发送的索引位置 send index
recvx uint // buffer中已接收的索引位置 receive index
recvq waitq // 等待接收的goroutine list of recv waiters
sendq waitq // 等待发送的goroutine list of send waiters
lock mutex
}
hchan中的所有属性大致可以分为三类:
- buffer相关的属性。例如buf、dataqsiz、qcount等。 当channel的缓冲区大小不为0时,buffer中存放了待接收的数据。使用ring buffer实现。
- waitq相关的属性,可以理解为是一个FIFO的标准队列。其中recvq中是正在等待接收数据的goroutine,sendq中是等待发送数据的goroutine。waitq使用双向链表实现。
- 其他属性,例如lock、elemtype、closed等。
makechan 的整个过程基本都是一些合法性检测和对 buffer 、 hchan 等属性的内存分配,此处不再进行深入讨论了,有兴趣的可以直接看此处的源码。
通过简单分析hchan的属性,我们可以知道其中有两个重要的组件, buffer 和 waitq 。 hchan 所有行为和实现都是围绕这两个组件进行的。
向channel中发送数据
channel的发送和接收流程很相似,我们先分析下channel的发送过程(如 c <- 1 ), 对应于 runtime.chansend 函数的实现。
在尝试向channel中发送数据时,如果 recvq 队列不为空,则首先会从 recvq 中头部取出一个等待接收数据的goroutine出来。并将数据直接发送给该goroutine。代码如下:
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
recvq中是正在等待接收数据的goroutine。当某个goroutine使用recv操作(例如, x := <- c ),如果此时channel的缓存中没有数据,且没有其他goroutine正在等待发送数据(即 sendq 为空),会将该goroutine以及要接收的数据地址打包成 sudog 对象,并放入到recvq中。
继续接着讲上面的代码,如果此时 recvq 不为空,则调用send函数将数据拷贝到对应的goroutine的堆栈上。
send函数的实现主要包含两点:
memmove(dst, src, t.size) goready(gp, skip+1)
而如果 recvq 队列为空,则说明此时没有等待接收数据的goroutine,那么此时channel会尝试把数据放到缓存中。代码如下:
if c.qcount < c.dataqsiz {
// 相当于 c.buf[c.sendx]
qp := chanbuf(c, c.sendx)
// 将数据拷贝到buffer中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
以上代码的作用其实非常简单,就是把数据放到buffer中而已。此过程涉及了ring buffer的操作,其中 dataqsiz 代表用户指定的channel的buffer大小,如果不指定则默认为0。其他具体的详细操作后续过程会在ring buffer一节详细讲到。
如果用户使用的是无缓冲channel或者此时buffer已满,则 c.qcount < c.dataqsiz 条件不会满足, 以上流程也并不会执行到。此时会将当前的goroutine以及要发送的数据放入到 sendq 队列中,同时会切出该goroutine。整个流程对应代码如下:
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 将goroutine转入waiting状态,并解锁
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
以上代码中,goparkunlock就是解锁传入的mutex,并切出该goroutine,将该goroutine置为waiting状态。 gopark 和上面的 goready 对应,互为逆操作。 gopark 和 goready 在runtime的源码中会经常遇到,涉及了goroutine的调度过程,这里就不再深入讨论,以后会单独写一篇文章讲解。
调用 gopark 后,对于用户侧来看,该向channel发送数据的代码语句会进行阻塞。
以上过程就是channel的发送语句(如, c <- 1 )的内部工作流程,同时整个发送过程都使用 c.lock 进行加锁,保证并发安全。
简单来说,整个流程如下:
- 检查recvq是否为空,如果不为空,则从recvq头部取一个goroutine,将数据发送过去,并唤醒对应的goroutine即可。
- 如果recvq为空,则将数据放入到buffer中。
- 如果buffer已满,则将要发送的数据和当前goroutine打包成
sudog对象放入到sendq中。并将当前goroutine置为waiting状态。
从channel中接收数据的过程基本与发送过程类似,此处不再赘述了。具体接收过程涉及到的buffer的相关操作,会在后面进行详细的讲解。
这里需要注意的是,channel的整个发送过程和接收过程都使用 runtime.mutex 进行加锁。 runtime.mutex 是runtime相关源码中常用到的一个轻量级锁。整个过程并不是最高效的lockfree的做法。golang在这里有个issue: go/issues#8899 ,给出了lockfree的channel的方案。
channel的ring buffer实现
channel中使用了ring buffer(环形缓冲区)来缓存写入的数据。ring buffer有很多好处,而且非常适合用来实现FIFO式的固定长度队列。
在channel中,ring buffer的实现如下:
hchan 中有两个与buffer相关的变量: recvx 和 sendx 。其中 sendx 表示buffer中可写的index, recvx 表示buffer中可读的index。 从 recvx 到 sendx 之间的元素,表示已正常存放入buffer中的数据。
我们可以直接使用 buf[recvx] 来读取到队列的第一个元素,使用 buf[sendx] = x 来将元素放到队尾。
buffer的写入
当buffer未满时,将数据放入到buffer中的操作如下:
qp := chanbuf(c, c.sendx)
// 将数据拷贝到buffer中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
其中 chanbuf(c, c.sendx) 相当于 c.buf[c.sendx] 。以上过程非常简单,就是将数据拷贝到buffer的 sendx 的位置上。
接着,将 sendx 移到下一个位置上。如果 sendx 已到达最后一位,则将其置为0,这是一个典型的头尾相连的做法。
buffer的读取
当buffer未满时,此时 sendq 里面也一定是空的(因为如果buffer未满,用于发送数据的goroutine肯定不会排队,而是直接放数据到buffer中,具体逻辑参考上文向channel发送数据一节),这时候对于channel的读取过程 chanrecv 就比较简单了,直接从buffer中读取即可,也是一个移动 recvx 的过程。与上文buffer的写入基本一致。
而 sendq 里面有已等待的goroutine的时候,此时buffer一定是满的。这个时候channel的读取逻辑如下:
// 相当于c.buf[c.recvx]
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
以上代码中, ep 接收数据的变量对应的地址。例如,在 x := <- c 中,表示变量 x 的地址。
而 sg 代表从sendq中取出的第一个 sudog 。并且:
-
typedmemmove(c.elemtype, ep, qp)表示buffer中的当前可读元素拷贝到接收变量的地址处。 -
typedmemmove(c.elemtype, qp, sg.elem)表示将sendq中goroutine等待发送的数据拷贝到buffer中。因为此后进行了recv++, 因此相当于把sendq中的数据放到了队尾。
简单来说,这里channel将buffer中队首的数据拷贝给了对应的接收变量,同时将sendq中的元素拷贝到了队尾,这样可以才可以做到数据的FIFO(先入先出)。
接下来可能有点绕, c.sendx = c.recvx , 这句话实际的作用相当于 c.sendx = (c.sendx+1) % c.dataqsiz ,因为此时buffer依然是满的,所以 sendx == recvx 是成立的。
总结
channel作为golang中最常用设施,了解其源码可以帮助我们更好的理解和使用。同时也不会过于迷信和依赖channel的性能,channel就目前的设计来说也还有更多的优化空间。
参考
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 【Java集合源码剖析】ArrayList源码剖析
- Java集合源码剖析:TreeMap源码剖析
- 我的源码阅读之路:redux源码剖析
- ThreadLocal源码深度剖析
- SharedPreferences源码剖析
- Volley源码剖析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Beginning PHP and MySQL 5
W. Jason Gilmore / Apress / 2006-01-23 / USD 44.99
Beginning PHP and MySQL 5: From Novice to Professional, Second Edition offers comprehensive information about two of the most prominent open source technologies on the planet: the PHP scripting langua......一起来看看 《Beginning PHP and MySQL 5》 这本书的介绍吧!