内容简介:我们创建了一大堆线程, 现在我们想要实现线程间的同步, 这其中的关键就是chan(通道)的使用, 如果没有通道, 你应该怎么去做线程间同步呢? time.Sleep吗?缓冲槽的工作方式就是上图那样, 每当你往通道里写消息, 消息会先存到缓冲槽里, 而后才被取出来. 这种常规的工作模式也叫异步模式, 因为收发工作不是同步进行的, 你可以先发, 发完你走人, 随后收件人再去管道里取.同样还有一种用法是不设置缓冲槽, 或者说你直接把缓冲槽大小设置成0, 这种工作模式下, 收发双方必须同时守在管道旁, 否则先到的人
我们创建了一大堆线程, 现在我们想要实现线程间的同步, 这其中的关键就是chan(通道)的使用, 如果没有通道, 你应该怎么去做线程间同步呢? time.Sleep吗?
Introduction
type hchan struct { dataq_size uint // 缓冲槽大小 buf unsafe.Pointer // 缓冲槽本体 elem_type *_type // 槽内数据类型 } 复制代码
缓冲槽的工作方式就是上图那样, 每当你往通道里写消息, 消息会先存到缓冲槽里, 而后才被取出来. 这种常规的工作模式也叫异步模式, 因为收发工作不是同步进行的, 你可以先发, 发完你走人, 随后收件人再去管道里取.
同样还有一种用法是不设置缓冲槽, 或者说你直接把缓冲槽大小设置成0, 这种工作模式下, 收发双方必须同时守在管道旁, 否则先到的人一定会堵塞在管道哪儿, 等待后到的人, 然后通信才能开始, 这种也叫同步模式
仔细想一想通道给我们带来了什么, 如果将通道的功能点拆开, 通道的核心功能点就是: 可以在两个不同的G之间相互发消息, 同时还有一套阻塞/唤醒的机制.
通道的工作模式
首先分析一个关键点, 你需要知道有哪些G正守着这个通道, 然后把管道里的参数拷贝给这些堵塞着的G, 最后去解除他们的阻 . 有了这个前提条件, 通道工作围绕的对象就一定是G,
type hchan struct { recv_q waitq // 接收者队列 send_q waitq // 发送者队列 } type waitq struct { first *sudog last *sudog } type sudog struct { g *g // 想要收发消息的g本体 elem unsafe.Pointer // 要发的消息本体 } 复制代码
每个通道都会维护一个发送者队里以及一个接收者队列, 队列里的元素是一个包装过的G(G本体+消息). 我们通过一个发送与接收的过程, 先说说同步通道是如何工作的, 异步通道于此类似
收发 - 同步模式
chan <- data
的操作会被编译器翻译成去执行 chansend
函数, 执行的对象是名为 chan
的通道, 携带一个消息结构体. 检查chan的缓冲槽长度为0, 进入通道的同步模式工作:
- 找一个接收者, 检查chan的接收者队列
- chan.recv_q为空, 按照同步队列的工作模式, 因为没人接消息, 这个G必须阻塞(沉睡), 你这个G, 连同你的消息, 一起被打包成一个sudog, 放到chan的发送者队列里
- 然后通过gopark把你放入沉睡模式
- 但如果有接收者, 取出这个sudog对象, 然后把这条消息拷贝到sudog.elem里面, 这代表我这条消息已经发给你了.
- 最后因为这个接收者之前因为没收到消息一直在沉睡中, 通过goready唤醒
ok以上说明了几件事, 通道是如何知道消息应该发给谁, 消息是怎么发送过去的, 以及我们看到的阻塞效果是怎么实现的.
仔细想想, 这阻塞? 在某种条件达到以后自动解除阻塞? 这个场景好像在哪里见过? 你小子在暗示 sync.WaitGroup
!! 等 wg.Count
变成0了之后自动解除阻塞, wg使用的阻塞效果也正是通过gopark实现的! 这种沉睡/阻塞最大的特点就是, 某个G被放入沉睡以后, 必须由你手动唤醒, 在我们的场景中这个条件就是找到了接收方, 在wg的场景中这个条件就是wg.Count变成0了, 条件一命中, 我手动立刻帮你唤醒并解除阻塞.
收发 - 异步模式
想象一下异步模式与同步模式的区别在哪? 唯一的区别, 仅仅是在管道填满了才会产生堵塞, 不然你发完/收完就走人.
剩下来原理基本一样, chan <- data
操作同样被翻译成 chansend
函数的调用, 发现缓冲槽大小不为0以后进入异步工作模式, 开始检查缓冲槽的剩余舱位
- 如果还有剩余舱位, 将消息通过memmove拷贝到管道内, 然后如果发现还有接收者正在堵塞中, 通过goready唤醒
- 如果没有剩余舱位, 通过gopark进入休眠模式, 在被唤醒以后检查有没有数据
关闭
到了这儿你已经对这一套工作模式非常了解了, 所谓的关闭其实就是遍历通道的发送者队列+接收者队列, 在他们的数据区发送一条nil消息, 然后执行goready唤醒他们中的每一个人
Select的工作模式
经常与通道一起出现的就是Select, Select的功能只是: 从所有的通道case中随机挑一个能用的, 否则就一直堵塞直到出现一个能用的. 我们解析一下这种特性是怎么做到的.
随机序
type hselect struct { ncases uint16 // 总数 poll_order *uint16 // 随机序号 cases []scase // 按照初始化顺序的case队列 } type scase struct { c *hchan // case的本体, 一个通道 kind uint16 // 通道类型 } 复制代码
一个select在初始化的时候, 然后把所有的通道从chan类型包装成 scase
类型, 添加上一个字段叫做Kind,这个字段可以是"接收者通道"/"发送者通道", 最后还有一个"default"类型通道, 表明这是一个default case.
然后会生成一个随机序号存到poll_order字段中去, 这代表一个随机数, 然后等程序运行到select的位置的时候, 调用 select_go
函数, 开始找可以用的通道:
for i,_ := range [0...ncases] { random_case_id := poll_order[i] random_chanel := cases[random_id] if check(random_chanel) { return } } if default_case != nil { execute(default_case) return } 复制代码
我们按照以上的方法去执行随机序, 在所有的case都遍历完了以后, 如果没用能用的, 检查有没有能用的default用
沉睡的select
我们已经知道通道的沉睡与唤醒是怎么实现的, 针对select有意思的一点是, 如果子通道被唤醒, 则自己这个selectG也同时被唤醒了. 这点很神奇, 怎么做到的
for i,_ := range [0...ncases] { random_case_id := poll_order[i] random_chanel := cases[random_id] if random_chanel.kind == recv_chanel { random_chanel.recvq.append(selG) } if random_chanel.kind == send_chanel { random_chanel.sendq.append(selG) } } gopark(selectG) 复制代码
同样的, 我们也是遍历select下的所有通道, 把自己添加到通道的消息队列中去
- 如果是它是发送类型通道, 那就在它的发送者队列中添加自己这个selectG
- 如果是它是接收类型通道, 那就在它的接收者队列中添加自己这个selectG
想一想这样做会发生什么, 自己这个SelectG协程会同时出现在很多通道的消息队列里, 其中任何一个通道被 goready
唤醒的时候, 自己这个SelectG也会被通知到, 自己也会跟这个通道一起被唤醒. 这就实现了select会一直堵塞直到其中任何一个通道畅通为止的特性
欢迎关注我们的微信公众号,每天学习 Go 知识
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Web容量规划的艺术
阿尔斯帕瓦 / 叶飞、罗江华 / 机械工业出版社 / 2010-1 / 29.00元
《Web容量规划的艺术》由John Allspaw(F订ickr的工程运营经理)撰写,结合了他个人在F1ickr成长过程中的许多经历和很多其他产业中同行的洞察力。在衡量增长、预测趋势、成本效益等方面,他们的经验都会给你一些可靠并有效的指导。 网站的成功是以使用和增长来衡量的,而且网站类公司的成败(生死)是依赖于他们是否有能力来衡量决定他们的基础结构,从而适应不断增长的需求。作者通过自身实践给......一起来看看 《Web容量规划的艺术》 这本书的介绍吧!