Agenda

A light-weight job scheduling library for Node.js

This was originally a fork of agenda.js. It differs from the original version in the following points:

  • Complete rewrite in TypeScript (fully typed!)
  • mongodb4 driver (supports mongodb 5.x)
  • Supports mongoDB sharding by name
  • touch() can have an optional progress parameter (0-100)
  • Bugfixes and improvements for locking & job processing (concurrency, lockLimit, ...)
  • Breaking change: define() config parameter moved from 2nd position to 3rd
  • getRunningStats()
  • automatically waits for agenda to be connected before calling any database operations
  • uses a database abstraction layer behind the scenes
  • does not create a database index by default; you can set ensureIndex: true when initializing Agenda or run manually:

    db.agendaJobs.ensureIndex({
        "name" : 1,
        "nextRunAt" : 1,
        "priority" : -1,
        "lockedAt" : 1,
        "disabled" : 1
    }, "findAndLockNextJobIndex")

Agenda offers

  • Minimal overhead. Agenda aims to keep its code base small.
  • Mongo backed persistence layer.
  • Promises based API.
  • Scheduling with configurable priority, concurrency, repeating and persistence of job results.
  • Scheduling via cron or human readable syntax.
  • Event backed job queue that you can hook into.
  • Agenda-rest: optional standalone REST API.
  • Inversify-agenda - Some utilities for the development of agenda workers with Inversify.
  • Agendash: optional standalone web-interface.

Feature Comparison

Since there are a few job queue solutions, here is a table comparing them to help you choose the one that better suits your needs.

Feature | Bull | Bee | Agenda
Backend | redis | redis | mongo
Priorities
Concurrency
Delayed jobs
Global events
Rate Limiter
Pause/Resume
Sandboxed worker
Repeatable jobs
Atomic ops~
Persistence
UI
REST API
Central (Scalable) Queue
Supports long running jobs
Optimized for | Jobs / Messages | Messages | Jobs

Kudos for making the comparison chart goes to the Bull maintainers.

Installation

Install via NPM

npm install @hokify/agenda

You will also need a working Mongo database (v4+) to point it to.

Example Usage

    const mongoConnectionString = 'mongodb://127.0.0.1/agenda';

    const agenda = new Agenda({db: {address: mongoConnectionString}});

    // Or override the default collection name:
    // const agenda = new Agenda({db: {address: mongoConnectionString, collection: 'jobCollectionName'}});

    // or pass additional connection options:
    // const agenda = new Agenda({db: {address: mongoConnectionString, collection: 'jobCollectionName', options: {ssl: true}}});

    // or pass in an existing mongodb-native MongoClient instance
    // const agenda = new Agenda({mongo: myMongoClient});

    agenda.define('delete old users', async job => {
      await User.remove({lastLogIn: {$lt: twoDaysAgo}});
    });

    (async function () {
      // IIFE to give access to async/await
      await agenda.start();

      await agenda.every('3 minutes', 'delete old users');

      // Alternatively, you could also do:
      await agenda.every('*/3 * * * *', 'delete old users');
    })();

    agenda.define(
      'send email report',
      async job => {
        const {to} = job.attrs.data;
        await emailClient.send({
          to,
          from: 'example@example.com',
          subject: 'Email Report',
          body: '...'
        });
      },
      {priority: 'high', concurrency: 10}
    );

    (async function () {
      await agenda.start();
      await agenda.schedule('in 20 minutes', 'send email report', {to: 'admin@example.com'});
    })();

    (async function () {
      const weeklyReport = agenda.create('send email report', {to: 'example@example.com'});
      await agenda.start();
      await weeklyReport.repeatEvery('1 week').save();
    })();

Full documentation

See also https://hokify.github.io/agenda/

Agenda's basic control structure is an instance of an agenda. Agendas are mapped to a database collection and load the jobs from within.

Configuring an agenda

All configuration methods are chainable, meaning you can do something like:

    const agenda = new Agenda();
    agenda.database(...).processEvery('3 minutes')...;

Possible agenda config options:

    {
      name: string;
      defaultConcurrency: number;
      processEvery: number;
      maxConcurrency: number;
      defaultLockLimit: number;
      lockLimit: number;
      defaultLockLifetime: number;
      ensureIndex: boolean;
      sort: SortOptionObject<IJobParameters>;
      db: {
        collection: string;
        address: string;
        options: MongoClientOptions;
      }
      mongo: Db;
    }

Agenda uses Human Interval for specifying the intervals. It supports the following units:

seconds, minutes, hours, days, weeks, months -- assumes 30 days, years -- assumes 365 days

More sophisticated examples

    agenda.processEvery('one minute');
    agenda.processEvery('1.5 minutes');
    agenda.processEvery('3 days and 4 hours');
    agenda.processEvery('3 days, 4 hours and 36 seconds');
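
To make the unit conventions above concrete, here is a minimal sketch of how such interval strings could be turned into milliseconds. The names toMilliseconds and UNIT_MS are hypothetical and exist only for this illustration; this is not the Human Interval implementation, and the sketch assumes numeric amounts (it does not understand words such as 'one').

```javascript
// Hypothetical lookup table, not part of Agenda or Human Interval.
const UNIT_MS = {
  second: 1000,
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000,
  week: 7 * 24 * 60 * 60 * 1000,
  month: 30 * 24 * 60 * 60 * 1000, // months are assumed to be 30 days
  year: 365 * 24 * 60 * 60 * 1000 // years are assumed to be 365 days
};

// Hypothetical helper: sums every "<number> <unit>" pair found in the text.
function toMilliseconds(text) {
  const pattern = /(\d+(?:\.\d+)?)\s*(second|minute|hour|day|week|month|year)s?/g;
  let total = 0;
  for (const [, amount, unit] of text.matchAll(pattern)) {
    total += parseFloat(amount) * UNIT_MS[unit];
  }
  return total;
}

console.log(toMilliseconds('1.5 minutes')); // 90000
console.log(toMilliseconds('3 days and 4 hours')); // 273600000
console.log(toMilliseconds('1 month')); // 2592000000
```

Because months and years use fixed lengths, a '1 month' interval drifts relative to the calendar; a cron string is usually the better fit for calendar-aligned schedules.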

database(url, [collectionName], [MongoClientOptions])

Specifies the database at the url specified. If no collection name is given, agendaJobs is used.

By default, useNewUrlParser and useUnifiedTopology are set to true.

    agenda.database('localhost:27017/agenda-test', 'agendaJobs');

You can also specify it during instantiation.

    const agenda = new Agenda({
      db: {address: 'localhost:27017/agenda-test', collection: 'agendaJobs'}
    });

Agenda will emit a ready event (see Agenda Events) when properly connected to the database. It is safe to call agenda.start() without waiting for this event, as this is handled internally. If you're using the db options, or call database, then you may still need to listen for ready before saving jobs.

mongo(dbInstance, [collectionName])

Use an existing mongodb-native MongoClient/Db instance. This can help consolidate connections to a database. You can instead use .database to have agenda handle connecting for you.

You can also specify it during instantiation:

    const agenda = new Agenda({mongo: mongoClientInstance.db('agenda-test')});

Note that MongoClient.connect() returns a mongoClientInstance since node-mongodb-native 3.0.0, while it used to return a dbInstance that could then be directly passed to agenda.

name(name)

Sets the lastModifiedBy field to name in the jobs collection. Useful if you have multiple job processors (agendas) and want to see which job queue last ran the job.

    agenda.name(os.hostname() + '-' + process.pid);

You can also specify it during instantiation

    const agenda = new Agenda({name: 'test queue'});

processEvery(interval)

Takes an interval, which can be either a traditional JavaScript number or a string such as 3 minutes.

Specifies the frequency at which agenda will query the database looking for jobs that need to be processed. Agenda internally uses setTimeout to ensure that jobs run at (close to ~3ms) the right time.

Decreasing the frequency will result in fewer database queries, but more jobs being stored in memory.

Also worth noting is that if the job queue is shut down, any jobs stored in memory that haven't run will still be locked, meaning that you may have to wait for the lock to expire. By default, processEvery is '5 seconds'.

    agenda.processEvery('1 minute');

You can also specify it during instantiation

    const agenda = new Agenda({processEvery: '30 seconds'});

maxConcurrency(number)

Takes a number which specifies the max number of jobs that can be running at any given moment. By default it is 20.

    agenda.maxConcurrency(20);

You can also specify it during instantiation

    const agenda = new Agenda({maxConcurrency: 20});

defaultConcurrency(number)

Takes a number which specifies the default number of a specific job that can be running at any given moment. By default it is 5.

    agenda.defaultConcurrency(5);

You can also specify it during instantiation

    const agenda = new Agenda({defaultConcurrency: 5});

lockLimit(number)

Takes a number which specifies the max number of jobs that can be locked at any given moment. By default it is 0 for no max.

    agenda.lockLimit(0);

You can also specify it during instantiation

    const agenda = new Agenda({lockLimit: 0});

defaultLockLimit(number)

Takes a number which specifies the default number of a specific job that can be locked at any given moment. By default it is 0 for no max.

    agenda.defaultLockLimit(0);

You can also specify it during instantiation

    const agenda = new Agenda({defaultLockLimit: 0});

defaultLockLifetime(number)

Takes a number which specifies the default lock lifetime in milliseconds. By default it is 10 minutes. This can be overridden by specifying the lockLifetime option to a defined job.

A job will unlock if it is finished (i.e. the returned Promise resolves/rejects, or done is specified in the params and done() is called) before the lockLifetime. The lock is useful if the job crashes or times out.

    agenda.defaultLockLifetime(10000);

You can also specify it during instantiation

    const agenda = new Agenda({defaultLockLifetime: 10000});

sort(query)

Takes a query which specifies the sort query to be used for finding and locking the next job.

By default it is { nextRunAt: 1, priority: -1 }, which obeys a first in first out approach, with respect to priority.
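
As an illustration, the default sort object can be applied literally to an in-memory list. This is only a sketch: compareBySort is a hypothetical helper, the job objects are made up, and Agenda itself passes the sort object to MongoDB rather than sorting in JavaScript.

```javascript
// Hypothetical helper: builds an Array.prototype.sort comparator from a
// MongoDB-style sort object (1 = ascending, -1 = descending).
function compareBySort(sort) {
  const keys = Object.entries(sort);
  return (a, b) => {
    for (const [field, direction] of keys) {
      if (a[field] < b[field]) return -direction;
      if (a[field] > b[field]) return direction;
    }
    return 0;
  };
}

// Made-up jobs; nextRunAt is a plain timestamp to keep the example short.
const jobs = [
  {name: 'b', nextRunAt: 200, priority: 0},
  {name: 'c', nextRunAt: 100, priority: 0},
  {name: 'a', nextRunAt: 100, priority: 10}
];

jobs.sort(compareBySort({nextRunAt: 1, priority: -1}));
console.log(jobs.map(job => job.name)); // [ 'a', 'c', 'b' ]
```

With this sort object, nextRunAt is compared first and priority only breaks ties between jobs that are due at the same time.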

disableAutoIndex(boolean)

Optional. Disables the automatic creation of the default index on the jobs table. By default, Agenda creates an index to optimize its queries against Mongo while processing jobs.

This is useful if you want to use your own index in specific use-cases.

Agenda Events

An instance of an agenda will emit the following events:

  • ready - called when Agenda mongo connection is successfully opened and indices created. If you're passing agenda an existing connection, you shouldn't need to listen for this, as agenda.start() will not resolve until indices have been created. If you're using the db options, or call database, then you may still need to listen for the ready event before saving jobs. agenda.start() will still wait for the connection to be opened.
  • error - called when Agenda mongo connection process has thrown an error

    await agenda.start();

Defining Job Processors

Before you can use a job, you must define its processing behavior.

define(jobName, fn, [options])

Defines a job with the name of jobName. When a job of jobName gets run, it will be passed to fn(job, done). To maintain asynchronous behavior, you may either provide a Promise-returning function in fn or provide done as a second parameter to fn. If done is specified in the function signature, you must call done() when you are done processing the job. If your function is synchronous or returns a Promise, you may omit done from the signature.

options is an optional argument which can overwrite the defaults. It can take the following:

  • concurrency: number maximum number of that job that can be running at once (per instance of agenda)
  • lockLimit: number maximum number of that job that can be locked at once (per instance of agenda)
  • lockLifetime: number interval in ms of how long the job stays locked for (see multiple job processors for more info). A job will automatically unlock once a returned promise resolves/rejects (or if done is specified in the signature and done() is called).
  • priority: (lowest|low|normal|high|highest|number) specifies the priority of the job. Higher priority jobs will run first. See the priority mapping below
  • shouldSaveResult: boolean flag that specifies whether the result of the job should also be stored in the database. Defaults to false

Priority mapping:

    {
      highest: 20,
      high: 10,
      normal: 0,
      low: -10,
      lowest: -20
    }
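
The mapping above can be read as a simple lookup. The following sketch shows one way a priority option could be normalized to a number; resolvePriority is a hypothetical helper written for this illustration, and how Agenda itself treats unknown names is not covered here (this sketch chooses to throw).

```javascript
// The priority mapping from the table above.
const PRIORITY = {highest: 20, high: 10, normal: 0, low: -10, lowest: -20};

// Hypothetical helper: numbers pass through, known names are looked up.
function resolvePriority(priority) {
  if (typeof priority === 'number') return priority;
  if (Object.prototype.hasOwnProperty.call(PRIORITY, priority)) {
    return PRIORITY[priority];
  }
  // Assumption for this sketch only: reject names that are not in the table.
  throw new RangeError(`Unknown priority: ${priority}`);
}

console.log(resolvePriority('high')); // 10
console.log(resolvePriority(7)); // 7
console.log(resolvePriority('lowest')); // -20
```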

Async Job:

    agenda.define('some long running job', async job => {
      const data = await doSomelengthyTask();
      await formatThatData(data);
      await sendThatData(data);
    });

Async Job (using done):

    agenda.define('some long running job', (job, done) => {
      doSomelengthyTask(data => {
        formatThatData(data);
        sendThatData(data);
        done();
      });
    });

Sync Job:

    agenda.define('say hello', job => {
      console.log('Hello!');
    });

define() acts like an assignment: if define(jobName, ...) is called multiple times (e.g. every time your script starts), the definition in the last call will overwrite the previous one. Thus, if you define the jobName only once in your code, it's safe for that call to execute multiple times.
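
The "acts like an assignment" behavior can be pictured with a plain Map keyed by job name. This is a minimal sketch of the idea, not Agenda's internal data structure; definitions and this local define function are hypothetical.

```javascript
// Hypothetical registry: one entry per job name.
const definitions = new Map();

// Hypothetical stand-in for agenda.define(): a later call with the same
// name replaces the earlier entry instead of adding a second one.
function define(name, fn, options = {}) {
  definitions.set(name, {fn, options});
}

define('say hello', () => 'first definition');
define('say hello', () => 'second definition', {priority: 'high'});

console.log(definitions.size); // 1
console.log(definitions.get('say hello').fn()); // second definition
```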

Creating Jobs

every(interval, name, [data], [options])

Runs job name at the given interval. Optionally, data and options can be passed in. Every creates a job of type single, which means that it will only create one job in the database, even if that line is run multiple times. This lets you put it in a file that may get run multiple times, such as webserver.js which may reboot from time to time.

interval can be a human-readable format String, a cron format String, or a Number.

data is an optional argument that will be passed to the processing function under job.attrs.data.

options is an optional argument that will be passed to job.repeatEvery. In order to use this argument, data must also be specified.

Returns the job.

    agenda.define('printAnalyticsReport', async job => {
      const users = await User.doSomethingReallyIntensive();
      processUserData(users);
      console.log('I print a report!');
    });

    agenda.every('15 minutes', 'printAnalyticsReport');

Optionally, name could be an array of job names, which is convenient for scheduling different jobs for the same interval.

    agenda.every('15 minutes', ['printAnalyticsReport', 'sendNotifications', 'updateUserRecords']);

In this case, every returns an array of jobs.

schedule(when, name, [data])

Schedules a job to run name once at a given time. when can be a Date or a String such as tomorrow at 5pm.

data is an optional argument that will be passed to the processing function under job.attrs.data.

Returns the job.

    agenda.schedule('tomorrow at noon', 'printAnalyticsReport', {userCount: 100});

Optionally, name could be an array of job names, similar to the every method.

    agenda.schedule('tomorrow at noon', ['printAnalyticsReport', 'sendNotifications', 'updateUserRecords']);

In this case, schedule returns an array of jobs.

now(name, [data])

Schedules a job to run name once immediately.

data is an optional argument that will be passed to the processing function under job.attrs.data.

Returns the job.

    agenda.now('do the hokey pokey');

create(jobName, data)

Returns an instance of a jobName with data. This does NOT save the job in the database. See below to learn how to manually work with jobs.

    const job = agenda.create('printAnalyticsReport', {userCount: 100});
    await job.save();
    console.log('Job successfully saved');

Managing Jobs

jobs(mongodb-native query, mongodb-native sort, mongodb-native limit, mongodb-native skip)

Lets you query (then sort, limit and skip the result) all of the jobs in the agenda job's database. These are full mongodb-native find, sort, limit and skip commands. See mongodb-native's documentation for details.

    const jobs = await agenda.jobs({name: 'printAnalyticsReport'}, {data: -1}, 3, 1);
    // Work with jobs (see below)

cancel(mongodb-native query)

Cancels any jobs matching the passed mongodb-native query, and removes them from the database. Returns a Promise resolving to the number of cancelled jobs, or rejecting on error.

    const numRemoved = await agenda.cancel({name: 'printAnalyticsReport'});

This functionality can also be achieved by first retrieving all the jobs from the database using agenda.jobs(), looping through the resulting array and calling job.remove() on each. It is however preferable to use agenda.cancel() for this use case, as this ensures the operation is atomic.

disable(mongodb-native query)

Disables any jobs matching the passed mongodb-native query, preventing any matching jobs from being run by the Job Processor.

    const numDisabled = await agenda.disable({name: 'pollExternalService'});

Similar to agenda.cancel(), this functionality can be achieved with a combination of agenda.jobs() and job.disable().

enable(mongodb-native query)

Enables any jobs matching the passed mongodb-native query, allowing any matching jobs to be run by the Job Processor.

    const numEnabled = await agenda.enable({name: 'pollExternalService'});

Similar to agenda.cancel(), this functionality can be achieved with a combination of agenda.jobs() and job.enable().

purge()

Removes all jobs in the database without defined behaviors. Useful if you change a definition name and want to remove old jobs. Returns a Promise resolving to the number of removed jobs, or rejecting on error.

IMPORTANT: Do not run this before you finish defining all of your jobs. If you do, you will nuke your database of jobs.

    const numRemoved = await agenda.purge();

Starting the job processor

To get agenda to start processing jobs from the database you must start it. This will schedule an interval (based on processEvery) to check for new jobs and run them. You can also stop the queue.

start

Starts the job queue processing, checking every processEvery interval to see if there are new jobs. Must be called after processEvery, and before any job scheduling (e.g. every).

stop

Stops the job queue processing. Unlocks currently running jobs.

This can be very useful for graceful shutdowns so that currently running/grabbed jobs are abandoned so that other job queues can grab them / they are unlocked should the job queue start again. Here is an example of how to do a graceful shutdown.

    async function graceful() {
      await agenda.stop();
      process.exit(0);
    }

    process.on('SIGTERM', graceful);
    process.on('SIGINT', graceful);

Multiple job processors

Sometimes you may want to have multiple node instances / machines process from the same queue. Agenda supports a locking mechanism to ensure that multiple queues don't process the same job.

You can configure the locking mechanism by specifying lockLifetime as an interval when defining the job.

    agenda.define(
      'someJob',
      (job, cb) => {
        // Do something in 10 seconds or less...
      },
      {lockLifetime: 10000}
    );

This will ensure that no other job processor (this one included) attempts to run the job again for the next 10 seconds. If you have a particularly long running job, you will want to specify a longer lockLifetime.

By default it is 10 minutes. Typically you shouldn't have a job that runs for 10 minutes, so this is really insurance should the job queue crash before the job is unlocked.

When a job is finished (i.e. the returned promise resolves/rejects or done is specified in the signature and done() is called), it will automatically unlock.
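
The lock rule described above boils down to a timestamp comparison. The sketch below only illustrates that idea: isLockExpired is a hypothetical helper, and the real check is part of the database query Agenda uses to find the next job.

```javascript
// Hypothetical helper: a lock is considered stale once more than
// lockLifetimeMs milliseconds have passed since lockedAt.
function isLockExpired(lockedAt, lockLifetimeMs, now = new Date()) {
  return now.getTime() - lockedAt.getTime() > lockLifetimeMs;
}

const lockedAt = new Date('2024-01-01T12:00:00Z');

// 5 seconds after locking, with lockLifetime: 10000, the job is still protected.
console.log(isLockExpired(lockedAt, 10000, new Date('2024-01-01T12:00:05Z'))); // false

// 11 seconds after locking, another processor may pick the job up again.
console.log(isLockExpired(lockedAt, 10000, new Date('2024-01-01T12:00:11Z'))); // true
```

This is why a job that legitimately runs longer than its lockLifetime should either get a longer lockLifetime or renew its lock with touch().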

Manually working with a job

A job instance has many instance methods. All mutating methods must be followed with a call to await job.save() in order to persist the changes to the database.

repeatEvery(interval, [options])

Specifies an interval on which the job should repeat. The job runs at the time it is defined as well as at the configured intervals, that is, "run now and in intervals".

interval can be a human-readable format String, a cron format String, or a Number.

options is an optional argument containing:

options.timezone: should be a string as accepted by moment-timezone and is considered when using an interval in the cron string format.

options.skipImmediate: true | false (default). Setting this to true will skip the immediate run. The first run will then occur only after the configured interval.

options.startDate: Date. The first time the job runs should be equal to or after the start date.

options.endDate: Date. The job should not repeat after the endDate. The job can run on the end date itself, but not after that.

options.skipDays: human readable string ('2 days'). After each run, it will skip the duration of 'skipDays'.

    job.repeatEvery('10 minutes');
    await job.save();

    job.repeatEvery('3 minutes', {skipImmediate: true});
    await job.save();

    job.repeatEvery('0 6 * * *', {timezone: 'America/New_York'});
    await job.save();
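
The effect of skipImmediate on the first run can be sketched for the simple case of a fixed interval in milliseconds. firstRunAt is a hypothetical helper for illustration only; it ignores cron strings, time zones, startDate, endDate and skipDays.

```javascript
// Hypothetical helper: when does a fixed-interval job run for the first time?
function firstRunAt(now, intervalMs, {skipImmediate = false} = {}) {
  // Without skipImmediate the job runs right away ("run now and in intervals");
  // with skipImmediate the first run is one full interval later.
  return skipImmediate ? new Date(now.getTime() + intervalMs) : now;
}

const now = new Date('2024-01-01T00:00:00Z');
const threeMinutes = 3 * 60 * 1000;

console.log(firstRunAt(now, threeMinutes).toISOString());
// 2024-01-01T00:00:00.000Z
console.log(firstRunAt(now, threeMinutes, {skipImmediate: true}).toISOString());
// 2024-01-01T00:03:00.000Z
```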

repeatAt(time)

Specifies a time when the job should repeat. Possible values

    job.repeatAt('3:30pm');
    await job.save();

schedule(time)

Specifies the next time at which the job should run.

    job.schedule('tomorrow at 6pm');
    await job.save();

priority(priority)

Specifies the priority weighting of the job. Can be a number or a string from the above priority table.

    job.priority('low');
    await job.save();

setShouldSaveResult(setShouldSaveResult)

Specifies whether the result of the job should also be stored in the database. Defaults to false.

    job.setShouldSaveResult(true);
    await job.save();

The data returned by the job will be available on the result attribute after it succeeded and got retrieved again from the database, e.g. via agenda.jobs(...) or through the success job event.

unique(properties, [options])

Ensures that only one instance of this job exists with the specified properties.

options is an optional argument which can overwrite the defaults. It can take the following:

  • insertOnly: boolean will prevent any properties from persisting if the job already exists. Defaults to false.

    job.unique({'data.type': 'active', 'data.userId': '123', nextRunAt: date});
    await job.save();

IMPORTANT: To avoid high CPU usage by MongoDB, make sure to create an index on the used fields, like data.type and data.userId for the example above.
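
The semantics of unique and insertOnly can be sketched against an in-memory array standing in for the jobs collection. Everything here (getPath, saveUnique, the sample job objects) is hypothetical and only mirrors the behavior described above; Agenda performs the equivalent operation inside MongoDB.

```javascript
// Read a dotted path such as 'data.userId' from a plain object.
function getPath(obj, path) {
  return path.split('.').reduce((value, key) => (value == null ? undefined : value[key]), obj);
}

// Hypothetical stand-in for saving a job with a unique constraint.
function saveUnique(collection, job, unique, {insertOnly = false} = {}) {
  const existing = collection.find(stored =>
    Object.entries(unique).every(([path, expected]) => getPath(stored, path) === expected)
  );
  if (!existing) {
    collection.push(job); // no match: insert a new job
    return 'inserted';
  }
  if (insertOnly) return 'unchanged'; // match found: keep the stored job as is
  Object.assign(existing, job); // match found: overwrite its properties
  return 'updated';
}

const collection = [];
const unique = {'data.type': 'active', 'data.userId': '123'};
const makeJob = priority => ({name: 'sync', data: {type: 'active', userId: '123'}, priority});

console.log(saveUnique(collection, makeJob(0), unique)); // inserted
console.log(saveUnique(collection, makeJob(10), unique)); // updated
console.log(saveUnique(collection, makeJob(20), unique, {insertOnly: true})); // unchanged
console.log(collection.length, collection[0].priority); // 1 10
```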

fail(reason)

Sets job.attrs.failedAt to now, and sets job.attrs.failReason to reason.

Optionally, reason can be an error, in which case job.attrs.failReason will be set to error.message.

    job.fail('insufficient disk space');
    // or
    job.fail(new Error('insufficient disk space'));
    await job.save();
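
The rule for deriving failReason can be written down in a few lines. applyFailure below is a hypothetical helper operating on a plain attributes object; it is a sketch of the documented behavior, not the library's code.

```javascript
// Hypothetical helper: returns a copy of attrs with failedAt and failReason set.
function applyFailure(attrs, reason, now = new Date()) {
  // An Error contributes its message; a string is stored unchanged.
  const failReason = reason instanceof Error ? reason.message : reason;
  return {...attrs, failedAt: now, failReason};
}

const attrs = {name: 'backup'};

console.log(applyFailure(attrs, 'insufficient disk space').failReason);
// insufficient disk space
console.log(applyFailure(attrs, new Error('insufficient disk space')).failReason);
// insufficient disk space
```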

run(callback)

Runs the given job and calls callback(err, job) upon completion. Normally you never need to call this manually.

    job.run((err, job) => {
      console.log("I don't know why you would need to do this...");
    });

save()

Saves the job.attrs into the database. Returns a Promise resolving to a Job instance, or rejecting on error.

    try {
      await job.save();
      console.log('Successfully saved job to collection');
    } catch (e) {
      console.error('Error saving job to collection');
    }

remove()

Removes the job from the database. Returns a Promise resolving to the number of jobs removed, or rejecting on error.

    try {
      await job.remove();
      console.log('Successfully removed job from collection');
    } catch (e) {
      console.error('Error removing job from collection');
    }

disable()

Disables the job. Upcoming runs won't execute.

enable()

Enables the job if it got disabled before. Upcoming runs will execute.

touch()

Resets the lock on the job. Useful to indicate that the job hasn't timed out when you have very long running jobs. The call returns a promise that resolves when the job's lock has been renewed.

    agenda.define('super long job', async job => {
      await doSomeLongTask();
      await job.touch();
      await doAnotherLongTask();
      await job.touch();
      await finishOurLongTasks();
    });

Job Queue Events

An instance of an agenda will emit the following events:

  • start - called just before a job starts
  • start:job name - called just before the specified job starts

    agenda.on('start', job => {
      console.log('Job %s starting', job.attrs.name);
    });

  • complete - called when a job finishes, regardless of if it succeeds or fails
  • complete:job name - called when a job finishes, regardless of if it succeeds or fails

    agenda.on('complete', job => {
      console.log(`Job ${job.attrs.name} finished`);
    });

  • success - called when a job finishes successfully
  • success:job name - called when a job finishes successfully

    agenda.on('success:send email', job => {
      console.log(`Sent Email Successfully to ${job.attrs.data.to}`);
    });

  • fail - called when a job throws an error
  • fail:job name - called when a job throws an error

    agenda.on('fail:send email', (err, job) => {
      console.log(`Job failed with error: ${err.message}`);
    });

Frequently Asked Questions

What is the order in which jobs run?

Jobs are run with priority in a first in first out order (so they will be run in the order they were scheduled AND with respect to highest priority).

For example, if we have two jobs named "send-email" queued (both with the same priority), and the first job is queued at 3:00 PM and second job is queued at 3:05 PM with the same priority value, then the first job will run first if we start to send "send-email" jobs at 3:10 PM. However if the first job has a priority of 5 and the second job has a priority of 10, then the second will run first (priority takes precedence) at 3:10 PM.

The default MongoDB sort object is { nextRunAt: 1, priority: -1 } and can be changed through the option sort when configuring Agenda.

What is the difference between lockLimit and maxConcurrency?

Agenda will lock jobs one by one, setting the lockedAt property in mongoDB, and creating an instance of the Job class which it caches into the _lockedJobs array. This defaults to having no limit, but can be managed using lockLimit. If all jobs will need to be run before agenda's next interval (set via agenda.processEvery), then agenda will attempt to lock all jobs.

Agenda will also pull jobs from _lockedJobs and into _runningJobs. These jobs are actively being worked on by user code, and this is limited by maxConcurrency (defaults to 20).

If you have multiple instances of agenda processing the same job definition with a fast repeat time you may find they get unevenly loaded. This is because they will compete to lock as many jobs as possible, even if they don't have enough concurrency to process them. This can be resolved by tweaking the maxConcurrency and lockLimit properties.
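
A deliberately simplified model may help to see how the two limits interact on a single agenda instance. planTick is a hypothetical function for illustration only; it ignores the per-job concurrency and lockLimit options as well as timing, and it treats 0 as "no limit" as described for lockLimit.

```javascript
// Hypothetical model: how many due jobs get locked, and how many of the
// locked jobs can actually run at the same time?
function planTick(dueJobs, {lockLimit = 0, maxConcurrency = 20} = {}) {
  const locked = lockLimit === 0 ? dueJobs : Math.min(dueJobs, lockLimit);
  const running = Math.min(locked, maxConcurrency);
  return {locked, running, lockedButWaiting: locked - running};
}

// Defaults: all 100 due jobs are locked by this instance, only 20 run.
console.log(planTick(100));
// { locked: 100, running: 20, lockedButWaiting: 80 }

// With lockLimit: 30, 70 jobs stay available for other instances.
console.log(planTick(100, {lockLimit: 30}));
// { locked: 30, running: 20, lockedButWaiting: 10 }
```

In this model, setting lockLimit close to maxConcurrency keeps an instance from holding on to jobs it cannot start yet, which is the uneven loading described above.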

Sample Project Structure?

Agenda doesn't have a preferred project structure and leaves it to the user to choose how they would like to use it. That being said, you can check out the example project structure below.

Can I Donate?

Thanks! I'm flattered, but it's really not necessary. If you really want to, you can find my gittip here.

Web Interface?

Agenda itself does not have a web interface built in, but we do offer a stand-alone web interface, Agendash.

Mongo vs Redis

The decision to use Mongo instead of Redis is intentional. Redis is often used for non-essential data (such as sessions) and without configuration doesn't guarantee the same level of persistence as Mongo (should the server need to be restarted/crash).

Agenda decides to focus on persistence without requiring special configuration of Redis (thereby degrading the performance of the Redis server on non-critical data, such as sessions).

Ultimately if enough people want a Redis driver instead of Mongo, I will write one. (Please open an issue requesting it). For now, Agenda decided to focus on guaranteed persistence.

Spawning / forking processes

Ultimately Agenda can work from a single job queue across multiple machines, node processes, or forks. If you are interested in having more than one worker, Bars3s has written up a fantastic example of how one might do it:

    const cluster = require('cluster');
    const os = require('os');

    const httpServer = require('./app/http-server');
    const jobWorker = require('./app/job-worker');

    const jobWorkers = [];
    const webWorkers = [];

    if (cluster.isMaster) {
      const cpuCount = os.cpus().length;
      // Create a worker for each CPU
      for (let i = 0; i < cpuCount; i += 1) {
        addJobWorker();
        addWebWorker();
      }

      cluster.on('exit', (worker, code, signal) => {
        if (jobWorkers.indexOf(worker.id) !== -1) {
          console.log(`job worker ${worker.process.pid} exited (signal: ${signal}). Trying to respawn...`);
          removeJobWorker(worker.id);
          addJobWorker();
        }

        if (webWorkers.indexOf(worker.id) !== -1) {
          console.log(`http worker ${worker.process.pid} exited (signal: ${signal}). Trying to respawn...`);
          removeWebWorker(worker.id);
          addWebWorker();
        }
      });
    } else {
      if (process.env.web) {
        console.log(`start http server: ${cluster.worker.id}`);
        // Initialize the http server here
        httpServer.start();
      }

      if (process.env.job) {
        console.log(`start job server: ${cluster.worker.id}`);
        // Initialize the Agenda here
        jobWorker.start();
      }
    }

    function addWebWorker() {
      webWorkers.push(cluster.fork({web: 1}).id);
    }

    function addJobWorker() {
      jobWorkers.push(cluster.fork({job: 1}).id);
    }

    function removeWebWorker(id) {
      webWorkers.splice(webWorkers.indexOf(id), 1);
    }

    function removeJobWorker(id) {
      jobWorkers.splice(jobWorkers.indexOf(id), 1);
    }

Recovering lost Mongo connections ("auto_reconnect")

Agenda is configured by default to automatically reconnect indefinitely, emitting an error event when no connection is available on each process tick, allowing you to restore the Mongo instance without having to restart the application.

However, if you are using an existing Mongo client you'll need to configure the reconnectTries and reconnectInterval connection settings manually, otherwise you'll find that Agenda will throw an error with the message "MongoDB connection is not recoverable, application restart required" if the connection cannot be recovered within 30 seconds.

Example Project Structure

Agenda will only process jobs that it has definitions for. This allows you to selectively choose which jobs a given agenda will process.

Consider the following project structure, which allows us to share models with the rest of our code base, and specify which jobs a worker processes, if any at all.

    - server.js
    - worker.js
    lib/
      - agenda.js
      controllers/
        - user-controller.js
      jobs/
        - email.js
        - video-processing.js
        - image-processing.js
      models/
        - user-model.js
        - blog-post.model.js

Sample job processor (e.g. jobs/email.js)

    let email = require('some-email-lib'),
      User = require('../models/user-model.js');

    module.exports = function (agenda) {
      agenda.define('registration email', async job => {
        const user = await User.get(job.attrs.data.userId);
        await email(user.email(), 'Thanks for registering', 'Thanks for registering ' + user.name());
      });

      agenda.define('reset password', async job => {
        // Etc
      });

      // More email related jobs
    };

lib/agenda.js

    const Agenda = require('agenda');

    const connectionOpts = {db: {address: 'localhost:27017/agenda-test', collection: 'agendaJobs'}};

    const agenda = new Agenda(connectionOpts);

    const jobTypes = process.env.JOB_TYPES ? process.env.JOB_TYPES.split(',') : [];

    jobTypes.forEach(type => {
      require('./jobs/' + type)(agenda);
    });

    if (jobTypes.length) {
      agenda.start(); // Returns a promise, which should be handled appropriately
    }

    module.exports = agenda;

lib/controllers/user-controller.js

    let app = express(),
      User = require('../models/user-model'),
      agenda = require('../worker.js');

    app.post('/users', (req, res, next) => {
      const user = new User(req.body);
      user.save(err => {
        if (err) {
          return next(err);
        }

        agenda.now('registration email', {userId: user.primary()});
        res.send(201, user.toJson());
      });
    });

worker.js

require('./lib/agenda.js');

Now you can do the following in your project:

node server.js

Fire up an instance with no JOB_TYPES, giving you the ability to queue jobs, but not wasting resources processing them.

JOB_TYPES=email node server.js

Allow your http server to process email jobs.

JOB_TYPES=email node worker.js

Fire up an instance that processes email jobs.

JOB_TYPES=video-processing,image-processing node worker.js

Fire up an instance that processes video-processing/image-processing jobs. Good for a heavy hitting server.
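
The four invocations above differ only in the value of JOB_TYPES. The sketch below mirrors the parsing expression used in lib/agenda.js as a standalone function so the outcomes are easy to compare; parseJobTypes and shouldStart are hypothetical names introduced for this illustration.

```javascript
// Same expression as in lib/agenda.js, wrapped in a hypothetical helper.
function parseJobTypes(value) {
  return value ? value.split(',') : [];
}

// Hypothetical helper: the processor is only started when at least one
// job type was requested.
function shouldStart(value) {
  return parseJobTypes(value).length > 0;
}

console.log(parseJobTypes(undefined), shouldStart(undefined));
// [] false
console.log(parseJobTypes('email'), shouldStart('email'));
// [ 'email' ] true
console.log(parseJobTypes('video-processing,image-processing'));
// [ 'video-processing', 'image-processing' ]
```

Note that the value is split on commas without trimming, so JOB_TYPES should not contain spaces between the names.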

Debugging Issues

If you think you have encountered a bug, please feel free to report it here:

Submit Issue

Please provide us with as much detail as possible, such as:

  • Agenda version
  • Environment (OSX, Linux, Windows, etc)
  • Small description of what happened
  • Any relevant stack trace
  • Agenda logs (see below)

To turn on logging, please set your DEBUG env variable like so:

  • OSX: DEBUG="agenda:*" ts-node src/index.ts
  • Linux: DEBUG="agenda:*" ts-node src/index.ts
  • Windows CMD: set DEBUG=agenda:*
  • Windows PowerShell: $env:DEBUG = "agenda:*"

While not necessary, attaching a text file with this debug information would be extremely useful in debugging certain issues and is encouraged.

Known Issues

"Multiple order-by items are not supported. Please specify a single order-by item."

When running Agenda on Azure cosmosDB, you might run into this issue caused by Agenda's sort query used for finding and locking the next job. To fix this, you can pass a custom sort option: sort: { nextRunAt: 1 }

Performance

It is recommended to set this index if you use agendash:

    db.agendaJobs.ensureIndex({
        "nextRunAt" : -1,
        "lastRunAt" : -1,
        "lastFinishedAt" : -1
    }, "agendash2")

If you have one job definition with thousands of instances, you can add this index to speed up the internal sorting query:

    db.agendaJobs.ensureIndex({
        "name" : 1,
        "disabled" : 1,
        "lockedAt" : 1
    }, "findAndLockDeadJobs")

Sandboxed Worker - use child processes

It's possible to start jobs in a child process. This helps, for example, to separate long running processes from the main thread. For example, if one process consumes too much memory and gets killed, it will not affect any others.

To use this feature, several steps are required.

1.) Create a childWorker helper. The subprocess has a completely separate context, so there are no database connections or anything else that can be shared. Therefore you have to ensure that all required connections and initializations are done here too. Furthermore, you also have to load the correct job definition so that agenda knows what code it must execute. Therefore 3 parameters are passed to the childWorker: name, jobId and path to the job definition.

Example file can look like this:

childWorker.ts

    import 'reflect-metadata';

    process.on('message', message => {
      if (message === 'cancel') {
        process.exit(2);
      } else {
        console.log('got message', message);
      }
    });

    (async () => {
      const mongooseConnection = /** connect to database */

      /** do other required initializations */

      // get process arguments (name, jobId and path to agenda definition file)
      const [, , name, jobId, agendaDefinition] = process.argv;

      // set fancy process title
      process.title = `${process.title} (sub worker: ${name}/${jobId})`;

      // initialize Agenda in "forkedWorker" mode
      const agenda = new Agenda({name: `subworker-${name}`, forkedWorker: true});
      // connect agenda (but do not start it)
      await agenda.mongo(mongooseConnection.db as any);

      if (!name || !jobId) {
        throw new Error(`invalid parameters: ${JSON.stringify(process.argv)}`);
      }

      // load job definition
      /** in this case the file is for example ../some/path/definitions.js
      with a content like:
      export default (agenda: Agenda, definitionOnly = false) => {
        agenda.define(
          'some job',
          async (notification: {
            attrs: { data: { dealId: string; orderId: TypeObjectId<IOrder> } };
          }) => {
            // do something
          }
        );

        if (!definitionOnly) {
          // here you can create scheduled jobs or other things
        }
      });
      */
      if (agendaDefinition) {
        const loadDefinition = await import(agendaDefinition);
        (loadDefinition.default || loadDefinition)(agenda, true);
      }

      // run this job now
      await agenda.runForkedJob(jobId);

      // disconnect database and exit
      process.exit(0);
    })().catch(err => {
      console.error('err', err);
      if (process.send) {
        process.send(JSON.stringify(err));
      }
      process.exit(1);
    });

Make sure to only define job definitions during this step, otherwise you create some overhead (e.g. if you create new jobs inside the definition files). That's why I call the definition file with agenda and a second parameter that is set to true. If this parameter is true, I do not initialize any jobs (create jobs etc.).

2.) To use this, you have to enable it on a job. Set forkMode to true:

    const job = agenda.create('some job', {meep: 1});
    job.forkMode(true);
    await job.save();

Acknowledgements

License

The MIT License

