golang常见的几种并发模型框架

栏目: Go · 发布时间: 5年前

内容简介:在golang中,经常使用协程做高并发,本文列举了几种常见并发模型。

在golang中,经常使用协程做高并发,本文列举了几种常见并发模型。

package main

import (
    "fmt"
    "math/rand"
    "os"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

type Scenario struct {
    Name        string
    Description []string
    Examples    []string
    RunExample  func()
}
var s1 = &Scenario{
    Name: "s1",
    Description: []string{
        "简单并发执行任务",
    },
    Examples: []string{
        "比如并发的请求后端某个接口",
    },
    RunExample: RunScenario1,
}

var s2 = &Scenario{
    Name: "s2",
    Description: []string{
        "持续一定时间的高并发模型",
    },
    Examples: []string{
        "在规定时间内,持续的高并发请求后端服务, 防止服务死循环",
    },
    RunExample: RunScenario2,
}

var s3 = &Scenario{
    Name: "s3",
    Description: []string{
        "基于大数据量的并发任务模型, goroutine worker pool",
    },
    Examples: []string{
        "比如技术支持要给某个客户删除几个TB/GB的文件",
    },
    RunExample: RunScenario3,
}

var s4 = &Scenario{
    Name: "s4",
    Description: []string{
        "等待异步任务执行结果(goroutine+select+channel)",
    },
    Examples: []string{
        "",
    },
    RunExample: RunScenario4,
}

var s5 = &Scenario{
    Name: "s5",
    Description: []string{
        "定时的反馈结果(Ticker)",
    },
    Examples: []string{
        "比如测试上传接口的性能,要实时给出指标: 吞吐率,IOPS,成功率等",
    },
    RunExample: RunScenario5,
}

var Scenarios []*Scenario

func init() {
    Scenarios = append(Scenarios, s1)
    Scenarios = append(Scenarios, s2)
    Scenarios = append(Scenarios, s3)
    Scenarios = append(Scenarios, s4)
    Scenarios = append(Scenarios, s5)
}

// 常用的并发与同步场景
func main() {
    if len(os.Args) == 1 {
        fmt.Println("请选择使用场景 ==> ")
        for _, sc := range Scenarios {
            fmt.Printf("场景: %s ,", sc.Name)
            printDescription(sc.Description)
        }
        return
    }
    for _, arg := range os.Args[1:] {
        sc := matchScenario(arg)
        if sc != nil {
            printDescription(sc.Description)
            printExamples(sc.Examples)
            sc.RunExample()
        }
    }
}

func printDescription(str []string) {
    fmt.Printf("场景描述: %s \n", str)
}

func printExamples(str []string) {
    fmt.Printf("场景举例: %s \n", str)
}

func matchScenario(name string) *Scenario {
    for _, sc := range Scenarios {
        if sc.Name == name {
            return sc
        }
    }
    return nil
}

var doSomething = func(i int) string {
    time.Sleep(time.Millisecond * time.Duration(10))
    fmt.Printf("Goroutine %d do things .... \n", i)
    return fmt.Sprintf("Goroutine %d", i)
}

var takeSomthing = func(res string) string {
    time.Sleep(time.Millisecond * time.Duration(10))
    tmp := fmt.Sprintf("Take result from %s.... \n", res)
    fmt.Println(tmp)
    return tmp
}

// 场景1: 简单并发任务

func RunScenario1() {
    count := 10
    var wg sync.WaitGroup

    for i := 0; i < count; i++ {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            doSomething(index)
        }(i)
    }

    wg.Wait()
}

// 场景2: 按时间来持续并发

func RunScenario2() {
    timeout := time.Now().Add(time.Second * time.Duration(10))
    n := runtime.NumCPU()

    waitForAll := make(chan struct{})
    done := make(chan struct{})
    concurrentCount := make(chan struct{}, n)

    for i := 0; i < n; i++ {
        concurrentCount <- struct{}{}
    }

    go func() {
        for time.Now().Before(timeout) {
            <-done
            concurrentCount <- struct{}{}
        }

        waitForAll <- struct{}{}
    }()

    go func() {
        for {
            <-concurrentCount
            go func() {
                doSomething(rand.Intn(n))
                done <- struct{}{}
            }()
        }
    }()

    <-waitForAll
}

// 场景3:以 worker pool 方式 并发做事/发送请求

func RunScenario3() {
    numOfConcurrency := runtime.NumCPU()
    taskTool := 10
    jobs := make(chan int, taskTool)
    results := make(chan int, taskTool)
    var wg sync.WaitGroup

    // workExample
    workExampleFunc := func(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
        defer wg.Done()
        for job := range jobs {
            res := job * 2
            fmt.Printf("Worker %d do things, produce result %d \n", id, res)
            time.Sleep(time.Millisecond * time.Duration(100))
            results <- res
        }
    }

    for i := 0; i < numOfConcurrency; i++ {
        wg.Add(1)
        go workExampleFunc(i, jobs, results, &wg)
    }
    totalTasks := 100

    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < totalTasks; i++ {
            n := <-results
            fmt.Printf("Got results %d \n", n)
        }
        close(results)
    }()

    for i := 0; i < totalTasks; i++ {
        jobs <- i
    }
    close(jobs)
    wg.Wait()
}

// 场景4: 等待异步任务执行结果(goroutine+select+channel)

func RunScenario4() {
    sth := make(chan string)
    result := make(chan string)
    go func() {
        id := rand.Intn(100)
        for {
            sth <- doSomething(id)
        }
    }()
    go func() {
        for {
            result <- takeSomthing(<-sth)
        }
    }()

    select {
    case c := <-result:
        fmt.Printf("Got result %s ", c)
    case <-time.After(time.Duration(30 * time.Second)):
        fmt.Errorf("指定时间内都没有得到结果")
    }
}

var doUploadMock = func() bool {
    time.Sleep(time.Millisecond * time.Duration(100))
    n := rand.Intn(100)
    if n > 50 {
        return true
    } else {
        return false
    }
}

// 场景5: 定时的反馈结果(Ticker)
// 测试上传接口的性能,要实时给出指标: 吞吐率,成功率等

func RunScenario5() {
    totalSize := int64(0)
    totalCount := int64(0)
    totalErr := int64(0)

    concurrencyCount := runtime.NumCPU()
    stop := make(chan struct{})
    fileSizeExample := int64(10)

    timeout := 10 // seconds to stop

    go func() {
        for i := 0; i < concurrencyCount; i++ {
            go func(index int) {
                for {
                    select {
                    case <-stop:
                        return
                    default:
                        break
                    }

                    res := doUploadMock()
                    if res {
                        atomic.AddInt64(&totalCount, 1)
                        atomic.AddInt64(&totalSize, fileSizeExample)
                    } else {
                        atomic.AddInt64(&totalErr, 1)
                    }
                }
            }(i)
        }
    }()

    t := time.NewTicker(time.Second)
    index := 0
    for {
        select {
        case <-t.C:
            index++
            tmpCount := atomic.LoadInt64(&totalCount)
            tmpSize := atomic.LoadInt64(&totalSize)
            tmpErr := atomic.LoadInt64(&totalErr)
            fmt.Printf("吞吐率: %d,成功率: %d \n", tmpSize/int64(index), tmpCount*100/(tmpCount+tmpErr))
            if index > timeout {
                t.Stop()
                close(stop)
                return
            }
        }

    }
}

以上所述就是小编给大家介绍的《golang常见的几种并发模型框架》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Pro JavaScript Techniques

Pro JavaScript Techniques

John Resig / Apress / 2006-12-13 / USD 44.99

Pro JavaScript Techniques is the ultimate JavaScript book for the modern web developer. It provides everything you need to know about modern JavaScript, and shows what JavaScript can do for your web s......一起来看看 《Pro JavaScript Techniques》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

URL 编码/解码
URL 编码/解码

URL 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具