内容简介:原文地址:Go并发模型可以非常容易地构建出能高效利用IO和多CPU的数据pipeline。这篇文章将以此为重点来介绍,同时也会涉及到一些在实践中易犯的错误及其对应的解决之法。在GO中,pipeline无明确定义;它是语言提供的一种并发编程方式,由连接各个chanel而形成的一系列阶段组成。在其各个阶段,可能分别运行着很多由同一函数产生的goroutine。这些goroutine
image.png
原文地址: https://blog.golang.org/pipelines
介绍
Go并发模型可以非常容易地构建出能高效利用IO和多CPU的数据pipeline。这篇文章将以此为重点来介绍,同时也会涉及到一些在实践中易犯的错误及其对应的解决之法。
什么是Pipeline
在 GO 中,pipeline无明确定义;它是语言提供的一种并发编程方式,由连接各个chanel而形成的一系列阶段组成。在其各个阶段,可能分别运行着很多由同一函数产生的goroutine。这些goroutine
- 从输入channel接收数据
- 对数据作相应处理,例如在此基础上产生新数据
- 再通过输出channel把数据发送出去
除了开始和结束,每个阶段都会包含任意多个输入和输出channel。开始阶段只有输出channel,结束阶段只有输入channel。相应地,开始阶段可被称为生产者,结束阶段可被称为消费者。
我们先通过一个简单的例子来说明。
并发计算平方数
首先来举一个三阶段pipeline的例子
第一阶段,创建输入参数为可变长int整数的 gen
函数,它通过goroutine发送所有输入参数,并在发送完成后关闭相应channel:
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() reutrn out }
第二阶段,sq函数,负责从输入channel中接收数据并作平方处理再发送到输出channel中。在输入channel关闭并把所有数据都成功发送至输出channel,关闭输出channel:
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out }
主函数 main
中创建了pipeline,并执行了最后阶段的任务,从管道中接收了第二阶段的数据并打印了出来:
func main() { // Set up the pipeline c := gen(2, 3) out := sql(c) // Consume the output fmt.Println(<-out) fmt.Println(<-out) }
此处 sq
函数的输入和输出参数为相同类型的channel,因此我们可以对其进行组合。重写 main
函数,如下:
func main() { // Set up the pipeline and consume the output. for n := range sq(sq(gen(2, 3))) { fmt.Println(n) } }
此处相等于在pipeline中增加了一个阶段,即涉及到了三个阶段,其中2、3阶段的goroutine由同一函数产生。
Fan-out和Fan-in (扇出和扇入)
多个函数可同时从同一个channel中读取数据,直到channel关闭,称为 fan-out
。这为我们提供了一种将任务分发给多个worker的途径,从而实现CPU和I/O的高效利用。
通过多路复用技术将多个channel合并到单个channel实现从多个输入读取数据的能力,只有当所有的输入都关闭,才会停止数据的读取。这个称作 fan-in
。
重写之前的 main
,我们调用两次 sq
,且两次都从同一个channel中读取数据。我们将引入一个新的函数,通过fan-in方式获取数据:
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in c1 := sq(in) c2 := sq(in) // Comsume the merged output from c1 and c2 for n := range merge(c1, c2) { fmt.Println(n) // 4 then 9, or 9 then 4 } }
merge函数通过为每个输入channel启动一个goroutine实现将数据发送同一个channel中,从完成将channel列表转化为单个channel的功能。一旦所有的输出channel(生产者)启动, merge
就会启动一或多个goroutine接收所有数据并在结束后关闭对应channel。
在已关闭的channel发送数据会导致panic,因此保证关闭channel前所有数据都发送完毕是非常重要的。 sync.WaitGroup
为我们提供了一种实现该同步的方式。示例如下:
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs) for _, c := range(cs) { go output(c) } // Start a goroutine to close out once all the output goroutines are // done. This must start after the wg.Add call. go func() { wg.Wait() close(out) }() return out }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Golang并发模型:并发协程的优雅退出
- go语言大并发(一)----goroutine与并发模型
- Golang并发模型:轻松入门流水线模型
- 并发编程:内存模型
- Golang CSP并发模型
- Java并发 -- Actor模型
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Intersectional Internet
Safiya Umoja Noble、Brendesha M. Tynes / Peter Lang Publishing / 2016
From race, sex, class, and culture, the multidisciplinary field of Internet studies needs theoretical and methodological approaches that allow us to question the organization of social relations that ......一起来看看 《The Intersectional Internet》 这本书的介绍吧!