GO并发模型:Pipeline和Cancellation(未完)

栏目: Go · 发布时间: 5年前

内容简介:原文地址:Go并发模型可以非常容易地构建出能高效利用IO和多CPU的数据pipeline。这篇文章将以此为重点来介绍,同时也会涉及到一些在实践中易犯的错误及其对应的解决之法。在GO中,pipeline无明确定义;它是语言提供的一种并发编程方式,由连接各个chanel而形成的一系列阶段组成。在其各个阶段,可能分别运行着很多由同一函数产生的goroutine。这些goroutine
GO并发模型:Pipeline和Cancellation(未完)

image.png

原文地址: https://blog.golang.org/pipelines

介绍

Go并发模型可以非常容易地构建出能高效利用IO和多CPU的数据pipeline。这篇文章将以此为重点来介绍,同时也会涉及到一些在实践中易犯的错误及其对应的解决之法。

什么是Pipeline

GO 中,pipeline无明确定义;它是语言提供的一种并发编程方式,由连接各个chanel而形成的一系列阶段组成。在其各个阶段,可能分别运行着很多由同一函数产生的goroutine。这些goroutine

  • 从输入channel接收数据
  • 对数据作相应处理,例如在此基础上产生新数据
  • 再通过输出channel把数据发送出去

除了开始和结束,每个阶段都会包含任意多个输入和输出channel。开始阶段只有输出channel,结束阶段只有输入channel。相应地,开始阶段可被称为生产者,结束阶段可被称为消费者。

我们先通过一个简单的例子来说明。

并发计算平方数

首先来举一个三阶段pipeline的例子

第一阶段,创建输入参数为可变长int整数的 gen 函数,它通过goroutine发送所有输入参数,并在发送完成后关闭相应channel:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    
    reutrn out
}

第二阶段,sq函数,负责从输入channel中接收数据并作平方处理再发送到输出channel中。在输入channel关闭并把所有数据都成功发送至输出channel,关闭输出channel:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()

    return out
}

主函数 main 中创建了pipeline,并执行了最后阶段的任务,从管道中接收了第二阶段的数据并打印了出来:

func main() {
    // Set up the pipeline
    c := gen(2, 3)
    out := sql(c)

    // Consume the output
    fmt.Println(<-out)
    fmt.Println(<-out)
}

此处 sq 函数的输入和输出参数为相同类型的channel,因此我们可以对其进行组合。重写 main 函数,如下:

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n)
    }
}

此处相等于在pipeline中增加了一个阶段,即涉及到了三个阶段,其中2、3阶段的goroutine由同一函数产生。

Fan-out和Fan-in (扇出和扇入)

多个函数可同时从同一个channel中读取数据,直到channel关闭,称为 fan-out 。这为我们提供了一种将任务分发给多个worker的途径,从而实现CPU和I/O的高效利用。

通过多路复用技术将多个channel合并到单个channel实现从多个输入读取数据的能力,只有当所有的输入都关闭,才会停止数据的读取。这个称作 fan-in

重写之前的 main ,我们调用两次 sq ,且两次都从同一个channel中读取数据。我们将引入一个新的函数,通过fan-in方式获取数据:

func main() {
    in := gen(2, 3)
    
    // Distribute the sq work across two goroutines that both read from in
    c1 := sq(in)
    c2 := sq(in)
    
    // Comsume the merged output from c1 and c2
    for n := range merge(c1, c2) {
        fmt.Println(n)  // 4 then 9, or 9 then 4
    }
}

merge函数通过为每个输入channel启动一个goroutine实现将数据发送同一个channel中,从完成将channel列表转化为单个channel的功能。一旦所有的输出channel(生产者)启动, merge 就会启动一或多个goroutine接收所有数据并在结束后关闭对应channel。

在已关闭的channel发送数据会导致panic,因此保证关闭channel前所有数据都发送完毕是非常重要的。 sync.WaitGroup 为我们提供了一种实现该同步的方式。示例如下:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        
        wg.Done()
    }
    wg.Add(len(cs)
    
    for _, c := range(cs) {
        go output(c)
    }
    
    // Start a goroutine to close out once all the output goroutines are
    // done. This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

第三次浪潮

第三次浪潮

[美]阿尔文·托夫勒 / 黄明坚 / 中信出版集团 / 2018-7 / 79.00元

《第三次浪潮》是美国著名未来学家阿尔文•托夫勒的代表作之一。1980年出版之际,随即引起全球热评,堪称中国改革开放的指南。本书阐述了由科学技术发展引起的社会各方面的变化与趋势。托夫勒认为,人类迄今为止已经经历了两次浪潮文明的洗礼:第一次是农业革命,人类就此从原始渔猎时代进入以农业为基础的文明社会,并历经千年,直到工业革命的到来。随后,人类社会历时300年摧毁了落后的第一次浪潮文明,并在“二战”后1......一起来看看 《第三次浪潮》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换