内容简介:原文地址:Bull是基于Redis的一个Node.js任务队列管理库,支持延迟队列,优先级任务,重复任务,以及原子操作等多种功能.本文将从基本的使用来分析Bull的源码,对于repeat job,seperate processes等暂不展开.
原文地址: www.jianshu.com/p/1ed50e6d4…
Bull是基于 Redis 的一个Node.js任务队列管理库,支持延迟队列,优先级任务,重复任务,以及原子操作等多种功能.
本文将从基本的使用来分析Bull的源码,对于repeat job,seperate processes等暂不展开.
Bull: Premium Queue package for handling jobs and messages in NodeJS.
相关的信息如下:
- 源码地址: github.com/OptimalBits…
- Branch:
develop
- Last commit: 4f5744a
基本使用
Bull的使用分为三个步骤:
- 创建队列
- 绑定任务处理函数
- 添加任务
如下示例:
const Bull = require('bull') // 1. 创建队列 const myFirstQueue = new Bull('my-first-queue'); // 2. 绑定任务处理函数 myFirstQueue.process(async (job, data) => { return doSomething(data); }); // 3. 添加任务 const job = await myFirstQueue.add({ foo: 'bar' }); 复制代码
创建队列
创建队列是先通过 require
然后再通过 new
来实现的,因此要先找到 require
的入口.打开 package.json
:
{ "name": "bull", "version": "3.7.0", "description": "Job manager", "main": "./index.js", ... } 复制代码
看到入口为 index.js
,打开:
module.exports = require('./lib/queue'); module.exports.Job = require('./lib/job'); 复制代码
从而找到目标函数所在文件 ./lib/queue
:
module.exports = Queue; 复制代码
可以看到exports的是 Queue
,接着去分析 Queue
函数:
const Queue = function Queue(name, url, opts) { ... // 默认设置 this.settings = _.defaults(opts.settings, { lockDuration: 30000, stalledInterval: 30000, maxStalledCount: 1, guardInterval: 5000, retryProcessDelay: 5000, drainDelay: 5, // 空队列时brpoplpush的等待时间 backoffStrategies: {} }); ... // Bind these methods to avoid constant rebinding and/or creating closures // in processJobs etc. this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this); this.processJob = this.processJob.bind(this); this.getJobFromId = Job.fromId.bind(null, this); ... }; 复制代码
主要是进行参数初始化和函数的绑定.
绑定任务处理函数
该步骤是从 myFirstQueue.process
开始的,先看 process
函数:
Queue.prototype.process = function (name, concurrency, handler) { ... this.setHandler(name, handler); // 1. 绑定handler return this._initProcess().then(() => { return this.start(concurrency); // 2. 启动队列 }); }; 复制代码
该函数做了两个事情:
- 绑定handler
- 启动队列
先看绑定handler:
Queue.prototype.setHandler = function (name, handler) { ... if (this.handlers[name]) { throw new Error('Cannot define the same handler twice ' + name); } ... if (typeof handler === 'string') { ... } else { handler = handler.bind(this); // 将handler和名字保存起来 if (handler.length > 1) { this.handlers[name] = promisify(handler); } else { this.handlers[name] = function () { ... } } }; 复制代码
再看队列的启动:
Queue.prototype.start = function (concurrency) { return this.run(concurrency).catch(err => { this.emit('error', err, 'error running queue'); throw err; }); }; 复制代码
看 run
函数:
Queue.prototype.run = function (concurrency) { const promises = []; return this.isReady() .then(() => { return this.moveUnlockedJobsToWait(); // 将unlocked的任务移动到wait队列 }) .then(() => { return utils.isRedisReady(this.bclient); }) .then(() => { while (concurrency--) { promises.push( new Promise(resolve => { this.processJobs(concurrency, resolve); // 处理任务 }) ); } this.startMoveUnlockedJobsToWait(); // unlocked job定时检查 return Promise.all(promises); }); }; 复制代码
unlocked job(stalled job): job的运行需要锁,正常情况下job在active时会获取锁(有过期时间 lockDuration
,定时延长 lockRenewTime
),complete时释放锁,如果job在active时无锁,说明进程被阻塞或崩溃导致锁过期
看 processJobs
:
Queue.prototype.processJobs = function (index, resolve, job) { const processJobs = this.processJobs.bind(this, index, resolve); process.nextTick(() => { this._processJobOnNextTick(processJobs, index, resolve, job); }); }; 复制代码
再看 _processJobOnNextTick
:
// 关键代码 const gettingNextJob = job ? Promise.resolve(job) : this.getNextJob(); return (this.processing[index] = gettingNextJob .then(this.processJob) .then(processJobs, err => { ... })); 复制代码
上述代码可以作如下描述:
getNextJob processJob processJobs
先看 getNextJob
:
if (this.drained) { // // Waiting for new jobs to arrive // console.log('bclient start get new job'); return this.bclient .brpoplpush(this.keys.wait, this.keys.active, this.settings.drainDelay) .then( jobId => { if (jobId) { return this.moveToActive(jobId); } }, err => { ... } ); } else { return this.moveToActive(); } 复制代码
运用Redis的 PUSH/POP
机制来获取消息,超时时间为 drainDelay
.
接着来看 processJob
:
Queue.prototype.processJob = function (job) { ... const handleCompleted = result => { return job.moveToCompleted(result).then(jobData => { ... return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null; }); }; // 延长锁的时间 lockExtender(); const handler = this.handlers[job.name] || this.handlers['*']; if (!handler) { ... } else { let jobPromise = handler(job); ... return jobPromise .then(handleCompleted) .catch(handleFailed) .finally(() => { stopTimer(); }); } }; 复制代码
可以看到任务处理成功后会调用 handleCompleted
,在其中调用的是job的 moveToCompleted
,中间还有一些调用,最终会调用 lua 脚本 moveToFinished
:
... -- Try to get next job to avoid an extra roundtrip if the queue is not closing, -- and not rate limited. ... 复制代码
该脚本到作用是将job移动到completed或failed队列,然后取下一个job.
在 processJob
执行完后就又重复执行 processJobs
,这就是一个循环,这个是核心,如下图:
添加任务
直接看add函数:
Queue.prototype.add = function (name, data, opts) { ... if (opts.repeat) { ... } else { return Job.create(this, name, data, opts); } }; 复制代码
调用的是Job中的create函数:
Job.create = function (queue, name, data, opts) { const job = new Job(queue, name, data, opts); // 1. 创建job return queue .isReady() .then(() => { return addJob(queue, job); // 2. 添加job到队列中 }) ... }; 复制代码
继续沿着 addJob
,最终会调用的是lua脚本的 addJob
,根据job设置将job存入redis.
问题
1. 为什么会出现错误: job stalled more than allowable limit
在run函数中执行了函数 this.startMoveUnlockedJobsToWait()
,来看看该函数:
Queue.prototype.startMoveUnlockedJobsToWait = function () { clearInterval(this.moveUnlockedJobsToWaitInterval); if (this.settings.stalledInterval > 0 && !this.closing) { this.moveUnlockedJobsToWaitInterval = setInterval( this.moveUnlockedJobsToWait, this.settings.stalledInterval ); } }; 复制代码
该函数是用来定时执行 moveUnlockedJobsToWait
函数:
Queue.prototype.moveUnlockedJobsToWait = function () { ... return scripts .moveUnlockedJobsToWait(this) .then(([failed, stalled]) => { const handleFailedJobs = failed.map(jobId => { return this.getJobFromId(jobId).then(job => { this.emit( 'failed', job, new Error('job stalled more than allowable limit'), 'active' ); return null; }); }); ... }) ... ; }; 复制代码
该函数会通过scripts的 moveUnlockedJobsToWait
函数最终调用lua脚本 moveUnlockedJobsToWait
:
... local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1]) ... if(stalledCount > MAX_STALLED_JOB_COUNT) then rcall("ZADD", KEYS[4], ARGV[3], jobId) rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit") table.insert(failed, jobId) else -- Move the job back to the wait queue, to immediately be picked up by a waiting worker. rcall("RPUSH", dst, jobId) rcall('PUBLISH', KEYS[1] .. '@', jobId) table.insert(stalled, jobId) end ... return {failed, stalled} 复制代码
- MAX_STALLED_JOB_COUNT: 默认为1
该脚本会将stalled的job取出并返回,从而生成如题错误.
参考
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- rabbitmq实现延时队列(死信队列)
- 消息队列(三)常见消息队列介绍
- 消息队列探秘 – RabbitMQ 消息队列介绍
- 消息队列和任务队列有什么区别?
- 数据结构之——队列与循环队列
- Redis应用-异步消息队列与延时队列
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
深入解析Spring MVC与Web Flow
Seth Ladd、Darren Davison、Steven Devijver、Colin Yates / 徐哲、沈艳 / 人民邮电出版社 / 2008-11 / 49.00元
《深入解析Spring MVCgn Web Flow》是Spring MVC 和Web Flow 两个框架的权威指南,书中包括的技巧和提示可以让你从这个灵活的框架中汲取尽可能多的信息。书中包含了一些开发良好设计和解耦的Web 应用程序的最佳实践,介绍了Spring 框架中的Spring MVC 和Spring Web Flow,以及着重介绍利用Spring 框架和Spring MVC 编写Web ......一起来看看 《深入解析Spring MVC与Web Flow》 这本书的介绍吧!