内容简介:通道(channel) 是Go实现CSP并发模型的关键, 鼓励用通信来实现数据共享。 Dont' communicate by sharing memory, share memory by communicating.
简介(js)
通道(channel) 是 Go 实现CSP并发模型的关键, 鼓励用通信来实现数据共享。 Dont' communicate by sharing memory, share memory by communicating.
CSP
: Communicating Sequential Process
创建
chan.go
中 hchan的结构
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex } type waitq struct { first *sudog last *sudog }
makechan
: 这里先做了一些元素大小,队列大小检查。受垃圾回收器的限制,如果包含指针类型,则缓冲槽需单独分配内存,否则可一次性分配,调整buf的指针,最后设置size等属性
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { //限制chan的元素大小 throw("makechan: invalid channel element type") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) //检查是否溢出 if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) }
收发
这里用sudog用来保存收发队列,其中包含一个元素和g的指针,这里也实现了cache,central那一套缓存体系.
用 acquireSudog
获取sudog和 releaseSudog
释放sudog, 大致流程也是先从本地p获取,接着再去sched.sudogcache中获取.
type sudog struct { g *g elem unsafe.Pointer // data element (may point to stack) } type p struct { sudogcache []*sudog sudogbuf [128]*sudog } type schedt struct { // Central cache of sudog structs. sudoglock mutex sudogcache *sudog }
发送
在go1.13的源码中已经不判断 c.dataqsiz==0
, 也就是将缓冲长度的0的大于0的整合在一起了。
如果 block=false
: 如果通道为nil, 则直接返回false. 对于无缓冲的情况,如果没有接收者会直接return false。 如果有缓冲但是缓冲满了也会return false。
如果通道关闭了,会触发panic。
尝试等待队列 c.recvq
中有等待者的话, 就直接将数据复制到sg.elem(如果是带缓冲的则更新缓冲的index等参数),并唤醒对应的groutine。
如果没有等待者,并且缓冲队列能存下,则获取一个sudog之后将数据放入 sendq
并返回
如果缓冲队列存不下,则调用 goparkunlock
然当前goroutine休眠,直到被 goready
唤醒,然后释放当前的sudog
// entry point for c <- x from compiled code //go:nosplit func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) } func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } }
接收
接收类似,但是在通道关闭并且缓冲中无数据时,会返回一个默认值。
故而在通道关闭之后还是能获取到一个值. 但是此时的返回中 received
变成了false
注意: 可能是由于如果队列满的话,可以直接将那块地址的数据做swap,才将有数据分为队列满不满的两种.在看select的时候判断条件有点让人不好理解.
在recv函数中, sg是sender,go 在这边的处理流程是sg := c.sendq.dequeue()
,先从sendq中取出一个,如果sg不为nil,则调用 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
在recv()中,如果 c.dataqsiz>0
,也就是带缓冲chan,将调用 typedmemmove(c.elemtype, ep, qp)
把queue的数据复制给 ep
,然后调用 typedmemmove(c.elemtype, qp, sg.elem)
将sg.elem(也就是pop出来的sender)的数据复制给qp
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { } else { // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) } // copy data from queue to receiver if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } }
if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
关闭
buf
func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } c.closed = 1 // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
select
在go1.13的源码中, runtime/select.go
中已经没有newselect方法了,
select的处理移到了 src/cmd/compile/internal/gc/select.go
中. 大概看看注释就好了,不然就涉及到编译的过程了
在编译的时候,会遍历所有的节点,生成节点树,这是如果是 OSELECT
的话,则会调用 walkselect
,
walkselectcases
中。对Node这个对象就不研究了
mkcall("block", nil, &ln) selectgo
// The result of walkstmt MUST be assigned back to n, e.g. // n.Left = walkstmt(n.Left) func walkstmt(n *Node) *Node { case OSELECT: walkselect(n) } func walkselect(sel *Node) { } func walkselectcases(cases *Nodes) []*Node { if n == 0 { return []*Node{mkcall("block", nil, nil)} } // optimization: one-case select: single op. // TODO(rsc): Reenable optimization once order.go can handle it. // golang.org/issue/7672. if n == 1 {} // convert case value arguments to addresses. // this rewrite is used by both the general code and the next optimization. for _, cas := range cases.Slice() {} // optimization: two-case select but one is default: single non-blocking op. if n == 2 && (cases.First().Left == nil || cases.Second().Left == nil) {} // generate sel-struct selv := temp(types.NewArray(scasetype(), int64(n))) order := temp(types.NewArray(types.Types[TUINT16], 2*int64(n))) // register cases for i, cas := range cases.Slice() { setField("kind", nodintconst(kind)) if c != nil { c = convnop(c, types.Types[TUNSAFEPTR]) setField("c", c) } if elem != nil { elem = convnop(elem, types.Types[TUNSAFEPTR]) setField("elem", elem) } if instrumenting { r = mkcall("selectsetpc", nil, nil, bytePtrToIndex(selv, int64(i))) init = append(init, r) } fn := syslook("selectgo") r.Rlist.Set1(mkcall1(fn, fn.Type.Results(), nil, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), nodintconst(int64(n)))) }
selectgo
就是go总select语句的实现了
- 转类型成scases,pollorder,lockorder三个数组
-
将nil channel的scase统一成scase{},也就是
caseNil
类型方便处理 -
遍历case, 用
fastrandn
随机生成一个j,交换i,j的数据放到交换后的pollorder
数组中 - 根据hchan的地址获得locking order(锁的顺序),使用简单堆 排序 来保证nlogn时间和常熟堆栈足迹
- 设置锁,将所有的chan锁住
-
开始遍历选择
- 第一轮,按照pollorder,查找是否有已经在等待的,如果未找到,则看是否有caseDefault,有的话执行默认,然后返回. 这里对通道的检查, 如果所有的数据都堵塞(进不去,或者出不来) 则进入第二轮
-
第二轮,将所有的chan都入队列。 caseRecv入c.recvq,caseSend入sendq,将当前G休眠等待被某一个chan唤醒(
selparkcommit
会将unlock所有chan) -
第三轮, 轮训所有的case,将原先入队的数据全部dequeue,从queue中移除,并返回casei, 也就是获取到数据的case位置,
然后判断,cas是不是nil, 因为有可能是close(chan)事件唤醒的,这时就需要再次loop,当然如果还是判断到closed的这个case, 这里就会返回默认值然后退出。
这里比较重要的一个是:
- 如果chan是nil,则分支永远走不到。 如果chan是closed,那么只要轮到(由于算法的随机,可能有别的chan先走到)肯定都能进去
type scase struct { c *hchan // chan elem unsafe.Pointer // data element kind uint16 pc uintptr // race pc (for race detector / msan) releasetime int64 } // selectgo implements the select statement. // // cas0 points to an array of type [ncases]scase, and order0 points to // an array of type [2*ncases]uint16. Both reside on the goroutine's // stack (regardless of any escaping in selectgo). func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) { // 将cas0和order0都转为数组 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0)) order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) //转为slice,并拆分为pollorder和lockorder scases := cas1[:ncases:ncases] pollorder := order1[:ncases:ncases] lockorder := order1[ncases:][:ncases:ncases] // 遍历,将所有chan为nil的都改为scase{} // Replace send/receive cases involving nil channels with // caseNil so logic below can assume non-nil channel. for i := range scases { cas := &scases[i] if cas.c == nil && cas.kind != caseDefault { *cas = scase{} } } // generate permuted order for i := 1; i < ncases; i++ { j := fastrandn(uint32(i + 1)) pollorder[i] = pollorder[j] pollorder[j] = uint16(i) } // lock all the channels involved in the select sellock(scases, lockorder) loop: // pass 1 - look for something already waiting // pass 2 - enqueue on all chans // wait for someone to wake us up // pass 3 - dequeue from unsuccessful chans selunlock(scases, lockorder) goto retc }
其他
这里想到一个竞争的问题,也就是select阻塞时入了所有的chan列表,当多个chan都去唤醒时怎么保证这个竞争问题
ready这个函数中如果一个协程已经不是Gwaiting状态,再次设置则会报错.
解决的关键就在于 selectDone
这个参数
在 dequeue
函数中, sgp.g.selectDone这个参数是原子性的,在入队时将其isSelect参数设置为true.
通过这个判断,和对selectDone改为1的过程中,如果改失败了则会跳过这个g,继续选择, 在select的处理逻辑中,当该协程唤醒后,会将select中的chan全部退回,这样就不会出现问题了。
// Mark runnable. _g_ := getg() mp := acquirem() // disable preemption because it can be holding p in a local var if status&^_Gscan != _Gwaiting { dumpgstatus(gp) throw("bad g->status in ready") }
func (q *waitq) dequeue() *sudog { if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) { continue } }
以上所述就是小编给大家介绍的《golang 源码剖析(6): 通道》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 【Java集合源码剖析】ArrayList源码剖析
- Java集合源码剖析:TreeMap源码剖析
- 我的源码阅读之路:redux源码剖析
- ThreadLocal源码深度剖析
- SharedPreferences源码剖析
- Volley源码剖析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
创新者的窘境(全新修订版)
克莱顿•克里斯坦森 / 胡建桥 / 中信出版社 / 2014-1-1 / 48.00元
全球商业领域中,许多企业曾叱咤风云,但面对市场变化及新技术的挑战,最终惨遭淘汰。究其原因,竟然是因为它们精于管理,信奉客户至上等传统商业观念。这就是所有企业如今都正面临的“创新者的窘境”。 在《创新者的窘境》中,管理大师克里斯坦森指出,一些看似很完美的商业动作——对主流客户所需、赢利能力最强的产品进行精准投资和技术研发——最终却很可能毁掉一家优秀的企业。他分析了计算机、汽车、钢铁等多个行业的......一起来看看 《创新者的窘境(全新修订版)》 这本书的介绍吧!