Golang实现ForkJoin小文

栏目: Java · 发布时间: 5年前

内容简介:任务队列对任务队列进行遍历操作。任务队列不止一个,而是存在多个任务队列,每次都会从这些任务队列中获取一个任务出来,如果任务存在则将任务包装成一个结构体;在获取到任务后,就是获取一个任务的执行者获取一个Worker
  • 什么是ForkJoin

    接触到 ForkJoin 框架是因为学习 Java 中的 Stream 中的并行流,并行流的底层就是借助 ForkJoin 框架

    ForkJoin 框架更适合现在CPU多核的机器,一般用于处理可以将一个大任务分解成数个互相没有依赖性的小任务,利用分治的策略,将任务不断变小,将这些小任务分发到CPU的核中,将子任务并行运行,大大加快任务处理速度

    具体的很多博客上说的都很不错,这里也不细说了,给几个我当时学习的博客地址吧

  • 任务偷窃

    任务偷窃算法其实就是 Worker 可以从自己对应的工作队列头部或者其他 Worker 的工作队列尾部获取元素。

    每次在轮询任务队列时,先从每个 Worker 对应的任务队列中去获取任务,如果发现任务队列此时没有待处理的任务,那么这个时候就会采用随机选取策略,随机选择一个 Worker 对应的工作队列,去窃取它的任务

  • Join子任务结果

    Java 中需要去不断的获取任务的执行情况,如果任务执行完就返回任务处理的结果;而在Golang中,由于chan的存在,使得Java的Future模式非常容易实现,只需要任务Join的时候去读取通道就可以,因为当我们把chan的cap设置为1时,如果通道中没有数据,读取一方是会被阻塞等待的

func (f *ForkJoinTask) Join() (bool, interface{}) {
	for {
		select {
		case data, ok := <-f.result:
			if ok {
				return true, data
			}
		case <-f.ctx.Done():
			panic(f.taskPool.err)
		}
	}
}
复制代码

核心代码

任务队列

对任务队列进行遍历操作。任务队列不止一个,而是存在多个任务队列,每次都会从这些任务队列中获取一个任务出来,如果任务存在则将任务包装成一个结构体;在获取到任务后,就是获取一个任务的执行者 worker 了,随后将包装好的任务送入 Worker 的chan通道中异步发送任务

func (fp *ForkJoinPool) run(ctx context.Context) {
	go func() {
		wId := int32(0)
		for {
			select {
			case <-ctx.Done():
				fmt.Printf("here is err")
				fp.err = fp.wp.err
				return
			default:
				hasTask, job, ft := fp.taskQueue.dequeueByTali(wId)
				if hasTask {
					fp.wp.Submit(ctx, &struct {
						T Task
						F *ForkJoinTask
						C context.Context
					}{T: job, F: ft, C: ctx})
				}
				wId = (wId + 1) % fp.cap
			}
		}
	}()
}
复制代码

获取一个Worker

ForkJoin 初始化的时候,根据CPU核数对 Worker 池进行初始化操作

func newPool(ctx context.Context, cancel context.CancelFunc) *Pool {
	...
	wCnt := runtime.NumCPU()
	for i := 0; i < wCnt; i ++ {
		w := newWorker(p)
		w.run(ctx)
		p.workers = append(p.workers, w)
	}
	...
}
复制代码

随后,处理任务肯定需要一个对应的worker去执行的,因此每次在获取 worker 时,会先去 worker 池中判断是否还存在空闲的 worker ,如果存在就直接获取一个 worker ,否则直接创建一个 worker 进行接受任务

func (p *Pool) retrieveWorker(ctx context.Context) *Worker {

	var w *Worker

	idleWorker := p.workers

	if len(idleWorker) >= 1 {
		p.lock.Lock()
		n := len(idleWorker) - 1
		w = idleWorker[n]
		p.workers = idleWorker[:n]
		p.lock.Unlock()
	} else {
		if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
			w = cacheWorker.(*Worker)
		} else {
			w = &Worker{
				pool: p,
				job: make(chan *struct {
					T Task
					F *ForkJoinTask
					C context.Context
				}, 1),
			}
		}
		w.run(ctx)
	}
	return w
}
复制代码

Worker

真正执行任务的对象,每个 worker 绑定一个 goruntine ,并且有一个 chan 通道,用于异步接收任务以及在 goruntine 中异步将任务取出并执行;当任务执行完后,将 worker 返回到 worker 池中

func (w *Worker) run(ctx context.Context) {
	go func() {

		var tmpTask *ForkJoinTask

		defer func() {
			if p := recover(); p != nil {
				w.pool.panicHandler(p)
				if tmpTask != nil {
					w.pool.err = p
					close(tmpTask.result)
				}
			}
		}()

		for {
			select {
			case <-ctx.Done():
				fmt.Println("An exception occurred and the task has stopped")
				return
			default:
				for job := range w.job {
					if job == nil {
						w.pool.workerCache.Put(w)
						return
					}
					tmpTask = job.F
					job.F.result <- job.T.Compute()
					w.pool.releaseWorker(w)
				}
			}
		}
	}()
}
复制代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

动手玩转Scratch2.0编程

动手玩转Scratch2.0编程

马吉德·马吉 (Majed Marji) / 电子工业出版社 / 2015-10-1 / CNY 69.00

Scratch 是可视化的编程语言,其丰富的学习环境适合所有年龄阶段的人。利用它可以制作交互式程序、富媒体项目,包括动画故事、读书报告、科学实验、游戏和模拟程序等。《动手玩转Scratch2.0编程—STEAM创新教育指南》的目标是将Scratch 作为工具,教会读者最基本的编程概念,同时揭示Scratch 在教学上的强大能力。 《动手玩转Scratch2.0编程—STEAM创新教育指南》共......一起来看看 《动手玩转Scratch2.0编程》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

随机密码生成器
随机密码生成器

多种字符组合密码

html转js在线工具
html转js在线工具

html转js在线工具