从并发模型看 Go 的语言设计

栏目: Go · 发布时间: 5年前

内容简介:传统的程序语言设计都不会将输入输出作为语言的核心,但 Tony Hoare 认为输入输出是基本的编程原语,且通信顺序进程(Communicating sequential processes,CSP)的并行组合(这里可能用「并发」会更为准确)是基本的程序组织方法。Go 语言的并发设计就是基于 CSP 模型的。在最初的 CSP 模型中,程序总由若干个可以相互通信的进程构成,其中每一个进程内部是顺序执行的(这也就是 CSP 名称的含义)。注意这里的「进程」并不一定指操作系统中的进程,也不一定是操作系统的线程,它

传统的程序语言设计都不会将输入输出作为语言的核心,但 Tony Hoare 认为输入输出是基本的编程原语,且通信顺序进程(Communicating sequential processes,CSP)的并行组合(这里可能用「并发」会更为准确)是基本的程序组织方法。Go 语言的并发设计就是基于 CSP 模型的。

在最初的 CSP 模型中,程序总由若干个可以相互通信的进程构成,其中每一个进程内部是顺序执行的(这也就是 CSP 名称的含义)。注意这里的「进程」并不一定指操作系统中的进程,也不一定是操作系统的线程,它是一个抽象的概念,代表一组计算的序列,例如 goroutine 这种在应用层调度的计算序列也算 CSP 中的「P」。与 Go 语言不同的地方在于,这个最初的设计中并没有通信管道的概念,每个进程是直接和另一个进程进行通信的,但在 Go 语言中,goroutine 是匿名的,一个 goroutine 并没有办法直接将消息发给另一个 goroutine,为了实现 goroutine 之间的通信,Go 语言提供了 first class 的 channel,消息通过 channel 来从一个 goroutine 发到另一个 goroutine。而且,Go 语言也不要求 goroutine 内部是顺序执行的,goroutine 内部可以创建更多的 goroutine,并发地完成工作。

下面,我们通过例子说明基于 CSP 模型是如何组织程序的。

阶乘计算

首先来看的是一个计算阶乘的例子,阶乘的一个简单的递归实现可以是这样的:

fact 0 = 1
fact n = n * fact (n - 1)

而基于 CSP 组织程序,我们可以这样做:

// 阶乘计算的实体
func FactCalc(in <-chan int, out chan<- int) {
 var subIn, subOut chan int
 for {
 n := <-in
 if n == 0 {
 out <- 1
 } else {
 if subIn == nil {
 subIn, subOut = make(chan int), make(chan int)
 go FactCalc(subIn, subOut)
 }
 subIn <- n - 1
 r := <-subOut
 out <- n * r
 }
 }
}
// 包装一个阶乘计算函数
func MakeFactFunc() func(int) int {
 in, out := make(chan int), make(chan int)
 go FactCalc(in, out)
 return func(x int) int {
 in <- x
 return <-out
 }
}

MakeFactFunc 就是简单地封装一下 FactCalc,获取一个计算阶乘的函数。主要的计算是由 FactCalc 进行的。

每一个 FactCalc 都会被作为一个独立的 goroutine 来执行,对于第 i 个 goroutine 而言,它先从第 i - 1 个 goroutine 中读入一个数字 n,然后,如果 n > 0,这个 goroutine 需要做 3 件事:

1. 向第 i + 1 个 goroutine 写入一个 n - 1

2. 从第 i + 1个 goroutine 处读回来一个数字 r

3. 将 n * r 写入第 i - 1 个 goroutine

否则,则向第 i - 1 个 goroutine 处写入一个 1。

如前所述,由于 Go 语言不支持直接向一个 goroutine 发消息,所以这里的消息收发都要基于 channel 进行。我们可以看到,一旦 FactCalc 发现自己无法完成阶乘问题的计算工作,它就会创建另一个 goroutine(只会创建一次),并将子问题发送给这个 goroutine 进行处理,这会形成一个 FactCalc goroutine 的链条,链条上的每一个 goroutine 都与前一个和后一个 goroutine 进行通信(这就是前文所说的「若干个可以相互通信的进程」)。

我们又了这样的阶乘计算器后,我们可以这么去使用它:

const limit = 5
func main() {
 fact := MakeFactFunc()
 for i := 0; i < limit; i++ {
 fmt.Println(fact(i))
 }
}

执行程序,我们可以看到这样的输出:

相比于直接使用递归函数调用,这个实现方式非常不直观。下面这个图可能能帮助理解:

从并发模型看 Go 的语言设计

这里的圆形为调用者,每一个矩形都为一个 goroutine,当我们尝试计算 fact(3) 时,会将 3 写入最前面的 in channel 中,数据开始从第一个 goroutine 向后流动。第一个 goroutine 会从这个 channel 中读到这个 3,它将 3 - 1 写入下一个 in channel 中,然后开始阻塞等待 out channel 出现第二个 goroutine 计算的结果,第二个、第三个 goroutine 的计算是类似的,等到第 4 个 goroutine 从 in channel 中读取输入时,它发现这是一个 0,于是直接向 out channel 写入一个 1,此时数据开始从最后一个 goroutine 往回流动,经过第三个和第二个 goroutine 的计算后,第一个 goroutine 会获得 2,然后将 2 * 3 输出。

注意到这里进行阶乘计算的实体并不是递归的函数,而是并发的 goroutine,它们之间通过 channel 进行通信, 每个 goroutine 都将计算拆分并发送给其他 goroutine 进行处理,直到计算变为 trivial 的情况。当然了,这个实现相比简单的递归函数会显得很啰嗦,我们在实际使用中也不会这么做,但这个例子说明了如何在 CSP 模型下,利用数据的流动实现我们常见的递归。

素数筛

下面的一个例子中,我们使用筛法来计算素数。所谓素数筛,大概就是对正整数 2 ~ n 进行遍历,然后对每一个数字都进行一次筛选,只留下是素数的部分,对于第 i 位的筛选,我们需要依赖前面已经晒出的 m 个素数,当且仅当这 m 个素数都无法整除第 i 位的数字时,这个数字可以通过这一位的筛选,也就是这样:

primes = primesFilter [2..]
 where primesFilter (p:xs) =
 p : primesFilter [x | x <- xs, x `mod` p /= 0]

上面这个实现利用了 Haskell 的惰性求值能力,但对于大多数语言而言,我们的实现都不可能这么简洁,基于传统的顺序计算的思路,程序都会比较啰嗦,而且关键是很不清晰。而在 CSP 模型下,我们可以这么实现:

func Counter(out chan<- int) {
 for i := 2; ; i++ {
 out <- i
 }
}
func PrimeFilter(prime int, in <-chan int, out chan<- int) {
 for {
 i := <-in
 if i%prime != 0 {
 out <- i
 }
 }
}
func PrimeSieve(out chan<- int) {
 c := make(chan int)
 go Counter(c)
 for {
 prime := <-c
 out <- prime
 newC := make(chan int)
 go PrimeFilter(prime, c, newC)
 c = newC
 }
}

可以看到,我们的素数筛由三个部分组成,首先,Counter 从 2 开始依次产生自然数。PrimeFilter 就是每一层素数的过滤器,每一层过滤器只持有一个输入 channel 一个输出 channel 和一个素数 prime,它将不断从输入 channel 中读入数字,并将其中无法被 prime整除的部分输出。PrimeSieve 则是一个完整的素数筛,它每获得一个素数,都将素数输出,并创建一个新一层的过滤器,因此整个过程大概是这样的:

从并发模型看 Go 的语言设计

PrimeSieve 可以向 out channel 中依次输出被筛出来的素数,这个过程是惰性的,直到我们从 out channel 中取出素数,下一个素数才会被计算。我们可以这样去使用它:

func main() {
 primes := make(chan int)
 go PrimeSieve(primes)
 for i := 0; i < 5; i++ {
 fmt.Println(<-primes)
 }
}

执行程序,我们可以看到这样的输出:

从这两个例子中我们可以看到 CSP 模型不一定是用于并行计算,至少在这两个例子中,每一个 goroutine 在进行计算之后都在阻塞等待,同一时间事实上仅有一个活跃的 goroutine,但 Go 语言对 CSP 并发模型的支持能让整个设计变得简单清晰(「并发」和「并行」的区别可以参考这个视频)。这反映到 Go 语言设计上的要点有两个:

下面我们再通过另外一个例子看一下 Go 语言的其他设计点。

信号量

一个信号量有两个操作,分别称为 V(signal())与P(wait())。其运作方式如下:

在 CSP 模型下,我们可以这样实现:

type Semaphore struct {
 inc chan struct{}
 dec chan struct{}
}
func (sem *Semaphore) Wait() {
 sem.dec <- struct{}{}
}
func (sem *Semaphore) Signal() {
 sem.inc <- struct{}{}
}
func MakeSemaphore(initVal int) *Semaphore {
 sem := Semaphore{
 inc: make(chan struct{}),
 dec: make(chan struct{}),
 }
 go func(s int) {
 for {
 if s > 0 {
 select {
 case <-sem.inc:
 s = s + 1
 case <-sem.dec:
 s = s - 1
 }
 } else {
 <-sem.inc
 s = s + 1
 }
 }
 }(initVal)
 return &sem
}

*Semaphore 有两个操作,分别是 Wait 和 Signal,它们分别向 dec channel 和 incchannel 发消息。而 MakeSemaphore 中创建的 goroutine 则会根据 s int 状态的不同选择不同的操作,如果 s > 0,则从 inc channel 或 dec channel 中随机读取一个值,并将 s 的值进行增加/减少 1,否则,从 inc channel 中读取一个值,并将 s 的值增加 1。注意这里的「随机」是非常重要的,如果 inc 和 dec 同时都有数据可读,则实际从哪个 channel 中读出数据是不确定的,正是因为 Go 语言的 select 是随机的,我们才可以在这里用它来进行调度。显然,在大多数语言中,如果要实现 channel 这样的类型,一般是以库的形式进行实现,而 Go 语言将其上升到了语言层面实现,这样虽然显的不够纯粹干净,但这样可以通过更方便的语法实现 select 这样强大的功能,如果实现为库的形式,是难以做到这个程度的。

一个简单的服务模板

在这个例子中,我们将基于 Go 语言实现一个极简单的服务模板,代码如下:

type Input struct {
 Req interface{}
 State interface{}
}
type Output struct {
 Rsp interface{}
 State interface{}
}
type Handler = func(input Input) (Output, error)
type Response struct {
 Result interface{}
 Error error
}
type InMessage struct {
 Req interface{}
 OutChan chan<- Response
}
type Service struct {
 inChan chan<- InMessage
}
func (service *Service) RpcCall(request interface{}) (interface{}, error) {
 outChan := make(chan Response)
 service.inChan <- InMessage{request, outChan}
 rsp := <-outChan
 if rsp.Error != nil {
 return nil, rsp.Error
 }
 return rsp.Result, nil
}
func MakeService(handler Handler, initState interface{}) *Service {
 inChan := make(chan InMessage)
 go func(state interface{}) {
 for {
 in := <-inChan
 out, err := handler(Input{in.Req, state})
 if err != nil {
 in.OutChan <- Response{nil, err}
 } else {
 state = out.State
 in.OutChan <- Response{out.Rsp, nil}
 }
 }
 }(initState)
 return &Service{inChan}
}

这里的 Service 是一个服务模板,我们通过 MakeService 来创建它。在创建服务模板的时候,我们要求调用者传入一个请求处理函数 handler func(input Input) (Output, error),从类型可以知道,它接受一个请求,然后进行处理,并返回响应。请求和响应中都带有状态,handler 可以借此保存和修改状态,由于模板并不知道状态是什么,因此,MakeService 还要求调用者传入一个初始的状态 initState。然后,MakeService 会启动一个 goroutine,这个 goroutine 不断从 inChan 读入请求,并调用 handler 进行处理,最终将响应通过 outChan 发回给调用方。RpcCall 简单封装了一下从 inChan 输入请求,从 outChan 读取响应的过程。我们可以使用这个模板这样实现一个简单的电话本服务:

type Query struct {
 Name string
}
type Insert struct {
 Name string
 Phone int
}
type PhoneBookService = Service
func (s *PhoneBookService) Insert(name string, phone int) {
 s.RpcCall(Insert{name, phone})
}
func (s *PhoneBookService) Query(name string) (int, error) {
 phone, err := s.RpcCall(Query{"Tom"})
 if err != nil {
 return 0, err
 }
 return phone.(int), nil
}
func MakePhoneBookService() *PhoneBookService {
 return MakeService(func(i Input) (Output, error) {
 st := i.State.(map[string]int)
 switch req := i.Req.(type) {
 case Query:
 x, ok := st[req.Name]
 if !ok {
 return Output{nil, nil}, fmt.Errorf("%v no found", req.Name)
 }
 return Output{x, st}, nil
 case Insert:
 st[req.Name] = req.Phone
 return Output{nil, st}, nil
 default:
 return Output{nil, nil}, fmt.Errorf("unknonw input: %v", req)
 }
 }, make(map[string]int))
}
func main() {
 service := MakePhoneBookService()
 phone, err := service.Query("Tom")
 if err != nil {
 fmt.Println("query err:", err)
 } else {
 fmt.Println("query succ:", phone)
 }
 service.Insert("Tom", 123456)
 phone, err = service.Query("Tom")
 if err != nil {
 fmt.Println("query err:", err)
 } else {
 fmt.Println("query succ:", phone)
 }
}

这个电话本功能很简单,只有 Insert 和 Query 两种方法。Service 模板的作用是将整个 Go 语言的并发模型封装在函数调用内,从 PhoneBookService 的实现中,我们可以发现,这里没有任何 goroutine 的产生代码,也没有 channel 的使用,仅仅出现了简单的函数调用。对于 handler 的实现,里面也是一个简单的循环。这样一来,具体服务的实现者就不需要接触 Go 语言的并发模型,也可以实现简单的服务了。

执行程序,我们可以看到如下的输出:

query err: Tom no found
query succ: 123456

在这里,我们可以注意到 Go 语言的另外两点设计,一个是使用错误返回值的错误的处理方式,另一个是只有接口没有泛型。

首先说错误处理。

Go 语言的错误处理方式有很大争议,支持者认为,Go 的错误返回值方式让错误的出现更加明确,不会扰乱让开发者的逻辑,更清晰地表达了意图。而反对者则认为异常抛出的缺失导致 Go 代码的错误处理非常冗长,且频繁打断主要逻辑。显然,这两个观点都有各自的道理,且在不同的语言里我们也看到了这两种错误处理方式的广泛应用,但是我认为在 Go 的并发模型的限制下,使用错误返回值的方式是一个合理正确的选择。如前所述,Go 语言每当创建一个 goroutine 之后,这个 goroutine 就和创建者没有什么关系了,它甚至不能像线程一样直接被等待执行结束。goroutine 和 goroutine 唯一进行关联的方式就是通过 channel 的消息传递。假设 Go 语言支持了抛出异常,那么,一个 goroutine 中抛出了一个没有被捕获的异常,这个异常将会导致什么呢?由于没有任何实体有责任捕获并处理这个异常,因此这里唯一正确的处理方式就是 panic 了,这个处理显然是很不可靠的,一个 goroutine 中的异常导致整个系统的 panic 无法让人接受。当然,有人会说,那在每个 goroutine 的最顶层都 try-catch 一下就可以了。那问题又来了,try-catch 之后呢?如果出现了一个已经被抛到顶层的异常,说明这个异常应该无法被这个 goroutine 自身处理了,应该交由其监视者来处理,例如上面的例子中,调用者就应该负责处理 Service goroutine 中产生的错误。那么,在这个时候,唯一正确的做法就是将抛出的异常以错误值的形式通过 channel 发送给监视者,以期待上层能够正确处理这个异常。那么这样一来,开发者就必须频繁混合使用两种错误处理方式,这样的开发方式是极其混乱且易错的。所以,使用错误返回值的方式应该是更加合理统一的方式了。

第二点,关于泛型的问题。

Go 语言只有接口没有泛型,这导致了很多麻烦,例如我们无法实现带有静态检查的自定义容器,泛型算法也难以实现。许多 Go 语言的开发者对于泛型的看法是:你不需要这个。我承认在实际工程中泛型的使用场合远少于接口,但是,即便从服务开发这个 Go 语言的主战场来看,泛型的必要性也依然很高。从上面的例子中我们可以看到,代码中大量充斥着 interface{} 和对 interface{} 的类型转换。其原因就是我们在实现这样一个服务模板时,我们并不知道模板的使用者需要处理怎样的 request,返回怎样的 response,也不知道这里的 state 是什么。由于泛型的缺失,我们的代码相当于失去了静态的类型检查,将静态的类型错误变为了运行时错误,这样一来,Go 语言的静态能力就缺失了很多,甚至我们可以说,Go 语言泛型的缺失使得 Go 语言在类型安全性上不如带有泛型能力的静态类型语言,却比这些语言在使用上还要更啰嗦(各种类型转换和错误判断)。

总结

Go 语言是一个原生支持并发的语言,其并发模型基于 CSP 模型。通过使用 Go 语言的并发能力,我们可以设计出非常直观易懂的代码。经过上面几个例子的分析中我们可以看出,从并发模型和并发程序设计的角度来看,Go 在语言设计上的优势在于:

而 Go 在语言设计上的劣势在于:

泛型的缺失导致许多程序设计变得脆弱,增加代码量且失去了安全性


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

A Byte of Python

A Byte of Python

Swaroop C H / Lulu Marketplace / 2008-10-1 / USD 27.98

'A Byte of Python' is a book on programming using the Python language. It serves as a tutorial or guide to the Python language for a beginner audience. If all you know about computers is how to save t......一起来看看 《A Byte of Python》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具