RQ Scheduler

RQ Scheduler is a small package that adds job scheduling capabilities to RQ, a Redis-based Python queuing library.

Support RQ Scheduler

If you find rq-scheduler useful, please consider supporting its development via Tidelift.

Requirements

Installation

You can install RQ Scheduler via pip:

    pip install rq-scheduler

Or you can download the latest stable package from PyPI.

Usage

Scheduling a job involves two separate steps:

  1. Putting a job in the scheduler
  2. Running a scheduler that will move scheduled jobs into queues when the time comes
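
The two steps can be pictured with a toy in-memory model. This is only a sketch for intuition, not how RQ Scheduler is implemented (the real package keeps scheduled jobs in Redis); `ToyScheduler` and `move_due_jobs` are hypothetical names invented for this illustration.

```python
import heapq
from datetime import datetime, timedelta


class ToyScheduler:
    """Hypothetical in-memory stand-in: holds jobs until their time arrives."""

    def __init__(self):
        self._scheduled = []   # heap of (run_at, sequence, job_name)
        self._sequence = 0     # tie-breaker so equal times keep insertion order
        self.queue = []        # stands in for the queue that workers read from

    def enqueue_at(self, run_at, job_name):
        # Step 1: put the job in the scheduler.
        heapq.heappush(self._scheduled, (run_at, self._sequence, job_name))
        self._sequence += 1

    def move_due_jobs(self, now):
        # Step 2: move every job whose time has come into the queue.
        while self._scheduled and self._scheduled[0][0] <= now:
            _, _, job_name = heapq.heappop(self._scheduled)
            self.queue.append(job_name)


start = datetime(2020, 1, 1)
toy = ToyScheduler()
toy.enqueue_at(start + timedelta(minutes=10), "send_report")
toy.enqueue_at(start + timedelta(hours=1), "cleanup")

# Fifteen minutes in, only the 10-minute job is due.
toy.move_due_jobs(now=start + timedelta(minutes=15))
print(toy.queue)  # ['send_report']
```

Nothing reaches the queue until `move_due_jobs` is called, which is why a scheduler process has to be running alongside the workers.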

Scheduling a Job

There are two ways you can schedule a job. The first is using RQ Scheduler's enqueue_at:

    from redis import Redis
    from rq import Queue
    from rq_scheduler import Scheduler
    from datetime import datetime

    scheduler = Scheduler(connection=Redis())  # Get a scheduler for the "default" queue
    scheduler = Scheduler('foo', connection=Redis())  # Get a scheduler for the "foo" queue

    # You can also instantiate a Scheduler using an RQ Queue
    queue = Queue('bar', connection=Redis())
    scheduler = Scheduler(queue=queue, connection=queue.connection)

    # Puts a job into the scheduler. The API is similar to RQ except that it
    # takes a datetime object as first argument. So for example to schedule a
    # job to run on Jan 1st 2020 we do:
    scheduler.enqueue_at(datetime(2020, 1, 1), func)  # Date time should be in UTC

    # Here's another example scheduling a job to run at a specific date and time (in UTC),
    # complete with args and kwargs.
    scheduler.enqueue_at(datetime(2020, 1, 1, 3, 4), func, foo, bar=baz)

    # You can choose the queue type where jobs will be enqueued by passing the name of the type to the scheduler
    # used to enqueue
    scheduler = Scheduler('foo', queue_class="rq.Queue")
    scheduler.enqueue_at(datetime(2020, 1, 1), func)  # The job will be enqueued at the queue named "foo" using the queue type "rq.Queue"

The second way is using enqueue_in. Instead of taking a datetime object, this method expects a timedelta and schedules the job to run X seconds/minutes/hours/days/weeks later. For example, if we want to monitor how popular a tweet is a few times during the course of the day, we could do something like:

    from datetime import timedelta

    # Schedule a job to run 10 minutes, 1 hour and 1 day later
    scheduler.enqueue_in(timedelta(minutes=10), count_retweets, tweet_id)
    scheduler.enqueue_in(timedelta(hours=1), count_retweets, tweet_id)
    scheduler.enqueue_in(timedelta(days=1), count_retweets, tweet_id)

IMPORTANT: You should always use UTC datetime when working with RQ Scheduler.
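
If the time you have in hand is expressed in a local timezone, one way to follow this rule is to convert it to UTC before scheduling. The helper below is a minimal sketch using only the standard library; `to_naive_utc` is a hypothetical name, not part of RQ Scheduler, and the UTC+09:00 offset is just an example value.

```python
from datetime import datetime, timedelta, timezone


def to_naive_utc(aware_dt):
    """Convert a timezone-aware datetime to a naive datetime expressed in UTC."""
    if aware_dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return aware_dt.astimezone(timezone.utc).replace(tzinfo=None)


# 09:00 in a UTC+09:00 zone is 00:00 UTC on the same day.
local_time = datetime(2020, 1, 1, 9, 0, tzinfo=timezone(timedelta(hours=9)))
run_at = to_naive_utc(local_time)
print(run_at)  # 2020-01-01 00:00:00
```

The resulting `run_at` could then be passed as the first argument to `scheduler.enqueue_at`.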

Periodic & Repeated Jobs

As of version 0.3, RQ Scheduler also supports creating periodic and repeated jobs. You can do this via the schedule method. Note that this feature needs RQ >= 0.3.1.

This is how you do it:

    scheduler.schedule(
        scheduled_time=datetime.utcnow(),  # Time for first execution, in UTC timezone
        func=func,                         # Function to be queued
        args=[arg1, arg2],                 # Arguments passed into function when executed
        kwargs={'foo': 'bar'},             # Keyword arguments passed into function when executed
        interval=60,                       # Time before the function is called again, in seconds
        repeat=10,                         # Repeat this number of times (None means repeat forever)
        meta={'foo': 'bar'}                # Arbitrary pickleable data on the job itself
    )

IMPORTANT NOTE: If you set up a repeated job, you must make sure that you either do not set a result_ttl value or you set a value larger than the interval. Otherwise, the entry with the job details will expire and the job will not get re-scheduled.
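
The rule in this note can be written down as a small pre-flight check. This is a sketch only: `result_ttl_outlives_interval` is a hypothetical helper, and treating -1 as "keep forever" is an assumption based on RQ's usual convention rather than something this check verifies.

```python
def result_ttl_outlives_interval(interval, result_ttl=None):
    """Return True if a repeated job's details should survive until its next run.

    interval   -- seconds between runs
    result_ttl -- seconds the job details are kept; None means "not set".
                  -1 is assumed to mean "keep forever".
    """
    if result_ttl is None or result_ttl == -1:
        return True
    # The note asks for a value strictly larger than the interval.
    return result_ttl > interval


print(result_ttl_outlives_interval(60))        # True: result_ttl not set
print(result_ttl_outlives_interval(60, 300))   # True: 300 > 60
print(result_ttl_outlives_interval(60, 30))    # False: entry expires before the next run
```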

Cron Jobs

As of version 0.6.0, RQ Scheduler also supports creating Cron Jobs, which you can use for repeated jobs that run periodically at fixed times, dates or intervals. For more info, check https://en.wikipedia.org/wiki/Cron. You can do this via the cron method.

This is how you do it:

    scheduler.cron(
        cron_string,                # A cron string (e.g. "0 0 * * 0")
        func=func,                  # Function to be queued
        args=[arg1, arg2],          # Arguments passed into function when executed
        kwargs={'foo': 'bar'},      # Keyword arguments passed into function when executed
        repeat=10,                  # Repeat this number of times (None means repeat forever)
        result_ttl=300,             # Specify how long (in seconds) successful jobs and their results are kept. Defaults to -1 (forever)
        ttl=200,                    # Specifies the maximum queued time (in seconds) before it's discarded. Defaults to None (infinite TTL).
        queue_name=queue_name,      # In which queue the job should be put in
        meta={'foo': 'bar'},        # Arbitrary pickleable data on the job itself
        use_local_timezone=False    # Interpret hours in the local timezone
    )
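
For readers less familiar with cron strings, the standard five fields are minute, hour, day of month, month and day of week, so "0 0 * * 0" means midnight every Sunday. The snippet below is a small illustration that only labels the fields; `describe_cron_string` is a hypothetical helper, it does not validate values, and it is not how RQ Scheduler itself parses cron strings.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron_string(cron_string):
    """Split a five-field cron string into named fields (no validation of values)."""
    parts = cron_string.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


print(describe_cron_string("0 0 * * 0"))
# {'minute': '0', 'hour': '0', 'day_of_month': '*', 'month': '*', 'day_of_week': '0'}
```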

Retrieving scheduled jobs

Sometimes you need to know which jobs have already been scheduled. You can get a list of enqueued jobs with the get_jobs method:

    list_of_job_instances = scheduler.get_jobs()

In its simplest form (as seen in the above example) this method returns a list of all job instances that are currently scheduled for execution.

Additionally the method takes two optional keyword arguments, until and with_times. The first one specifies up to which point in time scheduled jobs should be returned. It can be given as either a datetime / timedelta instance or an integer denoting the number of seconds since epoch (1970-01-01 00:00:00). The second argument is a boolean that determines whether the scheduled execution time should be returned along with the job instances.

Example:

    # get all jobs until 2012-10-30 10:00:00
    list_of_job_instances = scheduler.get_jobs(until=datetime(2012, 10, 30, 10))

    # get all jobs for the next hour
    list_of_job_instances = scheduler.get_jobs(until=timedelta(hours=1))

    # get all jobs with execution times
    jobs_and_times = scheduler.get_jobs(with_times=True)
    # returns a list of tuples:
    # [(<rq.job.Job object at 0x123456789>, datetime.datetime(2012, 11, 25, 12, 30)), ...]
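
To make the three accepted forms of until concrete, here is a sketch of how each could be reduced to a single "seconds since epoch" value. `until_to_epoch` is a hypothetical helper written for this explanation; it is not RQ Scheduler's own code, and it assumes naive datetimes are already in UTC.

```python
import calendar
from datetime import datetime, timedelta, timezone


def until_to_epoch(until, now=None):
    """Normalise `until` to whole seconds since the epoch (naive datetimes are UTC)."""
    if isinstance(until, timedelta):
        # A timedelta is interpreted relative to the current UTC time.
        if now is None:
            now = datetime.now(timezone.utc).replace(tzinfo=None)
        until = now + until
    if isinstance(until, datetime):
        return calendar.timegm(until.utctimetuple())
    # Anything else is assumed to already be a number of seconds since epoch.
    return int(until)


print(until_to_epoch(datetime(1970, 1, 2)))                               # 86400
print(until_to_epoch(timedelta(hours=1), now=datetime(1970, 1, 1)))       # 3600
print(until_to_epoch(86400))                                              # 86400
```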

Checking if a job is scheduled

You can check whether a specific job instance or job id is scheduled for execution using the familiar Python in operator:

    if job_instance in scheduler:
        pass  # Do something
    # or
    if job_id in scheduler:
        pass  # Do something

Canceling a job

To cancel a job, simply pass a Job or a job id to scheduler.cancel:

    scheduler.cancel(job)

Note that this method returns None whether the specified job was found or not.

Running the scheduler

RQ Scheduler comes with a script, rqscheduler, that runs a scheduler process. The process polls Redis once every minute and moves scheduled jobs to the relevant queues when they need to be executed:

    # This runs a scheduler process using the default Redis connection
    rqscheduler

If you want to use a different Redis server you could also do:

    rqscheduler --host localhost --port 6379 --db 0

The script accepts these arguments:

  • -H or --host: Redis server to connect to
  • -p or --port: port to connect to
  • -d or --db: Redis db to use
  • -P or --password: password to connect to Redis
  • -b or --burst: runs in burst mode (enqueue scheduled jobs whose execution time is in the past and quit)
  • -i INTERVAL or --interval INTERVAL: How often the scheduler checks for new jobs to add to the queue (in seconds, can be floating-point for more precision).
  • -j or --job-class: specify custom job class for rq to use (python module.Class)
  • -q or --queue-class: specify custom queue class for rq to use (python module.Class)

The arguments pull default values from environment variables with the same names but with a prefix of RQ_REDIS_.
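
The pattern of "command line flag, falling back to a prefixed environment variable" can be sketched with argparse. This is an illustration, not the rqscheduler script itself: `build_parser` is a hypothetical function, only three of the options are shown, and the final fallbacks (localhost, 6379, 0) are assumptions chosen to match the example command above.

```python
import argparse


def build_parser(environ):
    """Build a parser whose defaults come from RQ_REDIS_* entries in `environ`."""
    parser = argparse.ArgumentParser(description="toy rqscheduler-style options")
    parser.add_argument("-H", "--host",
                        default=environ.get("RQ_REDIS_HOST", "localhost"))
    parser.add_argument("-p", "--port", type=int,
                        default=int(environ.get("RQ_REDIS_PORT", 6379)))
    parser.add_argument("-d", "--db", type=int,
                        default=int(environ.get("RQ_REDIS_DB", 0)))
    return parser


# A plain dict stands in for os.environ so the example is reproducible.
env = {"RQ_REDIS_HOST": "redis.internal", "RQ_REDIS_PORT": "6380"}
args = build_parser(env).parse_args(["--port", "7000"])
print(args.host, args.port, args.db)  # redis.internal 7000 0
```

The host comes from the environment, the port from the command line (which wins over the environment), and the db from the hard-coded fallback. In a real script you would pass `os.environ` instead of the dict.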

Running the Scheduler as a Service on Ubuntu

Create the unit file /etc/systemd/system/rqscheduler.service (this requires root privileges, for example by opening it with sudo and your editor of choice) with the following contents:

    [Unit]
    Description=RQScheduler
    After=network.target

    [Service]
    ExecStart=/home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/bin/python \
        /home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/lib/<<YourPythonVersion>>/site-packages/rq_scheduler/scripts/rqscheduler.py

    [Install]
    WantedBy=multi-user.target

If your Redis server is not on localhost and is not configured through the environment variables, you will also want to add the relevant command line parameters to the ExecStart line.

Start, check the status of, and enable the service:

    sudo systemctl start rqscheduler.service
    sudo systemctl status rqscheduler.service
    sudo systemctl enable rqscheduler.service

Running Multiple Schedulers

Multiple instances of the rq-scheduler can be run simultaneously. It allows for

  • Reliability (no single point of failure)
  • Failover (scheduler instances automatically retry to attain lock and schedule jobs)
  • Running scheduler on multiple server instances to make deployment identical and easier

Multiple schedulers can be run in any way you want. Typically you'll only want to run one scheduler per server/instance.

    rqscheduler -i 5

    # another shell/systemd service or ideally another server
    rqscheduler -i 5

    # different parameters can be provided to different schedulers
    rqscheduler -i 10

Practical example:

  • scheduler_a is running on ec2_instance_a
  • If scheduler_a crashes or ec2_instance_a goes down, then our tasks won't be scheduled at all
  • Instead we can simply run 2 schedulers. Another scheduler called scheduler_b can be run on ec2_instance_b
  • Now both scheduler_a and scheduler_b will periodically check and schedule the jobs
  • If one fails, the other still works
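
The failover behaviour in this example can be pictured with a toy lock. The sketch below is an in-memory model for intuition only: `ToyLock` is a hypothetical class, and the idea that the lock carries an expiry that a crashed scheduler stops refreshing is an assumption made for the illustration, not a description of RQ Scheduler's Redis-based locking.

```python
class ToyLock:
    """Hypothetical in-memory stand-in for a shared lock that expires if not refreshed."""

    def __init__(self, ttl):
        self.ttl = ttl          # how long one acquisition stays valid, in seconds
        self.owner = None
        self.expires_at = 0.0

    def acquire(self, name, now):
        # Succeeds if the lock is unheld, has expired, or is being refreshed by its owner.
        if self.owner is None or now >= self.expires_at or self.owner == name:
            self.owner = name
            self.expires_at = now + self.ttl
            return True
        return False


lock = ToyLock(ttl=10)
print(lock.acquire("scheduler_a", now=0))   # True: scheduler_a holds the lock
print(lock.acquire("scheduler_b", now=5))   # False: scheduler_a's hold is still valid
# scheduler_a crashes here and stops refreshing the lock.
print(lock.acquire("scheduler_b", now=12))  # True: scheduler_b takes over
```

Because scheduler_b keeps retrying, scheduling resumes without manual intervention once the lock held by the failed instance is no longer valid.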

You can read more about multiple schedulers in #212 and #195.

