Kue is a priority job queue backed by redis, built for node.js.
JavaScriptExpert/kue
PROTIP: This is the latest Kue documentation; make sure to also read the changelist.
Latest release:
$ npm install kue
Master branch:
$ npm install http://github.com/Automattic/kue/tarball/master
- Delayed jobs
- Distribution of parallel work load
- Job event and progress pubsub
- Job TTL
- Optional retries with backoff
- Graceful workers shutdown
- Full-text search capabilities
- RESTful JSON API
- Rich integrated UI
- Infinite scrolling
- UI progress indication
- Job specific logging
- Powered by Redis
- Creating Jobs
- Jobs Priority
- Failure Attempts
- Failure Backoff
- Job TTL
- Job Logs
- Job Progress
- Job Events
- Queue Events
- Delayed Jobs
- Processing Jobs
- Processing Concurrency
- Pause Processing
- Updating Progress
- Graceful Shutdown
- Error Handling
- Queue Maintenance
- Redis Connection Settings
- User-Interface
- JSON API
- Parallel Processing With Cluster
- Securing Kue
- Testing
- Screencasts
- License
First create a job Queue with kue.createQueue():
var kue = require('kue'),
    queue = kue.createQueue();
Calling queue.create() with the type of job ("email") and arbitrary job data returns a Job, which can then be save()d, adding it to redis with a default priority level of "normal". The save() method optionally accepts a callback, which receives an error if something goes wrong. The title key is special-cased: it is displayed in the job listings within the UI, making it easier to find a specific job.
var job = queue.create('email', {
    title: 'welcome email for tj',
    to: 'tj@learnboost.com',
    template: 'welcome-email'
}).save(function(err) {
    if (!err) console.log(job.id);
});
To specify the priority of a job, invoke the priority() method with a number or a priority name, which is mapped to a number.
queue.create('email', {
    title: 'welcome email for tj',
    to: 'tj@learnboost.com',
    template: 'welcome-email'
}).priority('high').save();
The default priority map is as follows (lower numbers are processed first):
{low: 10, normal: 0, medium: -5, high: -10, critical: -15};
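To make the ordering concrete, here is a small self-contained sketch (plain JavaScript, no Redis) that resolves priority names through the default map above and orders jobs so that lower numbers come first. The helper names resolvePriority and orderByPriority are hypothetical, and breaking ties with a stable sort is an assumption for illustration, not a description of Kue's internals.

```javascript
// Kue's default priority map, copied from the README text above.
const PRIORITIES = { low: 10, normal: 0, medium: -5, high: -10, critical: -15 };

// Hypothetical helper: accept either a raw number or a priority name.
function resolvePriority(priority) {
  if (typeof priority === 'number') return priority;
  if (Object.prototype.hasOwnProperty.call(PRIORITIES, priority)) {
    return PRIORITIES[priority];
  }
  throw new Error('unknown priority: ' + priority);
}

// Hypothetical helper: lower numbers (higher priority) come first.
// Array#sort is stable (ES2019+), so equal priorities keep insertion order.
function orderByPriority(jobs) {
  return jobs
    .map((job) => ({ ...job, priority: resolvePriority(job.priority) }))
    .sort((a, b) => a.priority - b.priority);
}

const ordered = orderByPriority([
  { id: 1, priority: 'normal' },
  { id: 2, priority: 'critical' },
  { id: 3, priority: 'low' },
  { id: 4, priority: -10 },
]);
console.log(ordered.map((job) => job.id)); // [ 2, 4, 1, 3 ]
```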
By default, jobs have only one attempt: when they fail, they are marked as failed and remain that way until you intervene. Kue lets you specify more attempts, which is important for jobs such as sending an email, which can usually be retried without issue after a failure. To do this, invoke the .attempts() method with a number.
queue.create('email', {
    title: 'welcome email for tj',
    to: 'tj@learnboost.com',
    template: 'welcome-email'
}).priority('high').attempts(5).save();
Retry attempts are made as soon as a job fails, with no delay, even if the job had a delay set via Job#delay. If you want to delay re-attempts after failures (known as backoff), you can use the Job#backoff method in several ways:
// Honor the job's original delay (if set) at each attempt; defaults to fixed backoff
job.attempts(3).backoff(true)
// Override the delay value, fixed backoff
job.attempts(3).backoff({delay: 60*1000, type: 'fixed'})
// Enable exponential backoff using the original delay (if set)
job.attempts(3).backoff({type: 'exponential'})
// Use a function to compute a customized delay for the next attempt
job.attempts(3).backoff(function(attempts, delay) {
    // attempts corresponds to the nth attempt failure, so it starts at 0
    // delay is the previous delay, not the initial delay (unless attempts === 0)
    return my_customized_calculated_delay;
})
In the last scenario, the provided function is executed (via eval) on each re-attempt to get the next attempt's delay, which means you can't reference external or context variables within it.
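Since the custom backoff function is executed via eval, it presumably travels as source text rather than as a live closure (an assumption about Kue's internals). The sketch below mimics that round trip with Function#toString and the Function constructor to show why a self-contained function survives while one that reads an outer variable does not. The names reviveFromSource, cappedDoubling and makeClosureBackoff are hypothetical.

```javascript
// Hypothetical sketch: rebuild a function from its source text, roughly the
// way an eval-based round trip would. Any closure over outer variables is lost.
function reviveFromSource(fn) {
  const source = fn.toString();
  return new Function('return (' + source + ');')();
}

// Uses only its parameters, so it still works after being revived.
const cappedDoubling = function (attempts, delay) {
  return Math.min(delay * 2, 60 * 1000);
};

// Depends on `baseDelay`, which lives in the factory's scope and is lost.
function makeClosureBackoff() {
  const baseDelay = 1000;
  return function (attempts, delay) {
    return baseDelay * (attempts + 1);
  };
}

const revivedOk = reviveFromSource(cappedDoubling);
console.log(revivedOk(0, 5000)); // 10000

const revivedBroken = reviveFromSource(makeClosureBackoff());
try {
  revivedBroken(0, 5000);
} catch (err) {
  console.log(err instanceof ReferenceError); // true: baseDelay is not defined
}
```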
Job producers can set an expiry value for how long their job may stay in the active state. If workers don't respond in a timely fashion, Kue fails the job with a "TTL exceeded" error message, which prevents it from being stuck in the active state and spoiling concurrency.
queue.create('email', {title: 'email job with TTL'}).ttl(milliseconds).save();
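The TTL rule can be stated as a small check: an active job whose time in the active state exceeds its ttl is failed with a "TTL exceeded" reason. Kue enforces this internally; the sketch below only illustrates the rule on plain objects, and the field names startedAt and ttl, the explicit now argument, and failExpiredJobs itself are hypothetical.

```javascript
// Hypothetical sketch of the TTL rule: fail any active job that has been
// active for longer than its ttl (milliseconds). Not Kue's implementation.
function failExpiredJobs(activeJobs, now) {
  const failedIds = [];
  for (const job of activeJobs) {
    if (job.state !== 'active' || typeof job.ttl !== 'number') continue;
    if (now - job.startedAt > job.ttl) {
      job.state = 'failed';
      job.error = 'TTL exceeded';
      failedIds.push(job.id);
    }
  }
  return failedIds;
}

const active = [
  { id: 1, state: 'active', startedAt: 0, ttl: 5000 },
  { id: 2, state: 'active', startedAt: 4000, ttl: 5000 },
  { id: 3, state: 'active', startedAt: 0 }, // no ttl: never expires here
];
console.log(failExpiredJobs(active, 6000)); // [ 1 ]
```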
Job-specific logs let you expose information to the UI at any point in the job's lifetime. To do so, invoke job.log(), which accepts a message string as well as variable arguments for sprintf-like formatting:
job.log('$%d sent to %s', amount, user.name);
or anything else (it uses util.inspect() internally):
job.log({key: 'some key', value: 10});
job.log([1, 2, 3, 5, 8]);
job.log(10.1);
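To preview what such log lines look like, Node's built-in util module covers both cases: util.format handles sprintf-like placeholders such as %d and %s, and util.inspect renders non-string values. The README only states that util.inspect() is used; whether job.log() relies on util.format for the placeholder case is an assumption here, and amount and user are hypothetical values.

```javascript
const util = require('util');

// Hypothetical values for the placeholders used in the README example.
const amount = 25;
const user = { name: 'tj' };

// sprintf-like placeholders via util.format
console.log(util.format('$%d sent to %s', amount, user.name)); // $25 sent to tj

// non-string values rendered via util.inspect
console.log(util.inspect({ key: 'some key', value: 10 })); // { key: 'some key', value: 10 }
console.log(util.inspect([1, 2, 3, 5, 8])); // [ 1, 2, 3, 5, 8 ]
console.log(util.inspect(10.1)); // 10.1
```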
Job progress is extremely useful for long-running jobs such as video conversion. To update the job's progress, invoke job.progress(completed, total [, data]):
job.progress(frames, totalFrames);
The optional data argument can carry extra information about the job, for example a message or an object with extra context about the current status.
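The progress event reports a value from 0 to 100, so completed and total have to be turned into a percentage somewhere. The sketch below shows one way to do that; flooring and clamping to 0..100 are assumptions, since Kue's exact rounding is not documented here, and toPercent is a hypothetical name.

```javascript
// Hypothetical sketch: map progress(completed, total) onto the 0-100 range
// reported by the "progress" event. Floors and clamps; guards total <= 0.
function toPercent(completed, total) {
  if (!(total > 0)) return 0;
  const percent = Math.floor((completed * 100) / total);
  return Math.min(100, Math.max(0, percent));
}

console.log(toPercent(50, 200)); // 25
console.log(toPercent(199, 200)); // 99
console.log(toPercent(250, 200)); // 100
```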
Job-specific events are fired on the Job instances via Redis pubsub. The following events are currently supported:
- enqueue: the job is now queued
- start: the job is now running
- promotion: the job is promoted from the delayed state to queued
- progress: the job's progress, ranging from 0-100
- failed attempt: the job has failed, but has remaining attempts
- failed: the job has failed and has no remaining attempts
- complete: the job has completed
- remove: the job has been removed
For example, this may look something like the following:
var job = queue.create('video conversion', {
    title: 'converting loki\'s to avi',
    user: 1,
    frames: 200
});
job.on('complete', function(result) {
    console.log('Job completed with data ', result);
}).on('failed attempt', function(errorMessage, doneAttempts) {
    console.log('Job failed');
}).on('failed', function(errorMessage) {
    console.log('Job failed');
}).on('progress', function(progress, data) {
    console.log('\r job #' + job.id + ' ' + progress + '% complete with data ', data);
});
Note that job-level events are not guaranteed to be received across process restarts, since a restarted node.js process loses its reference to the specific Job object. If you want a more reliable event handler, look at Queue Events.
Note: Kue keeps job objects in memory until they complete or fail, so that it can emit events on them. If you have a very large number of concurrent uncompleted jobs, turn this feature off and use queue-level events for better memory scaling.
kue.createQueue({jobEvents: false})
Alternatively, you can use the job-level function events() to control whether events are fired for an individual job.
var job = queue.create('test').events(false).save();
Queue-level events provide access to the job-level events previously mentioned, but scoped to the Queue instance so you can apply logic at a "global" level. An example of this is removing completed jobs:
queue.on('job enqueue', function(id, type) {
    console.log('Job %s got queued of type %s', id, type);
}).on('job complete', function(id, result) {
    kue.Job.get(id, function(err, job) {
        if (err) return;
        job.remove(function(err) {
            if (err) throw err;
            console.log('removed completed job #%d', job.id);
        });
    });
});
The events available are the same as mentioned in "Job Events", however prefixed with "job ".
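The "job " prefix rule can be illustrated with Node's EventEmitter: each job-level event is re-emitted on a queue-level emitter under the prefixed name, with the job id as the first argument (matching the id-first handlers in the example above). This is a sketch of the naming convention only; relayJobEvents is a hypothetical helper, not how Kue wires its Redis pubsub.

```javascript
const { EventEmitter } = require('events');

// Hypothetical sketch: forward job-level events to a queue-level emitter,
// prefixing each event name with "job " and passing the job id first.
function relayJobEvents(queue, job, eventNames) {
  for (const name of eventNames) {
    job.on(name, (...args) => queue.emit('job ' + name, job.id, ...args));
  }
}

const queue = new EventEmitter();
const job = new EventEmitter();
job.id = 42;
relayJobEvents(queue, job, ['complete', 'failed', 'progress']);

const seen = [];
queue.on('job complete', (id, result) => seen.push([id, result]));
job.emit('complete', { sent: true });
console.log(seen); // [ [ 42, { sent: true } ] ]
```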
Delayed jobs may be scheduled to be queued at an arbitrary point in the future by invoking the .delay(ms) method, passing the number of milliseconds relative to now. Alternatively, you can pass a JavaScript Date object with a specific time in the future. This automatically flags the Job as "delayed".
var email = queue.create('email', {
    title: 'Account renewal required',
    to: 'tj@learnboost.com',
    template: 'renewal-email'
}).delay(milliseconds)
  .priority('high')
  .save();
Kue checks the delayed jobs with a timer and promotes them once their scheduled delay has passed; by default it checks the top 1000 jobs every second.
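The promotion check described above can be sketched on plain objects: on each tick, take up to a fixed number of delayed jobs ordered by due time and move the ones that are due to the queued ("inactive") state. Kue does this against Redis; promoteDueJobs, the promoteAt field and the explicit now argument are hypothetical, and the default limit of 1000 simply mirrors the README.

```javascript
// Hypothetical sketch of delayed-job promotion: look at up to `limit`
// delayed jobs (earliest due first) and queue the ones whose time has come.
function promoteDueJobs(delayedJobs, now, limit = 1000) {
  const candidates = delayedJobs
    .filter((job) => job.state === 'delayed')
    .sort((a, b) => a.promoteAt - b.promoteAt)
    .slice(0, limit);
  const promoted = [];
  for (const job of candidates) {
    if (job.promoteAt > now) break; // sorted, so nothing after this is due
    job.state = 'inactive'; // Kue's name for the queued state
    promoted.push(job.id);
  }
  return promoted;
}

const jobs = [
  { id: 'a', state: 'delayed', promoteAt: 5000 },
  { id: 'b', state: 'delayed', promoteAt: 1000 },
  { id: 'c', state: 'delayed', promoteAt: 9000 },
];
console.log(promoteDueJobs(jobs, 6000)); // [ 'b', 'a' ]
```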
Processing jobs is simple with Kue. First create a Queue instance, much as we do for creating jobs, which gives us access to redis, then invoke queue.process() with the associated type. Note that, contrary to what the name createQueue suggests, it currently returns a singleton Queue instance, so you can configure and use only a single Queue object within your node.js process.
In the following example we pass the callback done to email. When an error occurs we invoke done(err) to tell Kue something happened; otherwise we invoke done() only when the job is complete. If this function responds with an error, the error is displayed in the UI and the job is marked as a failure. The error object passed to done should be of the standard type Error.
var kue = require('kue'),
    queue = kue.createQueue();
queue.process('email', function(job, done) {
    email(job.data.to, done);
});
function email(address, done) {
    if (!isValidEmail(address)) {
        // done('invalid to address') is possible but discouraged
        return done(new Error('invalid to address'));
    }
    // email send stuff...
    done();
}
Workers can also pass a job result as the second parameter to done, as in done(null, result), to store it in the Job.result key. The result is also passed to complete event handlers so that job producers can receive it if they wish.
By default, a call to queue.process() accepts only one job at a time for processing. For small tasks like sending emails this is not ideal, so we may specify the maximum number of active jobs for this type by passing a number:
queue.process('email', 20, function(job, done) {
    // ...
});
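The concurrency setting can be pictured as a pool with a fixed number of slots: at most that many jobs are handed to the handler at once, and each done() frees a slot and pulls in the next waiting job. This in-memory sketch only illustrates the idea; createWorkerPool and its stats() method are hypothetical, and Kue coordinates the real thing through Redis.

```javascript
// Hypothetical sketch of per-type concurrency: at most `concurrency` jobs
// run at once; each done() frees a slot and starts the next waiting job.
function createWorkerPool(concurrency, handler) {
  const waiting = [];
  let active = 0;

  function pump() {
    while (active < concurrency && waiting.length > 0) {
      const job = waiting.shift();
      active += 1;
      let finished = false;
      handler(job, function done() {
        if (finished) return; // ignore duplicate done() calls
        finished = true;
        active -= 1;
        pump();
      });
    }
  }

  return {
    push(job) {
      waiting.push(job);
      pump();
    },
    stats() {
      return { active, waiting: waiting.length };
    },
  };
}

// Collect the done callbacks so jobs can be finished by hand.
const pendingDone = [];
const pool = createWorkerPool(2, (job, done) => pendingDone.push(done));
['a', 'b', 'c', 'd', 'e'].forEach((job) => pool.push(job));
console.log(pool.stats()); // { active: 2, waiting: 3 }
pendingDone.shift()(); // finish one job; the next waiting job starts
console.log(pool.stats()); // { active: 2, waiting: 2 }
```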
Workers can temporarily pause and resume their activity. That is, after calling pause they receive no jobs in their process callback until resume is called. The pause function gracefully shuts down this worker and uses the same internal functionality as the shutdown method in Graceful Shutdown.
queue.process('email', function(job, ctx, done) {
    ctx.pause(5000, function(err) {
        console.log("Worker is paused... ");
        setTimeout(function() { ctx.resume(); }, 10000);
    });
});
Note: from Kue >= 0.9.0, the ctx parameter is the second argument of the process callback function, and done is idiomatically always the last.
Note: the pause method signature changed in Kue >= 0.9.0 to move the callback function to the last position.
For a "real" example, let's say we need to compile a PDF from numerous slides with node-canvas. Our job may consist of the following data. Note that, in general, you should not store large data in the job itself; it's better to store references such as ids and pull the data in while processing.
queue.create('slideshow pdf', {
    title: user.name + "'s slideshow",
    slides: [...] // keys to data stored in redis, mongodb, or some other store
});
We can access this same arbitrary data within a separate process while processing, via the job.data property. In the example we render each slide one by one, updating the job's log and progress.
queue.process('slideshow pdf', 5, function(job, done) {
    var slides = job.data.slides,
        len = slides.length;
    if (len === 0) return done(); // nothing to render
    function next(i) {
        var slide = slides[i]; // pretend we did a query on this slide id ;)
        job.log('rendering %dx%d slide', slide.width, slide.height);
        renderSlide(slide, function(err) {
            if (err) return done(err);
            var rendered = i + 1; // number of slides finished so far
            job.progress(rendered, len, {nextSlide: rendered === len ? 'itsdone' : rendered});
            if (rendered === len) done();
            else next(rendered);
        });
    }
    next(0);
});
Queue#shutdown([timeout,] fn) signals all workers to stop processing after their current active job is done. Workers wait timeout milliseconds for their active job's done to be called, or else mark the active job as failed with a shutdown error reason. When all workers have told Kue they are stopped, fn is called.
var queue = require('kue').createQueue();
process.once('SIGTERM', function(sig) {
    queue.shutdown(5000, function(err) {
        console.log('Kue shutdown: ', err || '');
        process.exit(0);
    });
});
Note that the shutdown method signature changed in Kue >= 0.9.0 to move the callback function to the last position.
All errors, whether from the Redis client library or from the Queue, are emitted on the Queue object. You should bind to error events to prevent uncaught exceptions and to debug Kue errors.
var queue = require('kue').createQueue();
queue.on('error', function(err) {
    console.log('Oops... ', err);
});
Kue marks a job complete or failed when done is called by your worker, so you should use proper error handling to prevent uncaught exceptions in your worker's code and to keep the node.js process from exiting before in-flight jobs are done. This can be achieved in two ways:
- Wrapping your worker's process function in Domains
queue.process('my-error-prone-task', function(job, done) {
    var domain = require('domain').create();
    domain.on('error', function(err) {
        done(err);
    });
    domain.run(function() { // your process function
        throw new Error('bad things happen');
        done();
    });
});
Notice: Domains are deprecated in Node.js (stability 0) and their use is not recommended.
This is the softest and best solution, however it is not built into Kue. Please refer to this discussion. You can comment on this feature in the related open Kue issue.
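You can also use promises to do something like the following (Promise.method and nodeify come from the Bluebird promise library, not from native Promises):
var Bluebird = require('bluebird');
queue.process('my-error-prone-task', function(job, done) {
    Bluebird.method(function() { // your process function
        throw new Error('bad things happen');
    })().nodeify(done);
});
but this won't catch exceptions in your async call stack the way domains do.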
- Binding to uncaughtException and gracefully shutting down Kue. However, this is not a recommended error-handling idiom in JavaScript, since you lose the error context.
process.once('uncaughtException', function(err) {
    console.error('Something bad happened: ', err);
    queue.shutdown(1000, function(err2) {
        console.error('Kue shutdown result: ', err2 || 'OK');
        process.exit(0);
    });
});
Kue currently uses client-side job state management, so when redis crashes in the middle of those operations, stuck jobs or index inconsistencies can occur. As a consequence, a certain number of jobs will be stuck and will be pulled out by workers only when new jobs are created; if no new jobs are created, they stay stuck forever. We therefore strongly suggest that you run a watchdog to fix this issue by calling:
queue.watchStuckJobs(interval)
interval is in milliseconds and defaults to 1000ms.
Kue will be refactored to fully atomic job state management from version 1.0, using Lua scripts and/or a BRPOPLPUSH combination. You can read more here and here.
The Queue object has two types of methods that tell you the number of jobs in each state:
queue.inactiveCount(function(err, total) { // others are activeCount, completeCount, failedCount, delayedCount
    if (total > 100000) {
        console.log('We need some back pressure here');
    }
});
You can also query a specific job type:
queue.failedCount('my-critical-job', function(err, total) {
    if (total > 10000) {
        console.log('This is tOoOo bad');
    }
});
and iterate over job ids:
queue.inactive(function(err, ids) { // others are active, complete, failed, delayed
    // you may want to fetch each id to get the Job object out of it...
});
However, the latter doesn't scale to large deployments; there you can use the more specific Job static methods:
kue.Job.rangeByState('failed', 0, n, 'asc', function(err, jobs) {
    // you have an array of maximum n Job objects here
});
or
kue.Job.rangeByType('my-job-type', 'failed', 0, n, 'asc', function(err, jobs) {
    // you have an array of maximum n Job objects here
});
Note that the last two methods are subject to change in later Kue versions.
If you did none of the above in the Error Handling section, or your process lost active jobs in any way, you can recover them when your process is restarted. A blind approach would be to re-queue all stuck jobs:
queue.active(function(err, ids) {
    ids.forEach(function(id) {
        kue.Job.get(id, function(err, job) {
            if (err) return;
            // Your application should check if the job is a stuck one
            job.inactive();
        });
    });
});
Note: in a clustered deployment, your application should take care not to touch a valid job that is currently being processed by another worker.
Job data and search indexes eat up redis memory, so you will need some job-keeping process in real-world deployments. Your first option is automatic job removal on completion.
queue.create( ...).removeOnComplete(true).save()
But if you occasionally or temporarily need completed job data, you can set up an on-demand job removal script like the one below to remove the top n completed jobs:
kue.Job.rangeByState('complete', 0, n, 'asc', function(err, jobs) {
    jobs.forEach(function(job) {
        job.remove(function() {
            console.log('removed ', job.id);
        });
    });
});
Note that you should allow enough time for the .remove calls on each job object to complete before your process exits, or job indexes will leak.
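One way to know when it is safe to exit is to count outstanding remove callbacks and only proceed once all of them have fired. The sketch below does that for any objects exposing a node-style remove(cb), such as the jobs returned by kue.Job.rangeByState in the script above; removeAll and the fake jobs are hypothetical stand-ins, not part of Kue.

```javascript
// Hypothetical sketch: call remove() on every job and invoke `finished`
// once every remove callback has fired, collecting any errors.
function removeAll(jobs, finished) {
  let pending = jobs.length;
  const errors = [];
  if (pending === 0) return finished(null);
  jobs.forEach((job) => {
    job.remove((err) => {
      if (err) errors.push({ id: job.id, err });
      pending -= 1;
      if (pending === 0) finished(errors.length > 0 ? errors : null);
    });
  });
}

// Stand-in job objects whose remove() completes asynchronously,
// roughly the way a Redis round trip would.
const fakeJobs = [1, 2, 3].map((id) => ({
  id,
  remove(cb) {
    setImmediate(() => cb(null));
  },
}));

removeAll(fakeJobs, (errors) => {
  console.log('all removals finished', errors);
  // In a real cleanup script, this is where exiting the process would be safe.
});
```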
By default, Kue connects to Redis using the client's default settings (port defaults to 6379, host defaults to 127.0.0.1, prefix defaults to q). Queue#createQueue(options) accepts redis connection options in the options.redis key.
var kue = require('kue');
var q = kue.createQueue({
    prefix: 'q',
    redis: {
        port: 1234,
        host: '10.0.50.20',
        auth: 'password',
        db: 3, // if provided, select a non-default redis db
        options: {
            // see https://github.com/mranney/node_redis#rediscreateclient
        }
    }
});
prefix controls the key names used in Redis. By default, this is simply q. The prefix generally shouldn't be changed unless you need to use one Redis instance for multiple apps. It can also be useful for providing an isolated testbed alongside your main application.
You can also specify the connection information as a URL string.
var q = kue.createQueue({
    redis: 'redis://example.com:1234?redis_option=value&redis_option=value'
});
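To see how the URL form lines up with the options-object form, the sketch below splits a redis:// URL into host, port, auth, db and extra query options using the WHATWG URL class built into Node.js. The mapping (password to auth, path to db, query string to extra options) and the parseRedisUrl helper are assumptions for illustration; Kue's own URL handling may differ.

```javascript
// Hypothetical sketch: break a redis:// URL into the pieces used by the
// options-object form. Uses only Node's built-in WHATWG URL class.
function parseRedisUrl(connectionString) {
  const url = new URL(connectionString);
  const options = {};
  for (const [key, value] of url.searchParams) options[key] = value;
  const db = url.pathname.replace(/^\//, '');
  return {
    host: url.hostname,
    port: url.port ? Number(url.port) : 6379, // README's default port
    auth: url.password ? decodeURIComponent(url.password) : undefined,
    db: db ? Number(db) : undefined,
    options,
  };
}

console.log(parseRedisUrl('redis://:secret@example.com:1234/3?connect_timeout=5000'));
```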
Since node_redis supports Unix Domain Sockets, you can also tell Kue to use them. See unix-domain-socket for your redis server configuration.
var kue = require('kue');
var q = kue.createQueue({
    prefix: 'q',
    redis: {
        socket: '/data/sockets/redis.sock',
        auth: 'password',
        options: {
            // see https://github.com/mranney/node_redis#rediscreateclient
        }
    }
});
Any node.js redis client library that conforms to the node_redis API (or is adapted to it) can be injected into Kue. You only need to provide a createClientFactory function as a redis connection factory instead of node_redis connection options.
Below is sample code that uses redis-sentinel to connect to Redis Sentinel for automatic master/slave failover.
var kue = require('kue');
var Sentinel = require('redis-sentinel');
var endpoints = [
    {host: '192.168.1.10', port: 6379},
    {host: '192.168.1.11', port: 6379}
];
var opts = {}; // Standard node_redis client options
var masterName = 'mymaster';
var sentinel = Sentinel.Sentinel(endpoints);
var q = kue.createQueue({
    redis: {
        createClientFactory: function() {
            return sentinel.createClient(masterName, opts);
        }
    }
});
Note that all < 0.8.x client code should be refactored to pass redis options to Queue#createQueue instead of monkey-patching redis#createClient, or it will break from Kue 0.8.x onwards.
An ioredis cluster client can be injected the same way:
var Redis = require('ioredis');
var kue = require('kue');
// using https://github.com/72squared/vagrant-redis-cluster
var queue = kue.createQueue({
    redis: {
        createClientFactory: function() {
            return new Redis.Cluster([{port: 7000}, {port: 7001}]);
        }
    }
});
The UI is a small Express application. A script is provided in bin/ for running the interface as a standalone application with default settings. You may pass in options for the port, redis-url, and prefix. For example:
node_modules/kue/bin/kue-dashboard -p 3050 -r redis://127.0.0.1:3000 -q prefix
You can fire it up from within another application too:
var kue = require('kue');
kue.createQueue(...);
kue.app.listen(3000);
The title defaults to "Kue"; to change it, invoke:
kue.app.set('title', 'My Application');
Note that if you are using non-default Kue options, kue.createQueue(...) must be called before accessing kue.app.
You can also use the Kue-UI web interface contributed by Arnaud Bénard.
Along with the UI Kue also exposes a JSON API, which is utilized by the UI.
Query jobs, for example "GET /job/search?q=avi video":
["5","7","10"]
By default, Kue indexes the whole Job data object for searching, but this can be customized by calling Job#searchKeys to tell Kue which keys of the Job data to index:
var kue = require('kue');
var queue = kue.createQueue();
queue.create('email', {
    title: 'welcome email for tj',
    to: 'tj@learnboost.com',
    template: 'welcome-email'
}).searchKeys(['to', 'title']).save();
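The effect of searchKeys can be pictured as choosing which fields of job.data contribute text to the full-text index. The sketch below contrasts indexing everything with indexing only to and title; indexableText is a hypothetical helper, and how Kue and reds actually tokenize and store this text is not modeled.

```javascript
// Hypothetical sketch: the text that would feed a full-text index when all
// of job.data is indexed versus only the listed searchKeys.
function indexableText(data, searchKeys) {
  const keys = searchKeys || Object.keys(data);
  return keys
    .filter((key) => data[key] !== undefined)
    .map((key) => {
      const value = data[key];
      // Nested objects are serialized so they don't become "[object Object]".
      return typeof value === 'object' ? JSON.stringify(value) : String(value);
    })
    .join(' ');
}

const data = {
  title: 'welcome email for tj',
  to: 'tj@learnboost.com',
  template: 'welcome-email',
};
console.log(indexableText(data)); // welcome email for tj tj@learnboost.com welcome-email
console.log(indexableText(data, ['to', 'title'])); // tj@learnboost.com welcome email for tj
```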
The search feature is turned off by default from Kue >= 0.9.0. Read more about this here. If you need it, enable search indexes and add reds to your dependencies:
var kue = require('kue');
var q = kue.createQueue({disableSearch: false});
npm install reds --save
Currently responds with state counts, and worker activity time in milliseconds:
{"inactiveCount":4,"completeCount":69,"activeCount":2,"failedCount":0,"workTime":20892}
Get a job by :id:
{"id":"3","type":"email","data":{"title":"welcome email for tj","to":"tj@learnboost.com","template":"welcome-email"},"priority":-10,"progress":"100","state":"complete","attempts":null,"created_at":"1309973155248","updated_at":"1309973155248","duration":"15002"}
Get job :id's log:
['foo','bar','baz']
Get jobs with the specified range :from to :to, for example "/jobs/0..2", where :order may be "asc" or "desc":
[{"id":"12","type":"email","data":{"title":"welcome email for tj","to":"tj@learnboost.com","template":"welcome-email"},"priority":-10,"progress":0,"state":"active","attempts":null,"created_at":"1309973299293","updated_at":"1309973299293"},{"id":"130","type":"email","data":{"title":"welcome email for tj","to":"tj@learnboost.com","template":"welcome-email"},"priority":-10,"progress":0,"state":"active","attempts":null,"created_at":"1309975157291","updated_at":"1309975157291"}]
Same as above, restricting by :state, which is one of:
- active
- inactive
- failed
- complete
Same as above, however restricted to :type and :state.
Delete job:id:
$ curl -X DELETE http://localhost:3000/job/2
{"message":"job 2 removed"}
Create a job:
$ curl -H "Content-Type: application/json" -X POST -d \
    '{ "type": "email", "data": { "title": "welcome email for tj", "to": "tj@learnboost.com", "template": "welcome-email" }, "options" : { "attempts": 5, "priority": "high" } }' http://localhost:3000/job
{"message": "job created", "id": 3}
You can create multiple jobs at once by passing an array. In this case, the response will be an array too, preserving the order:
$ curl -H "Content-Type: application/json" -X POST -d \
    '[{ "type": "email", "data": { "title": "welcome email for tj", "to": "tj@learnboost.com", "template": "welcome-email" }, "options" : { "attempts": 5, "priority": "high" } }, { "type": "email", "data": { "title": "followup email for tj", "to": "tj@learnboost.com", "template": "followup-email" }, "options" : { "delay": 86400, "attempts": 5, "priority": "high" } }]' http://localhost:3000/job
[
    {"message": "job created", "id": 4},
    {"message": "job created", "id": 5}
]
Note: when inserting multiple jobs in bulk, if one insertion fails, Kue keeps processing the remaining jobs in order. The response array contains the ids of the jobs added successfully, and any failed element is an object describing the error: {"error": "error reason"}.
The example below shows how you may use Cluster to spread the job processing load across CPUs. Please see the Cluster module's documentation for more detailed examples of using it.
When cluster.isMaster is true, the file is being executed in the context of the master process, in which case you may perform tasks that you only want to run once, such as starting the web app bundled with Kue. The logic in the else block is executed per worker.
var kue = require('kue'),
    cluster = require('cluster'),
    queue = kue.createQueue();
var clusterWorkerSize = require('os').cpus().length;
if (cluster.isMaster) {
    kue.app.listen(3000);
    for (var i = 0; i < clusterWorkerSize; i++) {
        cluster.fork();
    }
} else {
    queue.process('email', 10, function(job, done) {
        var pending = 5,
            total = pending;
        var interval = setInterval(function() {
            job.log('sending!');
            job.progress(total - pending, total);
            --pending || done();
            pending || clearInterval(interval);
        }, 1000);
    });
}
This creates an email job processor (worker) for each of your machine's CPU cores, each of which can handle 10 concurrent email jobs, for a total of 10 * N concurrent email jobs on an N-core machine.
Now when you visit Kue's UI in the browser, you'll see that jobs are being processed roughly N times faster (if you have N cores).
Through the use of app mounting you may customize the web application, enabling TLS or adding additional middleware like basic-auth-connect.
$ npm install --save basic-auth-connect
var https = require('https');
var express = require('express');
var basicAuth = require('basic-auth-connect');
var kue = require('kue');
var app = express();
app.use(basicAuth('foo', 'bar'));
app.use(kue.app);
var tlsOptions = { /* ...tls options... */ };
https.createServer(tlsOptions, app).listen(3000);
Enable test mode to push all jobs into a jobs array. Make assertions against the jobs in that array to ensure the code under test is correctly enqueuing jobs.
var expect = require('chai').expect; // assumes chai for expect; before/after/it are mocha globals
var queue = require('kue').createQueue();
before(function() {
    queue.testMode.enter();
});
afterEach(function() {
    queue.testMode.clear();
});
after(function() {
    queue.testMode.exit();
});
it('does something cool', function() {
    queue.createJob('myJob', { foo: 'bar' }).save();
    queue.createJob('anotherJob', { baz: 'bip' }).save();
    expect(queue.testMode.jobs.length).to.equal(2);
    expect(queue.testMode.jobs[0].type).to.equal('myJob');
    expect(queue.testMode.jobs[0].data).to.eql({ foo: 'bar' });
});
IMPORTANT: By default, jobs aren't processed when created during test mode. You can enable job processing by passing true to testMode.enter:
before(function() {
    queue.testMode.enter(true);
});
- Introduction to Kue
- API walkthrough to Kue
We love contributions!
When contributing, follow these simple rules:
- Don't violate DRY principles.
- Boy Scout Rule needs to have been applied.
- Your code should look like all the other code – this project should look like it was written by one person, always.
- If you want to propose something, just create an issue and describe your proposal in as much detail as you can.
- If you think you have some general improvement, consider creating a pull request with it.
- If you add new code, it should be covered by tests. No tests – no code.
- If you add a new feature, don't forget to update the documentation for it.
- If you find a bug (or at least you think it is a bug), create an issue with the library version and a test case that we can run to see what you are talking about, or at least the full steps by which we can reproduce it.
(The MIT License)
Copyright (c) 2011 LearnBoost <tj@learnboost.com>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.