You signed in with another tab or window.Reload to refresh your session.You signed out in another tab or window.Reload to refresh your session.You switched accounts on another tab or window.Reload to refresh your session.Dismiss alert
pgpro_scheduler это планировщик задач для СУБД PostgreSQL, который позволяетпланировать исполнение задач в базе и контроллировать их исполнение.
Задачи это наборы SQL команд. Расписание выполнения задач задается либо строкойcron, либо указанием конкретных дат запуска, либо JSON объектом, в которомуказывается в какие дни часы и минуты задача должна быть запущена. Возможнакомбинация методов описания расписания.
Каждая задача имеет возможность для вычисления времени следующего своегозапуска. Набор SQL команд в задаче может обрабатываться в разных транзакциях,по транзакции на команду, или в одной. В последнем случае имеется возможностьзадания SQL конманды, которая будет выполняться в случае аварийного завершениятранзакции.
Installation
pgpro_scheduler это расширение PostgreSQL и не тербует никаких специальныхпререквизитов.
Перед сборкой расширения из исходного кода убедитесь, что переменнаяокружения PATH содержит путь к командеpg_config. Так же убедитесь,что у вас установлена версия PostgresSQL для разработчиков или PostgreSQLсобран из исходного кода.
Процедура установки выглядит следующим образом:
$ cd pgpro_scheduler$ make USE_PGXS=1$ sudo make USE_PGXS=1 install$ psql <DBNAME> -c "CREATE EXTNESION pgpro_scheduler"
Конфигурация
Расширение определяет ряд переменных в PostgreSQL (GUC), которые позволяютуправлять его конфигурацией.
schedule.enable - двоичная переменная, которая поределяет разрешено ливыполнение расширения. По умолчанию: false.
schedule.database - строковая переменная, указывает с какими базам можетработать расширение. По умолчанию - пустая строка.
schedule.nodename - строковая переменная, содержит название узла.По умолчанию - master. Если расширение используется на одной машине,то переменная не имеет смысла.
schedule.max_workers - целочисленная переменная, содержит максимальноеколичество одновременно работающих задач для одной базы. По умолчанию - 2.
schedule.transaction_state - строковая переменная, устанавливаетсярасширением в процессе работы. По умолчанию - undefined. Переменнаяиспользуется для определения статуса завершения транзакции при вычисленииследующего времени выполнения задачи. Возможные значения:
success - транзакция завершилась успешно
failure - транзакция завершилась аварийно
running - транзакция в процессе исполнения
undefined - транзакция не началась
Последние два значения не должны попадать в процедуру определения следующегозначения. Это будет означать какую-то внутреннюю ошибку в работе расширения.
Управление
Управление работой планировщика задач осуществляется через переменныеPostgreSQL, которые описаны в предыдущем разделе.
Например, у вас существует свежая инсталляция PostgreSQL с установленнымрасширением планировщика. И вам требуется запустить планировщик на двухбазах database1 и database2. При этом вы хотите что бы планировщик длябазы database1 мог исполнять 5 задач одновременно, а для базы database2 - 3.
В $DATADIR/postgresql.conf должна присутствовать строка:
shared_preload_libraries = 'pgpro_scheduler'
Далее в psql введите следующие команды:
ALTER SYSTEM SET schedule.enable = true;ALTER SYSTEM SET schedule.database = 'database1,database2';ALTER DATABASE database1 SET schedule.max_workers = 5;ALTER DATABASE database2 SET schedule.max_workers = 3;SELECT pg_reload_conf();
Если вам не нужны указания различных значений для разных баз данных, то все этоможно занести в конфигурационный файл PostgreSQL и перечитать конфигурацию.Перезапуска не требуется.
Пример записей в$DATADIR/postgresql.conf, если количество одновременноисполняемых задач в обоих базах одинаково:
Планировщик задач работает с помощью Background Worker'ов. Поэтому должно бытьправильно установленно значение переменнойmax_worker_processes. Минимальноезначение переменной может быть расчитано по следующей формуле:
Nmin - это минимальное значение переменной, котороетребуется для работы конфигурации. Имейте в виду, что Background Workes'ымогут требоваться для работы других систем, например, параллельных запросов.
Ndatabases - это количество баз данных, для которыхзапускается планировщик.
MAX_WORKERSn - это значение переменнойschedule.max_workersв контексте каждой базы данных, для которой запусткается планировщик.
SQL Schema
The extention creates aschedule schema. All functions, types and tables of extensionare defined within this scheme. Direct access to the tables created is forbiddento public. All actions should be done by means of sql interface functions.
SQL Types
Extension defines two SQL types and uses them as types of return valuesin interface functions.
CREATE TYPE schedule.cron_rec AS(id integer, -- job record idnode text, -- name of node name text, -- name of the jobcomments text, -- comment on jobrule jsonb, -- rule of schedulecommands text[], -- sql commands to executerun_as text, -- name of the executor userowner text, -- name of the owner userstart_date timestamp, -- left bound of execution time window -- unbound if NULLend_date timestamp, -- right bound of execution time window-- unbound if NULLuse_same_transaction boolean, -- if true sequence of command executes -- in a single transactionlast_start_available interval, -- time interval while command could -- be executed if it's impossible -- to start it at scheduled timemax_instances int,-- the number of instances run at the same timemax_run_time interval, -- time interval - max execution time when -- elapsed - sequence of queries will be abortedonrollback text, -- statement to be executed on ROLLBACKnext_time_statement text, -- statement to be executed to calculate -- next execution timeactive boolean, -- is job executes at that momentbroken boolean -- if job is broken);CREATE TYPE schedule.cron_job AS(cron integer, -- job record idnode text, -- name of node scheduled_at timestamp, -- scheduled job timename text, -- job namecomments text, -- job commentscommands text[], -- sql commands to executerun_as text, -- name of the executor userowner text, -- name of the owner useruse_same_transaction boolean,-- if true sequence of command executes-- in a single transactionstarted timestamp, -- time when job startedlast_start_available timestamp,-- time untill job must be startedfinished timestamp, -- time when job finishedmax_run_time interval, -- max execution timemax_instances int,-- the number of instances run at the same timeonrollback text, -- statement on ROLLBACKnext_time_statement text,-- statement to calculate next start timestatus text, -- status of job: working, done, error message text -- error message if one);
SQL Interface Functions
schedule.create_job(text, text, text)
This function creates a job and sets it active.
Arguments:
crontab string
sql to execute
node name, optional
Returns ID of created job.
schedule.create_job(text, text[], text)
This function creates a job and sets it active.
Arguments:
crontab string
set of sqls to execute
node name, optional
Returns ID of created job.
schedule.create_job(timestamp, text, text)
This function creates a job and sets it active.
Arguments:
execution time
sql to execute
node name, optional
Returns ID of created job.
schedule.create_job(timestamp, text[], text)
This function creates a job and sets it active.
Arguments:
execution time
set of sqls to execute
node name, optional
Returns ID of created job.
schedule.create_job(timestamp[], text, text)
This function creates a job and sets it active.
Arguments:
set of execution times
sql to execute
node name, optional
Returns ID of created job.
schedule.create_job(timestamp[], text[], text)
This function creates a job and sets it active.
Arguments:
set of execution times
set of sqls to execute
node name, optional
Returns ID of created job.
schedule.create_job(jsonb)
This function creates a job and sets it active.
As input parameter it accepts jsonb object which describes job to be created.It returns an integer - ID of created job.
Jsonb object can contains following keys:
name - job name;
node - node name, default 'master';
comments - some comments on job;
cron - cron string rule;
rule - jsonb job schedule;
command - sql command to execute;
commands - sql commands to execute, text[];
run_as - user to execute command(s);
start_date - begin of period while command could be executed, could be NULL;
end_date - end of period while command could be executed, could be NULL;
date - Exact date when command will be executed;
dates - Set of exact dates when comman will be executed;
use_same_transaction - if set of commans should be executed withinthe same transaction;
last_start_available - for how long could command execution bepostponed in format of interval type;
max_run_time - how long task could be executed, NULL - infinite;
onrollback - sql statement to be executed after rollback, if one occured;
next_time_statement - sql statement to be executed last to calc nextexecution time'.
The schedule of job could be set as crontab format string (keycron). Alsoit could be set as a jsonb object (keyrule).
The object may contain following keys:
minutes - an array with values in range 0-59 - minutes
hours - an array with values in range 0-23 - hours
days - an array with value in range 1-31 - month days
months - an array with value in range 1-12 - months
wdays - an array with value in range 0-6 - week days, 0 - Sunday
onstart - 1 or 0, by default 0, if set to 1 - run only once at start time
Also schedule can be set as exact time or times to execute on. Use keysdate for single time anddates as array for sequence of execution times.
SQL commands to be executed can be set overcommand orcommands keys. Thesecond one must be an array.
next_time_statement executs after sql commands finished. It must returntimestamp which will be considered as next execution time. Ifpg_variablesextension is installed this statment can access information about ifexecution of main commands was successful. Information is stored invariabletransaction of packagepgpro_scheduler.
processing - commands are being proccessed. Actually ifnext_time_statement receive this state that means something goes wrong insidescheduler
schedule.set_job_attributes(integer, jsonb)
The function allows to edit settings of existing job.
First argument is ID of existing job, second argument is jsonb objectdescribed in functionschedule.create_job. Some of keys may be omitted.
Function returns boolean value - true on success and false on failure.
User can edit only jobs it owns unless it's a superuser.
schedule.set_job_attribute(integer, text, text || anyarray)
The function allows to set exact property of existing job.
First argument is ID of existing job, second is name of the property - oneof the keys described in functionschedule.create_job.
Function returns boolean value - true on success and false on failure.
schedule.deactivate_job(integer)
The function allows to set cron job inactive. The job is not to be deletedfrom cron table but its execution will be disabled.
The first argument is ID of existing job.
schedule.activate_job(integer)
The function allows to set cron job active.
The first argument is ID of existing job.
schedule.drop_job(jobId integer)
The function deletes cron job.
The first argument is ID of existing job.
schedule.get_job(int)
The function returns information about exact cron record.
It returns record of typecron_rec. See description above.
schedule.get_user_owned_cron(text)
The function returns list of the jobs in cron table owned by userpassed in first argument or by session user if no user passed.
It returns set of records ofcron_rec type. See description above.
schedule.get_user_cron(text)
The function returns list of the jobs in cron table which will be executedas user passed in first argument or by session user if no user passed.
It returns set of records ofcron_rec type. See description above.
schedule.get_user_active_jobs(text)
The function returns all jobs executed at the moment as user passed in firstargument. If no user specified - session user used.
It returns set of records ofcron_job type. See description above.
schedule.get_active_jobs()
The function returns all jobs executed at the moment. Can be executed onlyby superuser.
It returns set of records ofcron_job type. See description above.
schedule.get_log()
The function returns all jobs which was executed. Can be executed onlyby superuser.
It returns set of records ofcron_job type. See description above.
schedule.get_user_log(text)
The function returns all jobs which was executed as user passed in firstargument. If no user specified - session user used.
It returns set of records ofcron_job type. See description above.
schedule.clean_log()
The function deletes all records in log table. Can be executed only bysuperuser.
Returns number of records deleted.
Internals
Extention creates 3 tables in schemaschedule. They are not accessableby public.
schedule.cron - table contains records to be scheduled to porces job.Analog for crontab.
CREATE TABLE schedule.cron(id SERIAL PRIMARY KEY,name text,-- name of jobnode text,-- name of nodecomments text,-- comments on jobrule jsonb,-- json object with shedule, see description belownext_time_statement text,-- sql statement to be executed to -- calculate next execution timedo_sql text[],-- SQL statements to be executedsame_transaction boolean DEFAULT false,-- if sequence in do_sql will be-- executed in one transactiononrollback_statement text,-- sql statement to be executed after ROLLBACKactive boolean DEFAULT true,-- is job activebroken boolean DEFAULT false,-- is job broken executor text,-- name of executor userowner text,-- neme of user who owns (created) jobpostpone interval,-- on what time execution could be delayed if there-- are no free session to execute it in timeretry integer default 0,-- number of retrys if errormax_run_time interval,-- how long job can be processedmax_instances integer default 1,-- how much instances of the same-- job could be executed simultaneouslystart_date timestamp,-- begin of time period within job can-- be performed, can be NULLend_date timestamp,-- end of time period within job can-- be performed, can be NULLreason text-- text reason why job marked as broken);
schedule.at - table stores nearest jobs to be executed orbeing executed at the moment. Each record contains information abouttime the job must begin, reference to cron table, time of last start allowed(if specified), time of actual start (if being performed), state - waitingexecution or executing.
CREATE TABLE schedule.at(start_at timestamp,-- time job will startlast_start_available timestamp,-- time last start allowedretry integer,cron integer REFERENCES schedule.cron (id), -- cron table referencenode text,started timestamp,-- time of actual startactive boolean-- true - execution, false - waiting);
scedule.log - table with job executed. When job has been performedit moved fromschedule.at to this table, so tables has about the samestructure except this table has information about result of execution.
CREATE TABLE schedule.log(start_at timestamp,-- time at job were to be startedlast_start_available timestamp,-- time of last start availableretry integer,cron integer,-- reference to cron tablenode text,-- reference to cron table nodestarted timestamp,-- time job has been startedfinished timestamp,-- time job has been finishedstatus boolean,-- true - success, false - failuremessage text-- error message);