内容简介:任务队列对任务队列进行遍历操作。任务队列不止一个,而是存在多个任务队列,每次都会从这些任务队列中获取一个任务出来,如果任务存在则将任务包装成一个结构体;在获取到任务后,就是获取一个任务的执行者获取一个Worker
-
什么是ForkJoin
接触到
ForkJoin框架是因为学习Java中的Stream中的并行流,并行流的底层就是借助ForkJoin框架ForkJoin框架更适合现在CPU多核的机器,一般用于处理可以将一个大任务分解成数个互相没有依赖性的小任务,利用分治的策略,将任务不断变小,将这些小任务分发到CPU的核中,将子任务并行运行,大大加快任务处理速度具体的很多博客上说的都很不错,这里也不细说了,给几个我当时学习的博客地址吧
-
任务偷窃
任务偷窃算法其实就是
Worker可以从自己对应的工作队列头部或者其他Worker的工作队列尾部获取元素。每次在轮询任务队列时,先从每个
Worker对应的任务队列中去获取任务,如果发现任务队列此时没有待处理的任务,那么这个时候就会采用随机选取策略,随机选择一个Worker对应的工作队列,去窃取它的任务 -
Join子任务结果
在 Java 中需要去不断的获取任务的执行情况,如果任务执行完就返回任务处理的结果;而在Golang中,由于chan的存在,使得Java的Future模式非常容易实现,只需要任务Join的时候去读取通道就可以,因为当我们把chan的cap设置为1时,如果通道中没有数据,读取一方是会被阻塞等待的
func (f *ForkJoinTask) Join() (bool, interface{}) {
for {
select {
case data, ok := <-f.result:
if ok {
return true, data
}
case <-f.ctx.Done():
panic(f.taskPool.err)
}
}
}
复制代码
核心代码
任务队列
对任务队列进行遍历操作。任务队列不止一个,而是存在多个任务队列,每次都会从这些任务队列中获取一个任务出来,如果任务存在则将任务包装成一个结构体;在获取到任务后,就是获取一个任务的执行者 worker
了,随后将包装好的任务送入 Worker
的chan通道中异步发送任务
func (fp *ForkJoinPool) run(ctx context.Context) {
go func() {
wId := int32(0)
for {
select {
case <-ctx.Done():
fmt.Printf("here is err")
fp.err = fp.wp.err
return
default:
hasTask, job, ft := fp.taskQueue.dequeueByTali(wId)
if hasTask {
fp.wp.Submit(ctx, &struct {
T Task
F *ForkJoinTask
C context.Context
}{T: job, F: ft, C: ctx})
}
wId = (wId + 1) % fp.cap
}
}
}()
}
复制代码
获取一个Worker
在 ForkJoin
初始化的时候,根据CPU核数对 Worker
池进行初始化操作
func newPool(ctx context.Context, cancel context.CancelFunc) *Pool {
...
wCnt := runtime.NumCPU()
for i := 0; i < wCnt; i ++ {
w := newWorker(p)
w.run(ctx)
p.workers = append(p.workers, w)
}
...
}
复制代码
随后,处理任务肯定需要一个对应的worker去执行的,因此每次在获取 worker
时,会先去 worker
池中判断是否还存在空闲的 worker
,如果存在就直接获取一个 worker
,否则直接创建一个 worker
进行接受任务
func (p *Pool) retrieveWorker(ctx context.Context) *Worker {
var w *Worker
idleWorker := p.workers
if len(idleWorker) >= 1 {
p.lock.Lock()
n := len(idleWorker) - 1
w = idleWorker[n]
p.workers = idleWorker[:n]
p.lock.Unlock()
} else {
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
w = cacheWorker.(*Worker)
} else {
w = &Worker{
pool: p,
job: make(chan *struct {
T Task
F *ForkJoinTask
C context.Context
}, 1),
}
}
w.run(ctx)
}
return w
}
复制代码
Worker
真正执行任务的对象,每个 worker
绑定一个 goruntine
,并且有一个 chan
通道,用于异步接收任务以及在 goruntine
中异步将任务取出并执行;当任务执行完后,将 worker
返回到 worker
池中
func (w *Worker) run(ctx context.Context) {
go func() {
var tmpTask *ForkJoinTask
defer func() {
if p := recover(); p != nil {
w.pool.panicHandler(p)
if tmpTask != nil {
w.pool.err = p
close(tmpTask.result)
}
}
}()
for {
select {
case <-ctx.Done():
fmt.Println("An exception occurred and the task has stopped")
return
default:
for job := range w.job {
if job == nil {
w.pool.workerCache.Put(w)
return
}
tmpTask = job.F
job.F.result <- job.T.Compute()
w.pool.releaseWorker(w)
}
}
}
}()
}
复制代码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- php如何实现session,自己实现session,laravel如何实现session
- AOP如何实现及实现原理
- webpack 实现 HMR 及其实现原理
- Docker实现原理之 - OverlayFS实现原理
- 为什么实现 .NET 的 ICollection 集合时需要实现 SyncRoot 属性?如何正确实现这个属性?
- 自己实现集合框架(十):顺序栈的实现
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。