内容简介:条件变量(conditional variable),和互斥锁一样,也是一个同步工具。我们常常会把条件变量与互斥锁一起讨论。实际上,条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用。条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。使用条件变量的最大优势就是在效率方面的提升。当共享资源的状态不满足条件的时候,想操作它的线程再也不用循环往复的做检查了,只要等待通知就好了。
条件变量
条件变量(conditional variable),和互斥锁一样,也是一个同步工具。我们常常会把条件变量与互斥锁一起讨论。实际上,条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用。
作用
条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。
使用条件变量的最大优势就是在效率方面的提升。当共享资源的状态不满足条件的时候,想操作它的线程再也不用循环往复的做检查了,只要等待通知就好了。
使用条件变量
条件变量需要与互斥锁配合使用。条件变量的初始化需要互斥锁,并且它的方法有的也是基于互斥锁的。
条件变量提供的方法有三个:
- 等待通知(wait)
- 单发通知(signal)
- 广播通知(broadcast)
在利用条件变量等待通知的时候,需要在它基于的那个互斥锁的保护下进行。
在进行单发通知或光爆通知的时候,需要在对应的互斥锁解锁之后再做操作。
创建条件变量结合代码理解上面的含义,先创建几个变量:
var lock sync.RWMutex sendCond := sync.NewCond(&lock) recvCond := sync.NewCond(lock.RLocker())
条件变量的类型lock是一个读写锁,基于这把锁,创建了2个代表条件变量的变量,这两个变量的类型是*sync.Cond,是由sync.NewCond函数来初始化的。
与互斥锁锁不同,这里不是开箱即用的,只能使用sync.NewCond函数来创建它的指针值,这个函数需要一个sync.Locker类型的参数。
前面说过,条件变量是基于互斥锁的,它必须有互斥锁的支持才能够起作用。因此,这里的参数是必须的,它也会参与到条件变量的方法实现中去。
sync.Locker接口sync.Locker其实是一个接口,包含两个方法Lock()和Unlock():
type Locker interface { Lock() Unlock() }
sync.Mutex类型sync,RWMutex类型都拥有这两个方法,不过都是指针方法。因此这两个类型的指针类型才是sync.Locker接口的实现类型。
在为sendCond初始化的时候,把lock变量的指针作为参数。这里lock变量的Lock方法和Unlock方法分别用于对其中写锁的锁定和解锁。这里与实现接口的两个方法的名称是对应的。
在为recvCond初始化的时候,需要的是lock变量的读锁,并且还得是sync.Locker接口类型,就是要实现了Lock和Unlock方法的读锁。可是lock变量中用于读锁的方法却是RLock方法和RUnlock方法,这里名称不对应了。不过有一个RLocker方法可以实现这一需求,下面是源码里实现的部分,很简单:
// RLocker returns a Locker interface that implements // the Lock and Unlock methods by calling rw.RLock and rw.RUnlock. func (rw *RWMutex) RLocker() Locker { return (*rlocker)(rw) } type rlocker RWMutex func (r *rlocker) Lock() { (*RWMutex)(r).RLock() } func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
这里我有一些小疑惑,3个方法里面都是类型断言吧。RLocker方法把原来的读写锁类型转成一个新的类型然后返回。后面的两个方法,为了用新类型调用读写锁类型里的方法,先进行类型断言,转成读写锁原本的类型,然后调用它的方法。
使用条件变量下面是截取的使用时的部分代码:
lock.Lock() for !isEmpty { sendCond.Wait() } isEmpty = false // 这里可以做写入的操作 lock.Unlock() recvCond.Signal()
上面是一个写入的流程。之前的代码定义了一个状态变量isEmpty,只有状态为空的时候,才允许写入,写入后把状态设置为非空。
这里要先调用Lock方法,等待通知(wait)是要在互斥锁的保护下进行的。
然后再操作完之后,先调用Unlock方法,再发送通知,发送通知的操作要在互斥锁解锁之后。
这里等待的出sendCond的信号,而最后发送的是recvCond的信号。在另一个读取的流程里则正好相反。利用条件变量可以实现单向的通知,而这里要实现双向的通知,就需要两个条件变量。这是条件变量的基本使用原则。
示例代码
上面把关键的代码分析了一下,下面是完整的示例代码:
package main import ( "fmt" "sync" "time" "flag" ) var useCond bool func init() { flag.BoolVar(&useCond, "cond", false, "是否使用条件变量") } type msgBox struct { message string isEmpty bool sendCond *sync.Cond recvCond *sync.Cond } func main() { flag.Parse() fmt.Println("是否开启了条件变量保护:", useCond) var lock sync.RWMutex msgBox := msgBox{ isEmpty: true, // 默认值是false,状态初始值应该为true sendCond: sync.NewCond(&lock), // 不是开箱即用的,需要在使用前初始化 recvCond: sync.NewCond(lock.RLocker()), } done := make(chan struct{}) max := 5 // 写操作的goroutine go func(max int) { defer func() { done <- struct{}{} }() for i := 0; i < max; i++ { time.Sleep(time.Millisecond * 200) // 先进行保护 lock.Lock() // 再等待通知 for useCond && !msgBox.isEmpty { msgBox.sendCond.Wait() } msgBox.isEmpty = false msg := fmt.Sprintf("第 %d 条消息", i) msgBox.message = msg fmt.Printf("发送消息[%d]: %s\n", i, msg) // 先解锁 lock.Unlock() // 再发送通知 msgBox.recvCond.Signal() } }(max) // 读操作的goroutine go func(max int) { defer func() { done <- struct{}{} }() for j := 0; j < max; j++ { time.Sleep(time.Millisecond * 500) lock.RLock() for useCond && msgBox.isEmpty { msgBox.recvCond.Wait() } msgBox.isEmpty = true msg := msgBox.message fmt.Printf("接收消息[%d]: %s\n", j, msg) lock.RUnlock() msgBox.sendCond.Signal() } }(max) <-done <-done fmt.Println("Over") }
在这个例子里,写的时候要获取到写锁,读的时候要获取到读锁,这个逻辑和之前互斥锁是一样的。但是只是获取到锁还不能做操作,这里还要再做一个限制,所以就用到了条件变量。
在这个例子里,写操作和读操作是需要成对出现的。写完一次之后,依然能获取到写锁,但是不能立刻写。而是要等待读操作把之前写入的数据读过之后,才能再次写入,把之前的内容覆盖掉。读操作也是一样。这里就需要两个goroutine之间传递信号了。
通过命令行参数分别在开启/关闭条件变量的环境下运行,可以看到其中的作用:
go run main.go go run main.go -cond
Wait方法
条件变量的Wait方法主要做了4件事:
- 把调用它的goroutine加入到当前条件变量的通知队列中
- 解锁当前的条件变量基于的那个互斥锁
- 让当前的goroutine处于等待状态,等到通知来了再决定是否唤醒它。此时阻塞在调用Wait方法的那行代码上
- 如果通知来了并且决定唤醒当前goroutine,就在唤醒它之后重新锁定当前条件变量基于的互斥锁
先解锁,在阻塞在Wait方法里,必须要先解锁,在阻塞当前goroutine。否则就违背了互斥锁要成对出现的原则。并且当前goroutine在解锁千就阻塞的话,当前goroutine就不可能在执行解锁了。即使不考虑原则,让别的goroutine来解锁,又会有重复解锁可能。
使用for语句并且Wait方法建议是放在一个for循环里的。这里似乎也是可以用if语句的。但是if语句只能检查状态一次,而for的话可以进行多次检查。如果goroutine收到了通知而唤醒,但是此时检查时发现状态还是不对,那么就应该再次调用Wait方法。保险起见,在包裹条件变量的Wait方法总是应该使用for语句。
Signal方法和Broadcast方法
这2个方法都是用来发送通知的。Signal方法的通知只会唤醒一个goroutine,而Broadcast方法的通知会唤醒所有等待的goroutine。Wait方法会把当前的goroutine添加到通知队列的队尾,而Signal方法会从通知队列的队首开始查找可以被唤醒的goroutine。因此Signal方法唤醒的一般是最早等待的那个goroutine。
适用场景这2个方法的行为决定他们的适用场景。确定只有一个goroutine在等待通知,或者值需要唤醒一个goroutine的时候,就使用Signal方法。否则,使用Broadcast方法总是没错的,Broadcast方法的适用场景更多。
通知的即时性
条件变量的通知具有即时性。如果发送通知的时候没有goroutine在等待,那么该次通知就会被直接丢弃。之后再开始等待的goroutine需要等待之后的通知。
示例代码2
还是前面那个示例,稍微改了改,把读写锁换成了互斥锁,通知方法把Signal换成了Broadcast:
package main import ( "fmt" "sync" "time" ) var lock sync.Mutex // 匿名结构体,定义并初始化赋值 // 嵌入式锁(Embedded lock)的场景适合使用匿名结构体 var msgBox = struct { message string isEmpty bool sendCond *sync.Cond recvCond *sync.Cond }{ isEmpty: true, sendCond: sync.NewCond(&lock), recvCond: sync.NewCond(&lock), } // 用于设置消息的函数 func send(id, index int) { lock.Lock() for !msgBox.isEmpty { msgBox.sendCond.Wait() } msg := fmt.Sprintf("msg: [%d-%d]", id, index) msgBox.message = msg fmt.Printf("发送消息[%d-%d]: %s\t", id, index, msg) msgBox.isEmpty = false lock.Unlock() msgBox.recvCond.Broadcast() } // 用于读取消息的函数 func recv(id, index int) { lock.Lock() for msgBox.isEmpty { msgBox.recvCond.Wait() } msg := msgBox.message msgBox.message = "" fmt.Printf("接收消息[%d-%d]: %s\n", id, index, msg) msgBox.isEmpty = true lock.Unlock() msgBox.sendCond.Broadcast() } func main() { done := make(chan struct{}) count := 5 // 启动一个goroutine用于发送 go func(id, count int) { defer func() { done <- struct{}{} }() for i := 0; i < count; i++ { time.Sleep(time.Millisecond * 100) send(id, i) } }(0, count * 2) // 启动两个goroutine用于接收 go func(id, count int) { defer func() { done <- struct{}{} }() for i := 0; i < count; i++ { time.Sleep(time.Millisecond * 300) recv(id, i) } }(1, count) go func(id, count int) { defer func() { done <- struct{}{} }() for i := 0; i < count; i++ { time.Sleep(time.Millisecond * 400) recv(id, i) } }(2, count) <- done <- done <- done fmt.Println("Over") }
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- C++多线程中的锁和条件变量使用
- C++11多线程-条件变量(std::condition_variable)
- SQL where条件和jion on条件的详解及区别
- Python 条件语句
- React 行内条件渲染
- Golang: 条件和循环
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Weaving the Web
Tim Berners-Lee / Harper Paperbacks / 2000-11-01 / USD 15.00
Named one of the greatest minds of the 20th century by Time , Tim Berners-Lee is responsible for one of that century's most important advancements: the world wide web. Now, this low-profile genius-wh......一起来看看 《Weaving the Web》 这本书的介绍吧!