Go语言基础(五)—— 并发编程

栏目: IT技术 · 发布时间: 4年前

前言:

647

目录如下:

Go语言基础(一)—— 简介、环境配置、HelloWorld

Go语言基础(二)—— 基本常用语法

Go语言基础(三)—— 面向对象编程

Go语言基础(四)—— 优质的容错处理

Go语言基础(五)—— 并发编程

Go语言基础(六)—— 测试、反射、Unsafe

Go语言基础(七)—— 架构 & 常见任务

Go语言基础(八)—— 性能调优

本篇将介绍如下内容:

1.协程机制( Groutine

2.共享内存并发机制(协程安全)

3.CSP并发机制( channel

4.多路选择和超时控制( select

5.channel的关闭和广播( channel

6.任务的取消

7.Context与关联任务取消

8.常见并发任务(实战)

一、协程机制

相信大家肯定都知道 “线程”“进程” 的概念。

而在 Go 语言中,“协程”可以理解为更轻量级的线程。 通过调度“协程”就可以把系统Kernel的效率发挥到极致。

通过一张表格,我们来对比一下协程与线程的区别。

  • Thread vs. Groutine:
\ 默认栈大小(创建时) KSE对应关系(Kernel Space Entity)
线程 Thread 1M 1 : 1
协程 Groutine 2K M : N
Go语言基础(五)—— 并发编程

协程vs.线程的优势在于:

  • 线程之间的切换会牵扯到内核中系统线程( kernel entity )的切换,这会造成较大的成本。
  • 而多个协程在同一个系统线程( kernel entity )下切换,就能降低切换系统线程( kernel entity )的成本。(如上图所示)

协程的使用:

语法: go + func

func TestGroutine(t *testing.T) {
	for i := 0; i < 10; i++ {
		go func(i int) {
			fmt.Println(i) // 正确案例,值传递。各个协程无竞争关系。
		}(i)

		// go func() {
		// 	fmt.Println(i) // 错误案例,共享变量。各个协程有竞争关系
		// }()
	}
	time.Sleep(time.Millisecond * 50)
}
复制代码

二、共享内存并发机制(协程安全)

说到协程安全,我们第一个会想到的就是加锁(lock)。 通过加锁来保证协程安全。

在Go语言中也是如此,我们来看个例子。

  • 协程并发,导致的协程不安全:
// 协程不安全demo
func TestCounterThreadUnsafe(t *testing.T) {
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			counter++
		}()
	}
	time.Sleep(1 * time.Second)
	t.Logf("counter = %d", counter)
}
复制代码

结果如下:

=== RUN   TestCounterThreadUnsafe
--- PASS: TestCounterThreadUnsafe (1.00s)
    share_mem_test.go:18: counter = 4765
复制代码

这时就会发现,计算错误,因为并发导致了漏值。

  • 解决方式一: 普通加锁,并加延迟等待协程执行完毕(不推荐)
// 协程等待demo(停1秒,不推荐)
func TestCounterThreadSafe(t *testing.T) {
	var mut sync.Mutex
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			defer func() {
				mut.Unlock() //函数调用完成后:解锁,保证协程安全
			}()
			mut.Lock() // 函数将要调用前:加锁,保证协程安全
			counter++
		}()
	}
	time.Sleep(1 * time.Second) // 等待一秒,等协程全部执行完
	t.Logf("counter = %d", counter)
}
复制代码

结果如下:

=== RUN   TestCounterThreadSafe
--- PASS: TestCounterThreadSafe (1.01s)
    share_mem_test.go:35: counter = 5000
复制代码

结果正确,但是有一个问题。因为这里有个1秒的延迟等待,保证协程运行完毕再调用结果。因此,有没有更好的处理方式呢?接下来我们再优化一下。

  • 解决方式二: 推荐! 使用同步等待队列( WaitGroup )保证顺序执行。
// 协程安全Demo
func TestCounterWaitGroup(t *testing.T) {
	var mut sync.Mutex    // 互斥锁
	var wg sync.WaitGroup // 等待队列
	counter := 0
	for i := 0; i < 5000; i++ {
		wg.Add(1) // 加个任务
		go func() {
			defer func() {
				mut.Unlock() //函数调用完成后:解锁,保证协程安全
			}()
			mut.Lock() // 函数将要调用前:加锁,保证协程安全
			counter++
			wg.Done() // 做完任务
		}()
	}
	wg.Wait() //等待所有任务执行完毕
	t.Logf("counter = %d", counter)
}
复制代码

运行结果如下:

=== RUN   TestCounterWaitGroup
--- PASS: TestCounterWaitGroup (0.00s)
    share_mem_test.go:55: counter = 5000
复制代码

这样的话,可以看出:互斥锁 Mutex 和等待队列 WaitGroup 不仅保证了协程的安全,还避免了提前打印结果。(:heavy_check_mark:)

三、CSP并发机制

1. CSP

CSP( Communicating sequential processes ):通信顺序进程(管道通信)。 简单来说,CSP是通过 Channel (管道)来通信的。

Go 中的 Channel (管道)有容量限制并且独立于处理 Groutine (协程)。

2. Channel

Go中常见的 Channel 有两种,分别对应为 ChannelBuffer Channel

  • 第一种:Channel(无缓冲)
Go语言基础(五)—— 并发编程

首先,发送者与接受者必须同时站在 Channel 的两端才进行交互。 如果一方不在,另一方就会阻塞在一端,直到两端都在才进行交互。

创建语法: make(chan [type])

retChannel := make(chan string) // 创建无缓冲channel,并指明channel中的数据为string,双端等待
复制代码

输入语法: channel <-

channel <- object // channel输入
复制代码

获取语法: <- channel

object <- channel // channel输出
复制代码
  • 第二种:Buffer Channel(有缓冲)
Go语言基础(五)—— 并发编程

这是一种稍微高级一点的 Channel 方式,(更加松耦合)。

首先,给 Channel 设置一个容量大小,并且不要求发送者与接受者同时站在两端。 然后,发送者会以 Buffer 的形式,不断往 Channel 里发送消息。 直到 Channel 的容量满了才阻塞。 这时,只要接受方接收了消息(即 Channel 有剩余容量了),发送者就会继续发送消息。

创建语法: make(chan [type], Int)

retChannel := make(chan string, 1) // 创建有缓冲channel,并指明channel中的数据为string
复制代码

输入语法: channel <-

channel <- object // channel输入
复制代码

获取语法: <- channel

object <- channel // channel输出
复制代码

Demo:模拟了一个网络请求的方法调用过程,通过 Channel 来控制当前协程在网络请求的等待过程中,去执行别的任务。

// 模拟网络请求
func serviceTask() string {
	fmt.Println("- start working on service task.")
	time.Sleep(time.Millisecond * 50)
	return "- service task is Done."
}

// 别的任务
func otherTask() {
	fmt.Println("start working on something else.")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("other task is Done.")
}

// csp异步管道
func AsyncService() chan string {
	retChannel := make(chan string) // 无缓冲channel,创建并指明channel中的数据为string,双端等待
	// retChannel := make(chan string, 1) // 有缓冲channel,创建并指明channel中的数据为string
	go func() {
		ret := serviceTask()
		fmt.Println("returned result.")
		retChannel <- ret // channel输入
		fmt.Println("service exited.")
	}()
	return retChannel
}

func TestAsyncService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh) // channel输出
	time.Sleep(time.Second * 1)
}
复制代码

四、多路选择和超时控制

使用 select 关键字,完成“多路选择”与“超时控制”。

  • 多路选择: 当返回的 channel 可能有多个时,可以使用select来处理多路的响应事件。

注意:这里与 switch 有点像,但是要注意的是,它并不是顺序判断的。也就是如果 channel1channel2 同时满足时,可能走的是 channel1 、也可能是 channel2 ,并不像 switch 一样做顺序的判断。

Demo:

select {
	case ret := <-channel1: 
		t.Log(ret)
	case ret:= <- channel2:
		t.Log(ret)
	case default:
		t.Error("No one returned.")
	}
复制代码
  • 超时控制:

同时,我们也可以设置一个超时等待的一个分路,当 channel 超时还未返回时,可以执行相应的代码。

Demo:

select {
	case ret := <-AsyncService(): //正常返回
		t.Log(ret)
	case <-time.After(time.Millisecond * 100): // 超时等待
		t.Error("time out")
	}
复制代码

五、channel的关闭和广播

要点如下:

  1. 向已经 closechannel 发消息,会导致程序 panic
  2. v, ok <- channel 。 其中, okbool 值, 若 ok==true 时,表示 channel 处于 open 状态。 若 ok==false 时,表示 channel 处于 close 状态。
  3. 所有 channel 接收者在 channel 关闭时,都会立刻从阻塞等待中返回,且 ok 值为 false 。(PS:广播机制,通常被利用向多个订阅者同时发送信号。如,退出信号。)

Demo:

// 消息生产者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}

		fmt.Println("channel close.")
		close(ch) // 关闭channel

		wg.Done()
	}()
}

// 消息接收者
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for {
			if data, ok := <-ch; ok { // 有消息就打印,直到channel被close。
				fmt.Println(data)
			} else {
				fmt.Println("Receiver close.")
				break // channel被close
			}
		}
		wg.Done()
	}()
}

func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	dataProducer(ch, &wg) // 开启生产者
	wg.Add(1)
	dataReceiver(ch, &wg) // 开启消费者
	wg.Wait()
}
复制代码

六、任务的取消

通过上面的 close channel (广播机制),我们可以延伸一下,通过 close channel 通知所有 channel 取消当前的任务。

Demo如下:

func isCancelled(cancelChan chan struct{}) bool {
	select {
	case <-cancelChan:
		return true
	default:
		return false
	}
}

// 只能取消单个channel
func cancel_1(cancelChan chan struct{}) {
	cancelChan <- struct{}{}
}

// 所有channel全部取消
func cancel_2(cancelChan chan struct{}) {
	close(cancelChan)
}

func TestCancel(t *testing.T) {
	cancelChan := make(chan struct{}, 0) // 创建了一个channal,通过它来控制事件取消
	for i := 0; i < 5; i++ {             // 开启5个协程
		go func(i int, chanclCh chan struct{}) { // 每个协程里面都有一个死循环,去等待取消消息
			for {
				if isCancelled(cancelChan) {
					break
				}
				time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
			}
			fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
		}(i, cancelChan)
	}
	cancel_2(cancelChan) // 通知所有channel关闭。
	time.Sleep(time.Second * 1)
}
复制代码

七、Context与关联任务取消

刚才我们通过 close channel 来取消任务,但会有些问题。 比如,当一个任务被取消后,它所关联的子任务也应该被立即取消。

为了解决这个问题, go 1.9.0 之后, golang 加入了 context ,来保证关联任务的取消。

1. Context

context 就是用于管理相关任务的上下文,包含了共享值的传递,超时,取消通知。

结构体如下:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
复制代码
  1. Deadline 会返回一个超时时间, Goroutine 获得了超时时间后,例如可以对某些io操作设定超时时间。
  2. Done 方法返回一个信道( channel ),当 Context 被撤销或过期时,该信道是关闭的,即它是一个表示 Context 是否已关闭的信号。
  3. Done 信道关闭后,Err方法表明Context被撤的原因。
  4. Value 可以让 Goroutine 共享一些数据,当然获得数据是协程安全的。但使用这些数据的时候要注意同步,比如返回了一个 map ,而这个 map 的读写则要加锁。

要点:

context.Background()
context.WithCancel(parentContext)
<-ctx.Done

2. 关联任务取消

我们把刚才的例子稍加调整,通过context来取消所有关联的任务。

  • 首先,创建一个 context
ctx, cancel := context.WithCancel(context.Background()) // 创建一个子context
复制代码
  • 编写一个取消方法,把 context 作为参数。
func isCancelled(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        return true
    default:
        return false
    }
}
复制代码
  • 开五个协程死循环,每个协程里面都有一个死循环,等待取消任务消息。再调用 cancel 方法。
for i := 0; i < 5; i++ {                                // 开启5个协程
        go func(i int, ctx context.Context) { // 每个协程里面都有一个死循环,去等待取消消息
            for {
                if isCancelled(ctx) {
                    break
                }
                time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
            }
            fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
        }(i, ctx)
    }
    cancel() // 取消ctx
复制代码

完整示例代码如下:

func isCancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func TestCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background()) // 创建一个子context
	for i := 0; i < 5; i++ {                                // 开启5个协程
		go func(i int, ctx context.Context) { // 每个协程里面都有一个死循环,去等待取消消息
			for {
				if isCancelled(ctx) {
					break
				}
				time.Sleep(time.Millisecond * 5) // 模拟延迟5毫秒
			}
			fmt.Println(i, "Cancelled") // 说明退出了死循环,打印日志
		}(i, ctx)
	}
	cancel() // 取消ctx
	time.Sleep(time.Second * 1)
}
复制代码

八、常见并发任务(实战)

1. 只执行一次(单例模式)

场景:在多协程的情况下,保证某段代码只执行一次。

type Singleton struct {
	data string
}

var singleInstance *Singleton
var once sync.Once

func GetSingletonObj() *Singleton {
	once.Do(func() {
		fmt.Println("Create Obj")
		singleInstance = new(Singleton)
	})
	return singleInstance
}

func TestGetSingletonObj(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			obj := GetSingletonObj()
			fmt.Printf("%p\n", obj)
			wg.Done()
		}()
	}
	wg.Wait()
}
复制代码

2. 仅需任意任务完成

利用channel管道通信的机制,我们可以再任何一个协程完成任务时,就给对象发消息。

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func firstResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // 创建bufferChannel。(如果用channel会导致协程泄漏,剩下9个channel会一直阻塞在系统中。)
	for i := 0; i < numOfRunner; i++ { // 开了10个协程
		go func(i int) {
			ret := runTask(i) // 每个协程去执行任务
			ch <- ret
		}(i)
	}
	return <-ch // 返回channel里的第一个Response。(因为channel是一个先进先出的管道)
}

func TestFirstResponse(t *testing.T) {
	t.Log(firstResponse()) // 发现每次运行返回的都不一样,会根据协程完成任务的一个顺序返回。
}
复制代码

3. 所有任务完成

刚才,我们介绍了first response,接下来我们看一下all response该怎么做。思路是一样的,只要接收到所有 channel 返回的数据,再返回即可。

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func allResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // 创建bufferChannel。
	for i := 0; i < numOfRunner; i++ {   // 开了10个协程
		go func(i int) {
			ret := runTask(i) // 每个协程去执行任务
			ch <- ret
		}(i)
	}
	finalRet := ""
	for j := 0; j < numOfRunner; j++ {
		finalRet += <-ch + "\n"
	}
	return finalRet // 返回channel里的所有的Response。(因为channel是一个先进先出的管道)
}

func TestAllResponse(t *testing.T) {
	t.Log("Before:", runtime.NumGoroutine()) // 打印一下当前的协程数量
	t.Log(allResponse())                     // 发现每次运行返回的都不一样,会根据协程完成任务的一个顺序返回。
	t.Log("After:", runtime.NumGoroutine()) // 再打印一下当前的协程数量
}
复制代码

4. 对象池

我们可以用buffer channel的管道特性来做一个对象池。

Go语言基础(五)—— 并发编程

Demo:

type ReusableObj struct {
}

type ObjPool struct {
	bufChan chan *ReusableObj // 用于缓冲可重用对象
}

// 生产指定数量对象的对象池
func NewObjPool(numOfObj int) *ObjPool {
	ObjPool := ObjPool{}
	ObjPool.bufChan = make(chan *ReusableObj, numOfObj)
	for i := 0; i < numOfObj; i++ {
		ObjPool.bufChan <- &ReusableObj{}
	}
	return &ObjPool
}

// 从对象池中获得对象
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
	select {
	case ret := <-p.bufChan:
		return ret, nil
	case <-time.After(timeout): // 超时控制
		return nil, errors.New("time out")
	}
}

// 释放对象池里的对象
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
	select {
	case p.bufChan <- obj:
		return nil
	default:
		return errors.New("overflow")
	}
}

func TestObjPool(t *testing.T) {
	pool := NewObjPool(10) // 生产一个10容量大小的对象池
	for i := 0; i < 10; i++ {
		if v, err := pool.GetObj(time.Second * 1); err != nil { // 获取obj
			t.Error(err)
		} else {
			fmt.Printf("%T\n", v)                      // 获取成功,答应日志。
			if err := pool.ReleaseObj(v); err != nil { // 释放obj
				t.Error(err)
			}
		}
	}
	fmt.Println("Done.")
}
复制代码

5. sync.pool对象缓存

我们可以通过sync.pool做对象缓存(创建、获取、缓存的策略)。

对象获取策略:

  1. 首先,尝试从私有对象获取。

  2. 其次,如果私有对象不存在,就尝试从当前 Process 的共享池获取。

  3. 如果当前 Process 的共享池是空的,就尝试从其他 Process 的共享池获取。

  4. 如果所有 Process 的共享池都是空的,就从 sync.pool 指定的 New 方法中 “New” 一个新的对象返回。

sync.pool缓存对象的生命周期:

  • 每一次 GC (垃圾回收)都会清除sync.pool的缓存对象。

  • 因此,对象缓存的有效期为下一次 GC 之前。

基本使用:

func TestSyncPool(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} { // 创建一个新的对象
			fmt.Println("Create a new object.")
			return 100
		},
	}

	v := pool.Get().(int) // 获取对象
	fmt.Println(v)
	pool.Put(3) // 放回对象
	// runtime.GC() // 触发GC,会清除sync.pool中缓存的对象
	v1, _ := pool.Get().(int)
	fmt.Println(v1)
}
复制代码

多协程下的使用:

func TestSyncPoolInMultiGroutine(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Create a new object.")
			return 10
		},
	}

	pool.Put(100)
	pool.Put(100)
	pool.Put(100)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {// 创建10个协程
		wg.Add(1) 
		go func(id int) {
			fmt.Println(pool.Get()) // 获取对象
			wg.Done() 
		}(i)
	}
	wg.Wait()
}
复制代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Redis开发与运维

Redis开发与运维

付磊、张益军 / 机械工业出版社 / 2017-3-1 / 89.00

本书全面讲解Redis基本功能及其应用,并结合线上开发与运维监控中的实际使用案例,深入分析并总结了实际开发运维中遇到的“陷阱”,以及背后的原因, 包含大规模集群开发与管理的场景、应用案例与开发技巧,为高效开发运维提供了大量实际经验和建议。本书不要求读者有任何Redis使用经验,对入门与进阶DevOps的开发者提供有价值的帮助。主要内容包括:Redis的安装配置、API、各种高效功能、客户端、持久化......一起来看看 《Redis开发与运维》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

html转js在线工具
html转js在线工具

html转js在线工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换