内容简介:感谢我们使用Go语言,基本上是因为他原生支持的高并发:Goroutine 和 Channel;Go 的并发属于 CSP 并发模型的一种实现;
前言
感谢 Handling 1 Million Requests per Minute with Go 这篇文章给予的巨大启发。
基础
我们使用 Go 语言,基本上是因为他原生支持的高并发:Goroutine 和 Channel;
Go 的并发属于 CSP 并发模型的一种实现;
CSP 并发模型的核心概念是:“不要通过共享内存来通信,而应该通过通信来共享内存”。
简单用法
我一开始学习Go语言的时候,遇到大访问量的时候,会先创建一个带缓冲的channel,然后起一个Go协程来逐个读取channel中的数据并处理。
说他是并发是因为他没有占用主线程,而是另起了一个协程独自运行。但是这没有实现请求之间的并发。
特别注意:Go语言中的map不是并发安全的,要想实现并发安全,需要自己实现(如加锁),或者使用sync.Map。
package main
import (
"fmt"
"runtime"
"time"
)
func main(){
//这里我们假设数据是int类型,缓存格式设为100
dataChan:=make(chan int,100)
go func(){
for{
select{
case data:=<-dataChan:
fmt.Println("data:",data)
time.Sleep(1 * time.Second)//这里延迟是模拟处理数据的耗时
}
}
}()
//填充数据
for i:=0;i<100;i++{
dataChan<-i
}
//这里循环打印查看协程个数
for {
fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
time.Sleep(2 * time.Second)
}
}
这里打印出来的协程个数时2,为什么? 因为main方法独占一个主协程,我们又起了一个协程,所以是两个。
实现百万级的并发
首先我们要抽象出几个概念:
Job:
type Job interface {
Do()
}
// 一个数据接口,所有的数据都要实现该接口,才能被传递进来
//实现Job接口的一个数据实例,需要实现一个Do()方法,对数据的处理就在这个Do()方法中。
Job通道:
这里有两个Job通道:
1、WorkerPool的Job channel,用于调用者把具体的数据写入到这里,WorkerPool读取。
2、Worker的Job channel,当WorkerPool读取到Job,并拿到可用的Worker的时候,会将Job实例写入该Worker的Job channel,用来直接执行Do()方法。
Worker:
type Worker struct {
JobQueue chan Job //Worker的Job通道
}
//每一个被初始化的worker都会在后期单独占用一个协程
//初始化的时候会先把自己的JobQueue传递到Worker通道中,
//然后阻塞读取自己的JobQueue,读到一个Job就执行Job对象的Do()方法。
工作池(WorkerPool):
type WorkerPool struct {
workerlen int //WorkerPool中同时 存在Worker的个数
JobQueue chan Job // WorkerPool的Job通道
WorkerQueue chan chan Job
}
//初始化时会按照传入的num,启动num个后台协程,然后循环读取Job通道里面的数据,
//读到一个数据时,再获取一个可用的Worker,并将Job对象传递到该Worker的chan通道
整个过程中 每个Worker都会被运行在一个协程中,在整个WorkerPool中就会有num可空闲的Worker,当来一条数据的时候,就会在工作池中去一个空闲的Worker去执行该Job,当工作池中没有可用的worker时,就会阻塞等待一个空闲的worker。
这是一个粗糙最简单的版本,只是为了演示效果,具体使用需要根据实际情况加一些特殊的处理。
当数据无限多的时候func (wp *WorkerPool) Run() 会无限创建协程,这里需要做一些处理,这里是为了让所有的请求不等待,并且体现一下最大峰值时的协程数。具体因项目而异。
代码地址: https://github.com/wangzhen0625/gonote/tree/master/7goroutune
main.go
package main
import (
"fmt"
"runtime"
"time"
)
type Score struct {
Num int
}
func (s *Score) Do() {
fmt.Println("num:", s.Num)
time.Sleep(1 * 1 * time.Second)
}
func main() {
num := 100 * 100 * 20
// debug.SetMaxThreads(num + 1000) //设置最大线程数
// 注册工作池,传入任务
// 参数1 worker并发个数
p := NewWorkerPool(num)
p.Run()
datanum := 100 * 100 * 100 * 100
go func() {
for i := 1; i <= datanum; i++ {
sc := &Score{Num: i}
p.JobQueue <- sc
}
}()
for {
fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
time.Sleep(2 * time.Second)
}
}
job.go
package main
type Job interface {
Do()
}
worker.go
package main
type Worker struct {
JobQueue chan Job
}
func NewWorker() Worker {
return Worker{JobQueue: make(chan Job)}
}
func (w Worker) Run(wq chan chan Job) {
go func() {
for {
wq <- w.JobQueue
select {
case job := <-w.JobQueue:
job.Do()
}
}
}()
}
workerpool.go
package main
import "fmt"
type WorkerPool struct {
workerlen int
JobQueue chan Job
WorkerQueue chan chan Job
}
func NewWorkerPool(workerlen int) *WorkerPool {
return &WorkerPool{
workerlen: workerlen,
JobQueue: make(chan Job),
WorkerQueue: make(chan chan Job, workerlen),
}
}
func (wp *WorkerPool) Run() {
fmt.Println("初始化worker")
//初始化worker
for i := 0; i < wp.workerlen; i++ {
worker := NewWorker()
worker.Run(wp.WorkerQueue)
}
// 循环获取可用的worker,往worker中写job
go func() {
for {
select {
case job := <-wp.JobQueue:
worker := <-wp.WorkerQueue
worker <- job
}
}
}()
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Python多进程并发与多线程并发编程实例总结
- JVM指令分析实例三(方法调用、类实例)
- 通过实例入门Golang
- Iptables详解+实例
- elasticsearch之实例篇
- Kolla配置实例网络
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Windows黑客编程技术详解
甘迪文 / 人民邮电出版社 / 2018-12 / 108
《Windows黑客编程技术详解》介绍的是黑客编程的基础技术,涉及用户层下的Windows编程和内核层下的Rootkit编程。本书分为用户篇和内核篇两部分,用户篇包括11章,配套49个示例程序源码;内核篇包括7章,配套28个示例程序源码。本书介绍的每个技术都有详细的实现原理,以及对应的示例代码(配套代码均支持32位和64位Windows 7、Windows 8.1及Windows 10系统),旨在......一起来看看 《Windows黑客编程技术详解》 这本书的介绍吧!