内容简介:golang 中 chan 的源码在 src/runtime/chan.go 文件中,
golang 中 chan 的源码在 src/runtime/chan.go 文件中, hchan
则为 chan 的结构体
hchan:
type hchan struct { qcount uint // 当前缓存数据的总量 dataqsiz uint // 缓存数据的容量 buf unsafe.Pointer // 缓存数据,为一个循环数组,容量大小为 dataqsiz,当前大小为 qcount elemsize uint16 // 数据类型的大小,比如 int 为 4 closed uint32 // 标记是否关闭 elemtype *_type // 数据的类型 sendx uint // 发送队列 sendq 的长度 recvx uint // 接收队列 recvq 的长度 recvq waitq // 阻塞的接收 goroutine 的队列 sendq waitq // 阻塞的发送 goroutine 的队列 lock mutex // 锁,用于并发控制队列操作 } ``` `waitq:` ```go type waitq struct { first *sudog last *sudog }
waitq 为双向链表,sudog 代表一个封装的 goroutine,其参数 g 为 goroutine 实例结,构如下图:
02. 新建 chan:
在 go 中,通过如下代码创建 chan
c := make(chan int, 4)
以上代码,对应的是源码:
func makechan(t *chantype, size int) *hchan
逻辑流程如下:
graph TD A[makechan] -->|t, size| B{安全检查} B -->|N| ZR[ERROR] B -->|Y| E{size 或 t.elem.size 是否为0?} E -->|Y| F[mallocgc 默认大小 hchanSize 内存] E -->|N| G{数据类型是否为指针?} G -->|Y| H[通过new单独分配chan内存] G -->|N| I[mallocgc 内存 hchanSize + mem] H --> Z F --> Z I --> Z Z[chan 赋值属性] Z --> ZB[END]
func makechan(t *chantype, size int) *hchan { elem := t.elem // 安全检查,数据项大小不超过 16K if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 获取要分配的内存 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // size 为 0 的情况,分配 hchan 结构体大小的内存,64位系统为 96 Byte. c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.kind&kindNoPointers != 0: // 数据项不为指针类型,调用 mallocgc 一次性分配内存大小,hchan 结构体大小 + 数据总量大小 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 数据项为指针类型,hchan 和 buf 分开分配内存,GC 中指针类型判断 reachable and unreadchable. c = new(hchan) c.buf = mallocgc(mem, elem, true) } // chan 赋值属性, 数据项大小、数据项类型、缓存数据的容量 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) return c }
03.读写chan
在 go 中,写入 chan 的代码如下:
v := 1 c := make(chan int) c <- v
读取 chan 的代码如下:
var v int c := make(chan int) c -> v
c <- v
操作对应的源码为 runtime 中的
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
而 c -> v
操作对应源码为 runtime 中的
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
其中 c 为 chansend 的 c, v 的地址为 chansend 的 ep.
逻辑流程如下:
graph TD A[chansend 或 chanrecv] -->|hchan, ep| B{校验} B --> |Y| C[加锁 lock] B --> |N| D["gopark(), 阻塞当前 goroutine 和 throw error"] C --> E{chan close?} E --> |Y| F[unlock和panic] E --> |N| G[取出 recvq 或 sendq 队列] G --> H{是否等待的sudog?} H --> |Y| I["send()或recv()"] I --> J["goready(), 运行 sudog 的 goroutine"] H --> |N| K{存在剩余缓冲区?} K --> |Y| L["数据放入缓冲区 buf, unlock"] K --> |N| M["打包成sudog,加入sendq或recvq队列"] M --> O["gopark(),阻塞当前goroutine等待被接受者唤醒"] O -."如果被唤醒,说明数据已经被接收,回收sudog".-> P["保存context,运行别的 goroutine"] P --> Z L --> Z Z[End]
由于发送和接收的逻辑都是差不多的,所以这里就直接放上发送的逻辑代码来分析就好了
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 校验 if c == nil { if !block { return false } // 参数异常,block == true, 进行阻塞 goroutine. gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } if debugChan { print("chansend: chan=", c, "\n") } if raceenabled { racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 加锁,并发读写控制 lock(&c.lock) // 查看 chan 是否关闭 if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 从等待接收列队 recvq 中试图获取获取封装的 goroutine sudog. if sg := c.recvq.dequeue(); sg != nil { // 找到等待接收 chan 的 goroutine sudog,直接发送 value 给接收者,并通过 goready() 唤醒接受者 goroutine send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 查看查看缓存空间是否 buf 是否还有剩余 if c.qcount < c.dataqsiz { // 将数据移动到 qp 中并放入 chan 缓存,sendx++ qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } // chan 如果为非阻塞,unlock 后直接返回 if !block { unlock(&c.lock) return false } // 将当前 goroutine 封装 sudog,并放入到等待发送队列 sendq 中 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // 阻塞当前 goroutine,等待被接受者 chanrecv() 的唤醒 goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) // KeepAlive 方法,由于 GC 的缘故,而调用 KeepAlive(ep) // goroutine 被唤醒,重置 gorotuine 状态 和 sudog if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) return true }
代码的部分详解:
gopark
:M(工作线程) 会保存 goroutine 的上下文,而调度器会让当前工作线程线程 M 绑定执行其他的 goroutine.
KeepAlive(ep)
: 由于 GC 的机制,当 ep 不再被上下文引用的时候,GC 会主动回收 eq,导致 buf 被回收,所以调用 KeepAlive,告诉 GC 不需要对 eq 变量进行内存回收,具体可以查看 runtime.SetFinalizer 方法部分有详细介绍.
唤醒
:goroutine 会在 chanrecv 这个 chan 接收接收函数中,从 hchan.sendq 被取出,执行 goready(), 通知调度器去唤醒,然后放入 P(逻辑处理器) 的执行等待队列中,等待被下一次调用.
send()/recv()
: 通过 memmove()
的方式从发送方 拷贝
buf 到接收方.
下面我们通过一个使用channel做生产/消费的模型来试图分解一下 chan 的步骤:
func main(){ //初始化任务队列 channel ch := make(chan Data, 4) //生产者往channel丢数据 for _, task := range { ch <- task } //初始化消费者 for i := 0; i< ConsumerNum; i++ { go consumer(ch) } ... } // 消费者 func consumer(ch chan Data){ for { //收取任务并处理 data := <- ch process(data) } }
从 main 函数开始,golang 就会开启一个 goroutine 来执行代码,我们可以将其记作生产者 G’p, 代码中 go consumer
标记 consumer 函数也开启一个 goroutine 来进行,我们记其为消费者 G’c.
-
初始化任务队列channel
此时会在堆区域分配一块内存,用于存储 hchan 结构体和 buf 的缓存数据。hchan.buf指向一个大小为4的数组,并且hchan.sendx、hchan.recvx置0,hchan.dataqsiz置4。
-
生产者往channel丢数据
G’p 往 ch 发送数据的时候,会执行 lock(&hchan.lock) 对 buf 加锁,把要发送的数据拷贝到 buf 里,hchan.sendx++,之后 unlock(hchan.lock) 释放锁。
-
消费者执行消费行为
G’c 从 ch 中获取数据的时候,会执行 lock(&hchan.lock) 对 buf 加锁,将 buf 里面的一条数据拷贝到接收变量 data 对应的空间中,hchan.recvx++,之后释放锁。
0.4 关闭 chan
在 go 中,关闭 chan 的代码如下:
ch := make(chan int ,10) close(ch)
close(ch)
对应的runtime的函数:
func closechan(c *hchan)
逻辑流程如下:
graph TD A[closechan] --c--> B{检查} B --> |Y| C[加锁 lock] B --> |N| D[panic] C --> E{chan是否已close} E --> |Y| D E --> |N| F[置 hchan.close = 1] F --> G[释放recvq的所有等待接收者] G --> H[释放sendq的所有等待发送者] H -.发送者会panic.-> D H --> I[unlock] I --> J[唤醒recvq和sendq的所有goroutine] J --> End
func closechan(c *hchan) { // 检查,chan 是否为空 if c == nil { panic(plainError("close of nil channel")) } // 加锁,防止资源竞争 lock(&c.lock) // chan 如果已关闭,则 panic if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } // 置 hchan.close = 1, 标记已关闭 c.closed = 1 var glist gList // 释放recvq的所有等待接收者 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // 释放sendq的所有等待发送者 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // 解锁,unlock unlock(&c.lock) // 唤醒recvq和sendq的所有goroutine for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
close 的主要作用是用于唤醒所有监测 chann 的 goroutine,但是要注意的是:
- 如果 sendq 的缓冲区还有发送者,这些发送者都会 panic
- 如果两次 close chan,会导致 panic
0.5 关于 chan 的面试问题
-
chan 如何处理并发读写问题
hchan 结构体中通过锁
lock mutex
参数进行对公共缓存资源 buf 的控制达到并发读写的 race 问题. -
如果往 chan 发送数据,size 满了,或者往 chan 获取数据,buf 空。这会导致阻塞,此时runtime的行为是怎么样的呢?
由于两者逻辑一样,我们就直接讲往 chan 发送数据,size 满了的情况.
如果往 chan 发送数据,size 满了,此时 goroutine 和 buf 会被打包成 sudog,通过 gopark 将 goroutine 状态置为等待, 同时把 sudog 放入 hchan.sendq 等待发送队列中,等待接收者接收并调用 goready() 重新调度 goroutine. 此时 goroutine 被阻塞后,M(工作线程) 会与 goroutine 解绑,通过 P(逻辑处理器) 重新进行调度,M 与新的 goroutine 重新绑定执行.
感悟
还是有一部分以目前的知识还是无法看懂,以后慢慢积累后再回来补坑,或大佬们可以帮我指出一下,谢谢.
以上所述就是小编给大家介绍的《Golang 源码导读 —— channel》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Golang 源码导读 —— channel
- Golang 源码导读 —— chann
- WebRTC Native 源码导读(十四):API 概览
- WebRTC Native 源码导读(十三):音频设备模块 ADM
- Golang 源码导读 Vitess-vttablet 模块(一): 主程序解析
- 架构集成导读
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Sass and Compass in Action
Wynn Netherland、Nathan Weizenbaum、Chris Eppstein、Brandon Mathis / Manning Publications / 2013-8-2 / USD 44.99
Written by Sass and Compass creators * Complete Sass language reference * Covers prominent Compass community plug-ins * Innovative approach to creating stylesheets Cascading Style Sheets paint the we......一起来看看 《Sass and Compass in Action》 这本书的介绍吧!