前言:
647目录如下:
Go语言基础(一)—— 简介、环境配置、HelloWorld
Go语言基础(六)—— 测试、反射、Unsafe
Go语言基础(七)—— 架构 & 常见任务
Go语言基础(八)—— 性能调优
本篇将介绍如下内容:
1.协程机制( Groutine
)
2.共享内存并发机制(协程安全)
3.CSP并发机制( channel
)
4.多路选择和超时控制( select
)
5.channel的关闭和广播( channel
)
6.任务的取消
7.Context与关联任务取消
8.常见并发任务(实战)
一、协程机制
相信大家肯定都知道 “线程” 与 “进程” 的概念。
而在 Go 语言中,“协程”可以理解为更轻量级的线程。 通过调度“协程”就可以把系统Kernel的效率发挥到极致。
通过一张表格,我们来对比一下协程与线程的区别。
- Thread vs. Groutine:
\ | 默认栈大小(创建时) | KSE对应关系(Kernel Space Entity) |
---|---|---|
线程 Thread | 1M | 1 : 1 |
协程 Groutine | 2K | M : N |
协程vs.线程的优势在于:
- 线程之间的切换会牵扯到内核中系统线程(
kernel entity
)的切换,这会造成较大的成本。 - 而多个协程在同一个系统线程(
kernel entity
)下切换,就能降低切换系统线程(kernel entity
)的成本。(如上图所示)
协程的使用:
语法: go + func
func TestGroutine(t *testing.T) { for i := 0; i < 10; i++ { go func(i int) { fmt.Println(i) // 正确案例,值传递。各个协程无竞争关系。 }(i) // go func() { // fmt.Println(i) // 错误案例,共享变量。各个协程有竞争关系 // }() } time.Sleep(time.Millisecond * 50) } 复制代码
二、共享内存并发机制(协程安全)
说到协程安全,我们第一个会想到的就是加锁(lock)。 通过加锁来保证协程安全。
在Go语言中也是如此,我们来看个例子。
- 协程并发,导致的协程不安全:
// 协程不安全demo func TestCounterThreadUnsafe(t *testing.T) { counter := 0 for i := 0; i < 5000; i++ { go func() { counter++ }() } time.Sleep(1 * time.Second) t.Logf("counter = %d", counter) } 复制代码
结果如下:
=== RUN TestCounterThreadUnsafe --- PASS: TestCounterThreadUnsafe (1.00s) share_mem_test.go:18: counter = 4765 复制代码
这时就会发现,计算错误,因为并发导致了漏值。
- 解决方式一: 普通加锁,并加延迟等待协程执行完毕(不推荐)
// 协程等待demo(停1秒,不推荐) func TestCounterThreadSafe(t *testing.T) { var mut sync.Mutex counter := 0 for i := 0; i < 5000; i++ { go func() { defer func() { mut.Unlock() //函数调用完成后:解锁,保证协程安全 }() mut.Lock() // 函数将要调用前:加锁,保证协程安全 counter++ }() } time.Sleep(1 * time.Second) // 等待一秒,等协程全部执行完 t.Logf("counter = %d", counter) } 复制代码
结果如下:
=== RUN TestCounterThreadSafe --- PASS: TestCounterThreadSafe (1.01s) share_mem_test.go:35: counter = 5000 复制代码
结果正确,但是有一个问题。因为这里有个1秒的延迟等待,保证协程运行完毕再调用结果。因此,有没有更好的处理方式呢?接下来我们再优化一下。
- 解决方式二: 推荐! 使用同步等待队列(
WaitGroup
)保证顺序执行。
// 协程安全Demo func TestCounterWaitGroup(t *testing.T) { var mut sync.Mutex // 互斥锁 var wg sync.WaitGroup // 等待队列 counter := 0 for i := 0; i < 5000; i++ { wg.Add(1) // 加个任务 go func() { defer func() { mut.Unlock() //函数调用完成后:解锁,保证协程安全 }() mut.Lock() // 函数将要调用前:加锁,保证协程安全 counter++ wg.Done() // 做完任务 }() } wg.Wait() //等待所有任务执行完毕 t.Logf("counter = %d", counter) } 复制代码
运行结果如下:
=== RUN TestCounterWaitGroup --- PASS: TestCounterWaitGroup (0.00s) share_mem_test.go:55: counter = 5000 复制代码
这样的话,可以看出:互斥锁 Mutex
和等待队列 WaitGroup
不仅保证了协程的安全,还避免了提前打印结果。(:heavy_check_mark:)
三、CSP并发机制
1. CSP
CSP( Communicating sequential processes
):通信顺序进程(管道通信)。 简单来说,CSP是通过 Channel
(管道)来通信的。
Go 中的 Channel
(管道)有容量限制并且独立于处理 Groutine
(协程)。
2. Channel
Go中常见的 Channel
有两种,分别对应为 Channel
、 Buffer Channel
。
- 第一种:Channel(无缓冲)
首先,发送者与接受者必须同时站在 Channel
的两端才进行交互。 如果一方不在,另一方就会阻塞在一端,直到两端都在才进行交互。
创建语法: make(chan [type])
retChannel := make(chan string) // 创建无缓冲channel,并指明channel中的数据为string,双端等待 复制代码
输入语法: channel <-
channel <- object // channel输入 复制代码
获取语法: <- channel
object <- channel // channel输出 复制代码
- 第二种:Buffer Channel(有缓冲)
这是一种稍微高级一点的 Channel
方式,(更加松耦合)。
首先,给 Channel
设置一个容量大小,并且不要求发送者与接受者同时站在两端。 然后,发送者会以 Buffer
的形式,不断往 Channel
里发送消息。 直到 Channel
的容量满了才阻塞。 这时,只要接受方接收了消息(即 Channel
有剩余容量了),发送者就会继续发送消息。
创建语法: make(chan [type], Int)
retChannel := make(chan string, 1) // 创建有缓冲channel,并指明channel中的数据为string 复制代码
输入语法: channel <-
channel <- object // channel输入 复制代码
获取语法: <- channel
object <- channel // channel输出 复制代码
Demo:模拟了一个网络请求的方法调用过程,通过 Channel
来控制当前协程在网络请求的等待过程中,去执行别的任务。
// 模拟网络请求 func serviceTask() string { fmt.Println("- start working on service task.") time.Sleep(time.Millisecond * 50) return "- service task is Done." } // 别的任务 func otherTask() { fmt.Println("start working on something else.") time.Sleep(time.Millisecond * 100) fmt.Println("other task is Done.") } // csp异步管道 func AsyncService() chan string { retChannel := make(chan string) // 无缓冲channel,创建并指明channel中的数据为string,双端等待 // retChannel := make(chan string, 1) // 有缓冲channel,创建并指明channel中的数据为string go func() { ret := serviceTask() fmt.Println("returned result.") retChannel <- ret // channel输入 fmt.Println("service exited.") }() return retChannel } func TestAsyncService(t *testing.T) { retCh := AsyncService() otherTask() fmt.Println(<-retCh) // channel输出 time.Sleep(time.Second * 1) } 复制代码
四、多路选择和超时控制
使用 select
关键字,完成“多路选择”与“超时控制”。
- 多路选择: 当返回的
channel
可能有多个时,可以使用select来处理多路的响应事件。
注意:这里与 switch
有点像,但是要注意的是,它并不是顺序判断的。也就是如果 channel1
与 channel2
同时满足时,可能走的是 channel1
、也可能是 channel2
,并不像 switch
一样做顺序的判断。
Demo:
select { case ret := <-channel1: t.Log(ret) case ret:= <- channel2: t.Log(ret) case default: t.Error("No one returned.") } 复制代码
- 超时控制:
同时,我们也可以设置一个超时等待的一个分路,当 channel
超时还未返回时,可以执行相应的代码。
Demo:
select { case ret := <-AsyncService(): //正常返回 t.Log(ret) case <-time.After(time.Millisecond * 100): // 超时等待 t.Error("time out") } 复制代码
五、channel的关闭和广播
要点如下:
- 向已经
close
的channel
发消息,会导致程序panic
。 -
v, ok <- channel
。 其中,ok
为bool
值, 若ok==true
时,表示channel
处于open
状态。 若ok==false
时,表示channel
处于close
状态。 - 所有
channel
接收者在channel
关闭时,都会立刻从阻塞等待中返回,且ok
值为false
。(PS:广播机制,通常被利用向多个订阅者同时发送信号。如,退出信号。)
Demo:
// 消息生产者 func dataProducer(ch chan int, wg *sync.WaitGroup) { go func() { for i := 0; i < 10; i++ { ch <- i } fmt.Println("channel close.") close(ch) // 关闭channel wg.Done() }() } // 消息接收者 func dataReceiver(ch chan int, wg *sync.WaitGroup) { go func() { for { if data, ok := <-ch; ok { // 有消息就打印,直到channel被close。 fmt.Println(data) } else { fmt.Println("Receiver close.") break // channel被close } } wg.Done() }() } func TestCloseChannel(t *testing.T) { var wg sync.WaitGroup ch := make(chan int) wg.Add(1) dataProducer(ch, &wg) // 开启生产者 wg.Add(1) dataReceiver(ch, &wg) // 开启消费者 wg.Wait() } 复制代码
六、任务的取消
通过上面的 close channel
(广播机制),我们可以延伸一下,通过 close channel
通知所有 channel
取消当前的任务。
Demo如下:
func isCancelled(cancelChan chan struct{}) bool { select { case <-cancelChan: return true default: return false } } // 只能取消单个channel func cancel_1(cancelChan chan struct{}) { cancelChan <- struct{}{} } // 所有channel全部取消 func cancel_2(cancelChan chan struct{}) { close(cancelChan) } func TestCancel(t *testing.T) { cancelChan := make(chan struct{}, 0) // 创建了一个channal,通过它来控制事件取消 for i := 0; i < 5; i++ { // 开启5个协程 go func(i int, chanclCh chan struct{}) { // 每个协程里面都有一个死循环,去等待取消消息 for { if isCancelled(cancelChan) { break } time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒 } fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志 }(i, cancelChan) } cancel_2(cancelChan) // 通知所有channel关闭。 time.Sleep(time.Second * 1) } 复制代码
七、Context与关联任务取消
刚才我们通过 close channel
来取消任务,但会有些问题。 比如,当一个任务被取消后,它所关联的子任务也应该被立即取消。
为了解决这个问题, go 1.9.0
之后, golang
加入了 context
,来保证关联任务的取消。
1. Context
context
就是用于管理相关任务的上下文,包含了共享值的传递,超时,取消通知。
结构体如下:
type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} } 复制代码
-
Deadline
会返回一个超时时间,Goroutine
获得了超时时间后,例如可以对某些io操作设定超时时间。 -
Done
方法返回一个信道(channel
),当Context
被撤销或过期时,该信道是关闭的,即它是一个表示Context
是否已关闭的信号。 - 当
Done
信道关闭后,Err方法表明Context被撤的原因。 -
Value
可以让Goroutine
共享一些数据,当然获得数据是协程安全的。但使用这些数据的时候要注意同步,比如返回了一个map
,而这个map
的读写则要加锁。
要点:
context.Background() context.WithCancel(parentContext) <-ctx.Done
2. 关联任务取消
我们把刚才的例子稍加调整,通过context来取消所有关联的任务。
- 首先,创建一个
context
:
ctx, cancel := context.WithCancel(context.Background()) // 创建一个子context 复制代码
- 编写一个取消方法,把
context
作为参数。
func isCancelled(ctx context.Context) bool { select { case <-ctx.Done(): return true default: return false } } 复制代码
- 开五个协程死循环,每个协程里面都有一个死循环,等待取消任务消息。再调用
cancel
方法。
for i := 0; i < 5; i++ { // 开启5个协程 go func(i int, ctx context.Context) { // 每个协程里面都有一个死循环,去等待取消消息 for { if isCancelled(ctx) { break } time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒 } fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志 }(i, ctx) } cancel() // 取消ctx 复制代码
完整示例代码如下:
func isCancelled(ctx context.Context) bool { select { case <-ctx.Done(): return true default: return false } } func TestCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // 创建一个子context for i := 0; i < 5; i++ { // 开启5个协程 go func(i int, ctx context.Context) { // 每个协程里面都有一个死循环,去等待取消消息 for { if isCancelled(ctx) { break } time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒 } fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志 }(i, ctx) } cancel() // 取消ctx time.Sleep(time.Second * 1) } 复制代码
八、常见并发任务(实战)
1. 只执行一次(单例模式)
场景:在多协程的情况下,保证某段代码只执行一次。
type Singleton struct { data string } var singleInstance *Singleton var once sync.Once func GetSingletonObj() *Singleton { once.Do(func() { fmt.Println("Create Obj") singleInstance = new(Singleton) }) return singleInstance } func TestGetSingletonObj(t *testing.T) { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { obj := GetSingletonObj() fmt.Printf("%p\n", obj) wg.Done() }() } wg.Wait() } 复制代码
2. 仅需任意任务完成
利用channel管道通信的机制,我们可以再任何一个协程完成任务时,就给对象发消息。
func runTask(id int) string { time.Sleep(10 * time.Millisecond) return fmt.Sprintf("The result is from %d", id) } func firstResponse() string { numOfRunner := 10 ch := make(chan string, numOfRunner) // 创建bufferChannel。(如果用channel会导致协程泄漏,剩下9个channel会一直阻塞在系统中。) for i := 0; i < numOfRunner; i++ { // 开了10个协程 go func(i int) { ret := runTask(i) // 每个协程去执行任务 ch <- ret }(i) } return <-ch // 返回channel里的第一个Response。(因为channel是一个先进先出的管道) } func TestFirstResponse(t *testing.T) { t.Log(firstResponse()) // 发现每次运行返回的都不一样,会根据协程完成任务的一个顺序返回。 } 复制代码
3. 所有任务完成
刚才,我们介绍了first response,接下来我们看一下all response该怎么做。思路是一样的,只要接收到所有 channel
返回的数据,再返回即可。
func runTask(id int) string { time.Sleep(10 * time.Millisecond) return fmt.Sprintf("The result is from %d", id) } func allResponse() string { numOfRunner := 10 ch := make(chan string, numOfRunner) // 创建bufferChannel。 for i := 0; i < numOfRunner; i++ { // 开了10个协程 go func(i int) { ret := runTask(i) // 每个协程去执行任务 ch <- ret }(i) } finalRet := "" for j := 0; j < numOfRunner; j++ { finalRet += <-ch + "\n" } return finalRet // 返回channel里的所有的Response。(因为channel是一个先进先出的管道) } func TestAllResponse(t *testing.T) { t.Log("Before:", runtime.NumGoroutine()) // 打印一下当前的协程数量 t.Log(allResponse()) // 发现每次运行返回的都不一样,会根据协程完成任务的一个顺序返回。 t.Log("After:", runtime.NumGoroutine()) // 再打印一下当前的协程数量 } 复制代码
4. 对象池
我们可以用buffer channel的管道特性来做一个对象池。
Demo:
type ReusableObj struct { } type ObjPool struct { bufChan chan *ReusableObj // 用于缓冲可重用对象 } // 生产指定数量对象的对象池 func NewObjPool(numOfObj int) *ObjPool { ObjPool := ObjPool{} ObjPool.bufChan = make(chan *ReusableObj, numOfObj) for i := 0; i < numOfObj; i++ { ObjPool.bufChan <- &ReusableObj{} } return &ObjPool } // 从对象池中获得对象 func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) { select { case ret := <-p.bufChan: return ret, nil case <-time.After(timeout): // 超时控制 return nil, errors.New("time out") } } // 释放对象池里的对象 func (p *ObjPool) ReleaseObj(obj *ReusableObj) error { select { case p.bufChan <- obj: return nil default: return errors.New("overflow") } } func TestObjPool(t *testing.T) { pool := NewObjPool(10) // 生产一个10容量大小的对象池 for i := 0; i < 10; i++ { if v, err := pool.GetObj(time.Second * 1); err != nil { // 获取obj t.Error(err) } else { fmt.Printf("%T\n", v) // 获取成功,答应日志。 if err := pool.ReleaseObj(v); err != nil { // 释放obj t.Error(err) } } } fmt.Println("Done.") } 复制代码
5. sync.pool对象缓存
我们可以通过sync.pool做对象缓存(创建、获取、缓存的策略)。
对象获取策略:
-
首先,尝试从私有对象获取。
-
其次,如果私有对象不存在,就尝试从当前
Process
的共享池获取。 -
如果当前
Process
的共享池是空的,就尝试从其他Process
的共享池获取。 -
如果所有
Process
的共享池都是空的,就从sync.pool
指定的New
方法中“New”
一个新的对象返回。
sync.pool缓存对象的生命周期:
-
每一次
GC
(垃圾回收)都会清除sync.pool的缓存对象。 -
因此,对象缓存的有效期为下一次
GC
之前。
基本使用:
func TestSyncPool(t *testing.T) { pool := &sync.Pool{ New: func() interface{} { // 创建一个新的对象 fmt.Println("Create a new object.") return 100 }, } v := pool.Get().(int) // 获取对象 fmt.Println(v) pool.Put(3) // 放回对象 // runtime.GC() // 触发GC,会清除sync.pool中缓存的对象 v1, _ := pool.Get().(int) fmt.Println(v1) } 复制代码
多协程下的使用:
func TestSyncPoolInMultiGroutine(t *testing.T) { pool := &sync.Pool{ New: func() interface{} { fmt.Println("Create a new object.") return 10 }, } pool.Put(100) pool.Put(100) pool.Put(100) var wg sync.WaitGroup for i := 0; i < 10; i++ {// 创建10个协程 wg.Add(1) go func(id int) { fmt.Println(pool.Get()) // 获取对象 wg.Done() }(i) } wg.Wait() } 复制代码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
图解网站分析(修订版)
[日] 小川卓 / 沈麟芸 / 人民邮电出版社 / 2014-10 / 69.00元
本书以图配文,结合实例详细讲解了如何利用从网站上获取的各种数据了解网站的运营状况,如何从数据中攫取最有用的信息,如何优化站点,创造更大的网站价值。本书适合各类网站运营人员阅读。 第1 部分介绍了进行网站分析必备的基础知识。第2 部分详细讲解了如何明确网站现状,发现并改善网站的问题。第3 部分是关于流量获取和网站内渠道优化的问题。第4 部分介绍了一些更加先进的网站分析方法,其中详细讲解了如何分......一起来看看 《图解网站分析(修订版)》 这本书的介绍吧!