go语言-常见并发模式

栏目: IT技术 · 发布时间: 4年前

内容简介:我们开启了两个Producer生产流水线,分别用于生成3 和 5的倍数的序列。两个生产者之间无同步事件可参考,它们是并发的。因此,消费者输出的结果序列的顺序是不确定的,这并没有问题,生产者和消费者依然可以相互配合工作。2.发布/订阅模型发布/订阅(publish-subscribe)模型通常被简写为pub/sub模型。在这个模型中,消息生产者成为发布者(publisher),而消息消费者则成为订阅者(subscriber),生产者和消费者是M:N的关系。在传统生产者和消费者模型中,是将消息发送到一个队列中,
  1. 生产者-消费者模型

    并发编程中最常见的例子就是生产者/消费者模型,该模型主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。简单的说,就是生产者生产一些数据,然后放到队列中,同时消费者从队列中来取这些数据。这样就让生产和消费变成了异步的两个过程。当队列中没有数据是,消费者就进入饥饿的等待中;而当对立中数据已满时,生产者则面临产品积压导致CPU被剥夺的问题。

//生产者
func Producer(factor int, out chan <- int) {
    for i := 0; ; i++ {
        out <- i * factor
    }
}

//消费者
func Consumer(in <- chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    ch := make(chan int, 64) //队列
    go Producer(3, ch) //生成3的倍数的序列
    go Producer(5, ch) //生成5的倍数的序列
    go Consumer(ch)    //消费生成的队列
    
    // Ctrl + C 退出
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Printf("qiut (%v)\n", <-sig)
}

我们开启了两个Producer生产流水线,分别用于生成3 和 5的倍数的序列。两个生产者之间无同步事件可参考,它们是并发的。因此,消费者输出的结果序列的顺序是不确定的,这并没有问题,生产者和消费者依然可以相互配合工作。

2.发布/订阅模型

发布/订阅(publish-subscribe)模型通常被简写为pub/sub模型。在这个模型中,消息生产者成为发布者(publisher),而消息消费者则成为订阅者(subscriber),生产者和消费者是M:N的关系。在传统生产者和消费者模型中,是将消息发送到一个队列中,而发布/订阅模型则是将消息发布给一个主题。

为此,我们构建了一个名为pubsub的发布/订阅模型支持包:

//Package pubsub implements a simple multi-topic pub-sub library
package pubsub

import (
   "sync"
   "time"
)

type (
   subscriber chan interface{}         //订阅者为一个通道
   topicFunc  func(v interface{}) bool //订阅者为一个过滤器
)

type Publisher struct {
   m           sync.RWMutex             //读写锁
   buffer      int                      //订阅队列的缓存大小
   timeout     time.Duration            //发布超时时间
   subscribers map[subscriber]topicFunc //订阅者信息
}

// 构建一个发布者对象,可以设置发布超时时间和缓存队列的长度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
   return &Publisher{
      buffer:      buffer,
      timeout:     publishTimeout,
      subscribers: make(map[subscriber]topicFunc),
   }
}

// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
   return p.SubscribeTopic(nil)
}

// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
   ch := make(chan interface{}, p.buffer)
   p.m.Lock()
   p.subscribers[ch] = topic
   p.m.Unlock()
   return ch
}

// 退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
   p.m.Lock()
   defer p.m.Unlock()
   delete(p.subscribers, sub)
   close(sub)
}

// 发布一个主题
func (p *Publisher) Publish(v interface{}) {
   p.m.RLock()
   defer p.m.Unlock()

   var wg sync.WaitGroup
   for sub, topic := range p.subscribers {
      wg.Add(1)
      go p.sendTopic(sub, topic, v, &wg)
   }
   wg.Wait()
}

// 关闭发布者对象,同时关闭所有的订阅者通道
func (p *Publisher) Close() {
   p.m.Lock()
   defer p.m.Unlock()

   for sub := range p.subscribers {
      delete(p.subscribers, sub)
      close(sub)
   }
}

// 发布主题,可以容忍一定的超时
func (p *Publisher) sendTopic(
   sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
   defer wg.Done()
   if topic != nil && !topic(v) {
      return
   }

   select {
      case sub <- v:
      case <-time.After(p.timeout):
   }
}

下面的例子中,有两个订阅者分别订阅了全部主题和含有"golang"的主题:

import "path/to/pubsub"

func main() {
    p := pubsub.NewPublisher(100*time.Millisecond, 10)
    defer p.Close()
    
    all := p.Subscribe()
    golang := p.SubscribeTopic(func(v interface{}) bool {
        if s, ok := v.(string); ok {
            return string.Contains(s, "golane")
        }
        return false
    }
})

p.Publish("hello, world!")
p.Publish("hello, golang!")

go func() {
    for msg := range all {
        fmt.Println("all:", msg)
    }
}()

go func() {
    for msg := range golang {
        fmt.Println("golang:", msg)
    }
}()

//运行一定时间后退出
time.Sleep(3 * time.Second)

在发布/订阅模型中,每条消息都会传送给多个订阅者。发布者通常不会知道,也不关心哪一个订阅者正在接收主题消息。订阅者和发布者可以在运行时动态添加,它们之间是一种松散的耦合关系,这使得系统的复杂性可以随着时间的推移而增长。在现实生活中,像天气预报之类的应用就可以应用这种并发模式。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

增长黑客

增长黑客

Sean Ellis / 张溪梦 / 中信出版集团股份有限公司 / 2017-11-1 / CNY 59.00

脸谱网如何从默默无闻到坐拥二十几亿用户? 爱彼迎、优步何以在短短时间估值超过百亿美元? 领英怎样跨步成为全球领先的职业社交平台? 这些初创公司实现爆发式成长的共同奥秘就是增长黑客。 增长黑客是硅谷当下热门的新商业方法论,其精髓在于通过快节奏测试和迭代,以极低甚至零成本获取并留存用户。 作为最早提出“增长黑客”概念的理论先驱、带领Dropbox实现500%增长的实战领军......一起来看看 《增长黑客》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

URL 编码/解码
URL 编码/解码

URL 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试