- Notifications
You must be signed in to change notification settings - Fork230
A lightweight library that adds job scheduling capabilities to RQ (Redis Queue)
License
rq/rq-scheduler
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
RQ Scheduler is a small package thatadds job scheduling capabilities toRQ,aRedis based Python queuing library.
If you findrq-scheduler
useful, please consider supporting its development viaTidelift.
You can installRQ Scheduler via pip:
pip install rq-scheduler
Or you can download the latest stable package fromPyPI.
Schedule a job involves doing two different things:
- Putting a job in the scheduler
- Running a scheduler that will move scheduled jobs into queues when the time comes
There are two ways you can schedule a job. The first is using RQ Scheduler'senqueue_at
fromredisimportRedisfromrqimportQueuefromrq_schedulerimportSchedulerfromdatetimeimportdatetimescheduler=Scheduler(connection=Redis())# Get a scheduler for the "default" queuescheduler=Scheduler('foo',connection=Redis())# Get a scheduler for the "foo" queue# You can also instantiate a Scheduler using an RQ Queuequeue=Queue('bar',connection=Redis())scheduler=Scheduler(queue=queue,connection=queue.connection)# Puts a job into the scheduler. The API is similar to RQ except that it# takes a datetime object as first argument. So for example to schedule a# job to run on Jan 1st 2020 we do:scheduler.enqueue_at(datetime(2020,1,1),func)# Date time should be in UTC# Here's another example scheduling a job to run at a specific date and time (in UTC),# complete with args and kwargs.scheduler.enqueue_at(datetime(2020,1,1,3,4),func,foo,bar=baz)# You can choose the queue type where jobs will be enqueued by passing the name of the type to the scheduler# used to enqueuescheduler=Scheduler('foo',queue_class="rq.Queue")scheduler.enqueue_at(datetime(2020,1,1),func)# The job will be enqueued at the queue named "foo" using the queue type "rq.Queue"
The second way is usingenqueue_in
. Instead of taking adatetime
object,this method expects atimedelta
and schedules the job to run atX seconds/minutes/hours/days/weeks later. For example, if we want to monitor howpopular a tweet is a few times during the course of the day, we could do something like
fromdatetimeimporttimedelta# Schedule a job to run 10 minutes, 1 hour and 1 day laterscheduler.enqueue_in(timedelta(minutes=10),count_retweets,tweet_id)scheduler.enqueue_in(timedelta(hours=1),count_retweets,tweet_id)scheduler.enqueue_in(timedelta(days=1),count_retweets,tweet_id)
IMPORTANT: You should always use UTC datetime when working withRQ Scheduler.
As of version 0.3,RQ Scheduler also supports creating periodic and repeated jobs.You can do this via theschedule
method. Note that this feature needsRQ >= 0.3.1.
This is how you do it
scheduler.schedule(scheduled_time=datetime.utcnow(),# Time for first execution, in UTC timezonefunc=func,# Function to be queuedargs=[arg1,arg2],# Arguments passed into function when executedkwargs={'foo':'bar'},# Keyword arguments passed into function when executedinterval=60,# Time before the function is called again, in secondsrepeat=10,# Repeat this number of times (None means repeat forever)meta={'foo':'bar'}# Arbitrary pickleable data on the job itself)
IMPORTANT NOTE: If you set up a repeated job, you must make sure that youeither do not set a result_ttl value or you set a value larger than the interval.Otherwise, the entry with the job details will expire and the job will not get re-scheduled.
As of version 0.6.0,RQ Scheduler also supports creating Cron Jobs, which you can use forrepeated jobs to run periodically at fixed times, dates or intervals, for more info checkhttps://en.wikipedia.org/wiki/Cron. You can do this via thecron
method.
This is how you do it
scheduler.cron(cron_string,# A cron string (e.g. "0 0 * * 0")func=func,# Function to be queuedargs=[arg1,arg2],# Arguments passed into function when executedkwargs={'foo':'bar'},# Keyword arguments passed into function when executedrepeat=10,# Repeat this number of times (None means repeat forever)result_ttl=300,# Specify how long (in seconds) successful jobs and their results are kept. Defaults to -1 (forever)ttl=200,# Specifies the maximum queued time (in seconds) before it's discarded. Defaults to None (infinite TTL).queue_name=queue_name,# In which queue the job should be put inmeta={'foo':'bar'},# Arbitrary pickleable data on the job itselfuse_local_timezone=False# Interpret hours in the local timezone)
Sometimes you need to know which jobs have already been scheduled. You can get alist of enqueued jobs with theget_jobs
method
list_of_job_instances=scheduler.get_jobs()
In it's simplest form (as seen in the above example) this method returns a listof all job instances that are currently scheduled for execution.
Additionally the method takes two optional keyword argumentsuntil
andwith_times
. The first one specifies up to which point in time scheduled jobsshould be returned. It can be given as either a datetime / timedelta instanceor an integer denoting the number of seconds since epoch (1970-01-01 00:00:00).The second argument is a boolean that determines whether the scheduled executiontime should be returned along with the job instances.
Example
# get all jobs until 2012-11-30 10:00:00list_of_job_instances=scheduler.get_jobs(until=datetime(2012,10,30,10))# get all jobs for the next hourlist_of_job_instances=scheduler.get_jobs(until=timedelta(hours=1))# get all jobs with execution timesjobs_and_times=scheduler.get_jobs(with_times=True)# returns a list of tuples:# [(<rq.job.Job object at 0x123456789>, datetime.datetime(2012, 11, 25, 12, 30)), ...]
You can check whether a specific job instance or job id is scheduled forexecution using the familiar pythonin
operator
ifjob_instanceinscheduler:# Do something# orifjob_idinscheduler:# Do something
To cancel a job, simply pass aJob
or a job id toscheduler.cancel
scheduler.cancel(job)
Note that this method returnsNone
whether the specified job was found or not.
RQ Scheduler comes with a scriptrqscheduler
that runs a schedulerprocess that polls Redis once every minute and move scheduled jobs to therelevant queues when they need to be executed
# This runs a scheduler process using the default Redis connectionrqscheduler
If you want to use a different Redis server you could also do
rqscheduler --host localhost --port 6379 --db 0
The script accepts these arguments:
-H
or--host
: Redis server to connect to-p
or--port
: port to connect to-d
or--db
: Redis db to use-P
or--password
: password to connect to Redis-b
or--burst
: runs in burst mode (enqueue scheduled jobs whose execution time is in the past and quit)-i INTERVAL
or--interval INTERVAL
: How often the scheduler checks for new jobs to add to the queue (in seconds, can be floating-point for more precision).-j
or--job-class
: specify custom job class for rq to use (python module.Class)-q
or--queue-class
: specify custom queue class for rq to use (python module.Class)
The arguments pull default values from environment variables with thesame names but with a prefix ofRQ_REDIS_
.
sudo /etc/systemd/system/rqscheduler.service
[Unit]Description=RQSchedulerAfter=network.target[Service]ExecStart=/home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/bin/python\ /home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/lib/<<YourPythonVersion>>/site-packages/rq_scheduler/scripts/rqscheduler.py[Install]WantedBy=multi-user.target
You will also want to add any command line parameters if your configuration is not localhost or not set in the environment variables.
Start, check Status and Enable the service
sudo systemctl start rqscheduler.servicesudo systemctl status rqscheduler.servicesudo systemctlenable rqscheduler.service
Multiple instances of the rq-scheduler can be run simultaneously. It allows for
- Reliability (no single point of failure)
- Failover (scheduler instances automatically retry to attain lock and schedule jobs)
- Running scheduler on multiple server instances to make deployment identical and easier
Multiple schedulers can be run in any way you want. Typically you'll only want to run one scheduler per server/instance.
rqscheduler -i 5# another shell/systemd service or ideally another serverrqscheduler -i 5# different parameters can be provided to different schedulersrqscheduler -i 10
Practical example:
scheduler_a
is running onec2_instance_a
- If
scheduler_a
crashes orec2_instance_a
goes down, then our tasks won't be scheduled at all - Instead we can simply run 2 schedulers. Another scheduler called
scheduler_b
can be run onec2_instance_b
- Now both
scheduler_a
andscheduler_b
will periodically check and schedule the jobs - If one fails, the other still works
About
A lightweight library that adds job scheduling capabilities to RQ (Redis Queue)