Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5eae385

Browse files
author
Vladimir Ershov
committed
starts and wait so far
1 parenta4f01da commit5eae385

9 files changed

+490
-172
lines changed

‎pgpro_scheduler--2.0.sql

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
CREATESCHEMAIF NOT EXISTS schedule;
44

55
CREATETYPEschedule.job_statusAS ENUM ('working','done','error');
6-
CREATETYPEschedule.at_job_statusAS ENUM ('submitted','working','done','error');
76

8-
CREATETABLEschedule.at_jobs(
7+
CREATETABLEschedule.at_jobs_submitted(
98
idSERIALPRIMARY KEY,
109
nodetext,
1110
nametext,
@@ -16,16 +15,27 @@ CREATE TABLE schedule.at_jobs(
1615
onrollback_statementtext,
1716
executortext,
1817
ownertext,
18+
last_start_availabletimestamp with time zone,
1919
postpone interval,
2020
max_run_timeinterval,
2121
max_instancesinteger default1,
22-
status at_job_status default'submited',
23-
submit_timetimestamp with time zone default now(),
24-
startedtimestamp with time zone,
25-
finishedtimestamp with time zone,
26-
reasontext
22+
submit_timetimestamp with time zone default now()
2723
);
28-
CREATEINDEXat_jobs_status_node_at_idxonschedule.at (status, node, at);
24+
CREATEINDEXat_jobs_submitted_node_at_idxonschedule.at_jobs_submitted (node, at);
25+
26+
CREATETABLEschedule.at_jobs_process(
27+
start_timetimestamp with time zone default now()
28+
) INHERITS (schedule.at_jobs_submitted);
29+
30+
CREATEINDEXat_jobs_process_node_at_idxonschedule.at_jobs_process (node, at);
31+
32+
CREATETABLEschedule.at_jobs_done(
33+
statusboolean,
34+
reasontext,
35+
done_timetimestamp with time zone default now()
36+
) INHERITS (schedule.at_jobs_process);
37+
38+
CREATEINDEXat_jobs_done_node_at_idxonschedule.at_jobs_done (node, at);
2939

3040
CREATETABLEschedule.cron(
3141
idSERIALPRIMARY KEY,

‎src/pgpro_scheduler.c

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ char *scheduler_databases = NULL;
4646
char*scheduler_nodename=NULL;
4747
char*scheduler_transaction_state=NULL;
4848
intscheduler_max_workers=2;
49+
intscheduler_at_max_workers=2;
4950
boolscheduler_service_enabled= false;
5051
char*scheduler_schema=NULL;
5152
/* Custom GUC done */
@@ -109,17 +110,18 @@ int get_integer_from_string(char *s, int start, int len)
109110
returnatoi(buff);
110111
}
111112

112-
char*make_date_from_timestamp(TimestampTzts)
113+
char*make_date_from_timestamp(TimestampTzts,boolhires)
113114
{
114115
structpg_tmdt;
115-
char*str=worker_alloc(sizeof(char)*17);
116+
char*str=worker_alloc(sizeof(char)*19);
116117
inttz;
117118
fsec_tfsec;
118119
constchar*tzn;
119120

120121
timestamp2tm(ts,&tz,&dt,&fsec,&tzn,NULL );
121-
sprintf(str,"%04d-%02d-%02d %02d:%02d",dt.tm_year ,dt.tm_mon,
122-
dt.tm_mday,dt.tm_hour,dt.tm_min);
122+
sprintf(str,"%04d-%02d-%02d %02d:%02d:%02d",dt.tm_year ,dt.tm_mon,
123+
dt.tm_mday,dt.tm_hour,dt.tm_min,dt.tm_sec);
124+
if(!hires)str[16]=0;
123125
returnstr;
124126
}
125127

@@ -457,7 +459,7 @@ void _PG_init(void)
457459
);
458460
DefineCustomIntVariable(
459461
"schedule.max_workers",
460-
"How much workers can servescheduler on one database",
462+
"How much workers can servescheduled jobs on one database",
461463
NULL,
462464
&scheduler_max_workers,
463465
2,
@@ -469,6 +471,20 @@ void _PG_init(void)
469471
NULL,
470472
NULL
471473
);
474+
DefineCustomIntVariable(
475+
"schedule.at_max_workers",
476+
"How much workers can serve at jobs on one database",
477+
NULL,
478+
&scheduler_at_max_workers,
479+
2,
480+
1,
481+
100,
482+
PGC_SUSET,
483+
0,
484+
NULL,
485+
NULL,
486+
NULL
487+
);
472488
DefineCustomBoolVariable(
473489
"schedule.enabled",
474490
"Enable schedule service",

‎src/pgpro_scheduler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pid_t registerManagerWorker(schd_manager_t *man);
3535

3636
voidreload_db_role_config(char*dbname);
3737
TimestampTztimestamp_add_seconds(TimestampTzto,intadd);
38-
char*make_date_from_timestamp(TimestampTzts);
38+
char*make_date_from_timestamp(TimestampTzts,boolhires);
3939
intget_integer_from_string(char*s,intstart,intlen);
4040
TimestampTzget_timestamp_from_string(char*str);
4141
TimestampTz_round_timestamp_to_minute(TimestampTzts);

‎src/scheduler_executor.c

Lines changed: 12 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -389,72 +389,30 @@ void set_pg_var(bool result, executor_error_t *ee)
389389

390390
job_t*initializeExecutorJob(schd_executor_share_t*data)
391391
{
392-
constchar*sql="select at.last_start_available, cron.same_transaction, cron.do_sql, cron.executor, cron.postpone, cron.max_run_time as time_limit, cron.max_instances, cron.onrollback_statement , cron.next_time_statement from schedule.at at, schedule.cron cron where start_at = $1 and at.active and at.cron = cron.id AND cron.node = $2 AND cron.id = $3";
393-
Oidargtypes[3]= {TIMESTAMPTZOID,TEXTOID,INT4OID};
394-
Datumargs[3];
395392
job_t*J;
396-
intret;
397393
char*error=NULL;
398-
char*ts;
399-
400-
args[0]=TimestampTzGetDatum(data->start_at);
401-
args[1]=PointerGetDatum(cstring_to_text(data->nodename));
402-
args[2]=Int32GetDatum(data->cron_id);
403394

404-
START_SPI_SNAP();
405-
ret=execute_spi_sql_with_args(sql,3,argtypes,args,NULL,&error);
395+
J=data->type==CronJob ?
396+
get_cron_job(data->cron_id,data->start_at,data->nodename,&error):
397+
get_at_job(data->cron_id,data->nodename,&error);
406398

407399
if(error)
408400
{
409-
snprintf(data->message,
410-
PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
411-
"cannot retrive job: %s",error);
401+
snprintf(data->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
402+
"%s",error);
412403
elog(LOG,"EXECUTOR: %s",data->message);
413404
pfree(error);
414-
PopActiveSnapshot();
415-
AbortCurrentTransaction();
416-
SPI_finish();
417405
returnNULL;
418406
}
419-
420-
if(ret==SPI_OK_SELECT)
407+
if(!J)
421408
{
422-
if(SPI_processed==0)
423-
{
424-
STOP_SPI_SNAP();
425-
ts=make_date_from_timestamp(data->start_at);
426-
snprintf(data->message,
427-
PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
428-
"cannot find job: %d @ %s [%s]",
429-
data->cron_id,ts,data->nodename);
430-
elog(LOG,"EXECUTOR: %s",data->message);
431-
pfree(ts);
432-
returnNULL;
433-
}
434-
J=worker_alloc(sizeof(job_t));
435-
436-
J->cron_id=data->cron_id;
437-
J->start_at=data->start_at;
438-
J->node=_copy_string(data->nodename);
439-
J->same_transaction=get_boolean_from_spi(0,2, false);
440-
J->dosql=get_textarray_from_spi(0,3,&J->dosql_n);
441-
J->executor=get_text_from_spi(0,4);
442-
J->onrollback=get_text_from_spi(0,8);
443-
J->next_time_statement=get_text_from_spi(0,9);
444-
445-
STOP_SPI_SNAP();
446-
447-
returnJ;
409+
snprintf(data->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
410+
"unknown error get job");
411+
elog(LOG,"EXECUTOR: %s",data->message);
412+
returnNULL;
448413
}
449-
snprintf(data->message,
450-
PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
451-
"error while retrive job information: %d",ret);
452-
elog(LOG,"EXECUTOR: %s",data->message);
453-
454-
PopActiveSnapshot();
455-
AbortCurrentTransaction();
456-
SPI_finish();
457-
returnNULL;
414+
415+
returnJ;
458416
}
459417

460418
intpush_executor_error(executor_error_t*e,char*fmt, ...)

‎src/scheduler_executor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ typedef struct {
2020
charuser[NAMEDATALEN];
2121

2222
intcron_id;
23+
task_type_ttype;
2324
TimestampTzstart_at;
2425

2526
schd_executor_status_tstatus;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp