内容简介:条件变量(conditional variable),和互斥锁一样,也是一个同步工具。我们常常会把条件变量与互斥锁一起讨论。实际上,条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用。条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。使用条件变量的最大优势就是在效率方面的提升。当共享资源的状态不满足条件的时候,想操作它的线程再也不用循环往复的做检查了,只要等待通知就好了。
条件变量
条件变量(conditional variable),和互斥锁一样,也是一个同步工具。我们常常会把条件变量与互斥锁一起讨论。实际上,条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用。
作用
条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。
使用条件变量的最大优势就是在效率方面的提升。当共享资源的状态不满足条件的时候,想操作它的线程再也不用循环往复的做检查了,只要等待通知就好了。
使用条件变量
条件变量需要与互斥锁配合使用。条件变量的初始化需要互斥锁,并且它的方法有的也是基于互斥锁的。
条件变量提供的方法有三个:
- 等待通知(wait)
- 单发通知(signal)
- 广播通知(broadcast)
在利用条件变量等待通知的时候,需要在它基于的那个互斥锁的保护下进行。
在进行单发通知或光爆通知的时候,需要在对应的互斥锁解锁之后再做操作。
创建条件变量结合代码理解上面的含义,先创建几个变量:
var lock sync.RWMutex sendCond := sync.NewCond(&lock) recvCond := sync.NewCond(lock.RLocker())
条件变量的类型lock是一个读写锁,基于这把锁,创建了2个代表条件变量的变量,这两个变量的类型是*sync.Cond,是由sync.NewCond函数来初始化的。
与互斥锁锁不同,这里不是开箱即用的,只能使用sync.NewCond函数来创建它的指针值,这个函数需要一个sync.Locker类型的参数。
前面说过,条件变量是基于互斥锁的,它必须有互斥锁的支持才能够起作用。因此,这里的参数是必须的,它也会参与到条件变量的方法实现中去。
sync.Locker接口sync.Locker其实是一个接口,包含两个方法Lock()和Unlock():
type Locker interface {
Lock()
Unlock()
}
sync.Mutex类型sync,RWMutex类型都拥有这两个方法,不过都是指针方法。因此这两个类型的指针类型才是sync.Locker接口的实现类型。
在为sendCond初始化的时候,把lock变量的指针作为参数。这里lock变量的Lock方法和Unlock方法分别用于对其中写锁的锁定和解锁。这里与实现接口的两个方法的名称是对应的。
在为recvCond初始化的时候,需要的是lock变量的读锁,并且还得是sync.Locker接口类型,就是要实现了Lock和Unlock方法的读锁。可是lock变量中用于读锁的方法却是RLock方法和RUnlock方法,这里名称不对应了。不过有一个RLocker方法可以实现这一需求,下面是源码里实现的部分,很简单:
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
return (*rlocker)(rw)
}
type rlocker RWMutex
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
这里我有一些小疑惑,3个方法里面都是类型断言吧。RLocker方法把原来的读写锁类型转成一个新的类型然后返回。后面的两个方法,为了用新类型调用读写锁类型里的方法,先进行类型断言,转成读写锁原本的类型,然后调用它的方法。
使用条件变量下面是截取的使用时的部分代码:
lock.Lock()
for !isEmpty {
sendCond.Wait()
}
isEmpty = false
// 这里可以做写入的操作
lock.Unlock()
recvCond.Signal()
上面是一个写入的流程。之前的代码定义了一个状态变量isEmpty,只有状态为空的时候,才允许写入,写入后把状态设置为非空。
这里要先调用Lock方法,等待通知(wait)是要在互斥锁的保护下进行的。
然后再操作完之后,先调用Unlock方法,再发送通知,发送通知的操作要在互斥锁解锁之后。
这里等待的出sendCond的信号,而最后发送的是recvCond的信号。在另一个读取的流程里则正好相反。利用条件变量可以实现单向的通知,而这里要实现双向的通知,就需要两个条件变量。这是条件变量的基本使用原则。
示例代码
上面把关键的代码分析了一下,下面是完整的示例代码:
package main
import (
"fmt"
"sync"
"time"
"flag"
)
var useCond bool
func init() {
flag.BoolVar(&useCond, "cond", false, "是否使用条件变量")
}
type msgBox struct {
message string
isEmpty bool
sendCond *sync.Cond
recvCond *sync.Cond
}
func main() {
flag.Parse()
fmt.Println("是否开启了条件变量保护:", useCond)
var lock sync.RWMutex
msgBox := msgBox{
isEmpty: true, // 默认值是false,状态初始值应该为true
sendCond: sync.NewCond(&lock), // 不是开箱即用的,需要在使用前初始化
recvCond: sync.NewCond(lock.RLocker()),
}
done := make(chan struct{})
max := 5
// 写操作的goroutine
go func(max int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < max; i++ {
time.Sleep(time.Millisecond * 200)
// 先进行保护
lock.Lock()
// 再等待通知
for useCond && !msgBox.isEmpty {
msgBox.sendCond.Wait()
}
msgBox.isEmpty = false
msg := fmt.Sprintf("第 %d 条消息", i)
msgBox.message = msg
fmt.Printf("发送消息[%d]: %s\n", i, msg)
// 先解锁
lock.Unlock()
// 再发送通知
msgBox.recvCond.Signal()
}
}(max)
// 读操作的goroutine
go func(max int) {
defer func() {
done <- struct{}{}
}()
for j := 0; j < max; j++ {
time.Sleep(time.Millisecond * 500)
lock.RLock()
for useCond && msgBox.isEmpty {
msgBox.recvCond.Wait()
}
msgBox.isEmpty = true
msg := msgBox.message
fmt.Printf("接收消息[%d]: %s\n", j, msg)
lock.RUnlock()
msgBox.sendCond.Signal()
}
}(max)
<-done
<-done
fmt.Println("Over")
}
在这个例子里,写的时候要获取到写锁,读的时候要获取到读锁,这个逻辑和之前互斥锁是一样的。但是只是获取到锁还不能做操作,这里还要再做一个限制,所以就用到了条件变量。
在这个例子里,写操作和读操作是需要成对出现的。写完一次之后,依然能获取到写锁,但是不能立刻写。而是要等待读操作把之前写入的数据读过之后,才能再次写入,把之前的内容覆盖掉。读操作也是一样。这里就需要两个goroutine之间传递信号了。
通过命令行参数分别在开启/关闭条件变量的环境下运行,可以看到其中的作用:
go run main.go go run main.go -cond
Wait方法
条件变量的Wait方法主要做了4件事:
- 把调用它的goroutine加入到当前条件变量的通知队列中
- 解锁当前的条件变量基于的那个互斥锁
- 让当前的goroutine处于等待状态,等到通知来了再决定是否唤醒它。此时阻塞在调用Wait方法的那行代码上
- 如果通知来了并且决定唤醒当前goroutine,就在唤醒它之后重新锁定当前条件变量基于的互斥锁
先解锁,在阻塞在Wait方法里,必须要先解锁,在阻塞当前goroutine。否则就违背了互斥锁要成对出现的原则。并且当前goroutine在解锁千就阻塞的话,当前goroutine就不可能在执行解锁了。即使不考虑原则,让别的goroutine来解锁,又会有重复解锁可能。
使用for语句并且Wait方法建议是放在一个for循环里的。这里似乎也是可以用if语句的。但是if语句只能检查状态一次,而for的话可以进行多次检查。如果goroutine收到了通知而唤醒,但是此时检查时发现状态还是不对,那么就应该再次调用Wait方法。保险起见,在包裹条件变量的Wait方法总是应该使用for语句。
Signal方法和Broadcast方法
这2个方法都是用来发送通知的。Signal方法的通知只会唤醒一个goroutine,而Broadcast方法的通知会唤醒所有等待的goroutine。Wait方法会把当前的goroutine添加到通知队列的队尾,而Signal方法会从通知队列的队首开始查找可以被唤醒的goroutine。因此Signal方法唤醒的一般是最早等待的那个goroutine。
适用场景这2个方法的行为决定他们的适用场景。确定只有一个goroutine在等待通知,或者值需要唤醒一个goroutine的时候,就使用Signal方法。否则,使用Broadcast方法总是没错的,Broadcast方法的适用场景更多。
通知的即时性
条件变量的通知具有即时性。如果发送通知的时候没有goroutine在等待,那么该次通知就会被直接丢弃。之后再开始等待的goroutine需要等待之后的通知。
示例代码2
还是前面那个示例,稍微改了改,把读写锁换成了互斥锁,通知方法把Signal换成了Broadcast:
package main
import (
"fmt"
"sync"
"time"
)
var lock sync.Mutex
// 匿名结构体,定义并初始化赋值
// 嵌入式锁(Embedded lock)的场景适合使用匿名结构体
var msgBox = struct {
message string
isEmpty bool
sendCond *sync.Cond
recvCond *sync.Cond
}{
isEmpty: true,
sendCond: sync.NewCond(&lock),
recvCond: sync.NewCond(&lock),
}
// 用于设置消息的函数
func send(id, index int) {
lock.Lock()
for !msgBox.isEmpty {
msgBox.sendCond.Wait()
}
msg := fmt.Sprintf("msg: [%d-%d]", id, index)
msgBox.message = msg
fmt.Printf("发送消息[%d-%d]: %s\t", id, index, msg)
msgBox.isEmpty = false
lock.Unlock()
msgBox.recvCond.Broadcast()
}
// 用于读取消息的函数
func recv(id, index int) {
lock.Lock()
for msgBox.isEmpty {
msgBox.recvCond.Wait()
}
msg := msgBox.message
msgBox.message = ""
fmt.Printf("接收消息[%d-%d]: %s\n", id, index, msg)
msgBox.isEmpty = true
lock.Unlock()
msgBox.sendCond.Broadcast()
}
func main() {
done := make(chan struct{})
count := 5
// 启动一个goroutine用于发送
go func(id, count int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < count; i++ {
time.Sleep(time.Millisecond * 100)
send(id, i)
}
}(0, count * 2)
// 启动两个goroutine用于接收
go func(id, count int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < count; i++ {
time.Sleep(time.Millisecond * 300)
recv(id, i)
}
}(1, count)
go func(id, count int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < count; i++ {
time.Sleep(time.Millisecond * 400)
recv(id, i)
}
}(2, count)
<- done
<- done
<- done
fmt.Println("Over")
}
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- C++多线程中的锁和条件变量使用
- C++11多线程-条件变量(std::condition_variable)
- SQL where条件和jion on条件的详解及区别
- Python 条件语句
- React 行内条件渲染
- Golang: 条件和循环
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Paradigms of Artificial Intelligence Programming
Peter Norvig / Morgan Kaufmann / 1991-10-01 / USD 77.95
Paradigms of AI Programming is the first text to teach advanced Common Lisp techniques in the context of building major AI systems. By reconstructing authentic, complex AI programs using state-of-the-......一起来看看 《Paradigms of Artificial Intelligence Programming》 这本书的介绍吧!