Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit73569c8

Browse files
author
Vladimir Ershov
committed
next time
new custom varbug fixes
1 parent1340976 commit73569c8

File tree

6 files changed

+188
-96
lines changed

6 files changed

+188
-96
lines changed

‎pgpro_scheduler--1.0.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ CREATE TABLE schedule.cron(
2424
max_instancesinteger default1,
2525
start_datetimestamp with time zone,
2626
end_datetimestamp with time zone,
27-
reasontext
27+
reasontext,
28+
_next_exec_timetimestamp with time zone
2829
);
2930

3031
CREATETABLEschedule.at(

‎src/pgpro_scheduler.c

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ volatile sig_atomic_t got_sigterm = false;
3939

4040
staticchar*sched_databases="";
4141
staticchar*sched_nodename="master";
42+
staticchar*sched_transaction_state="undefined";
4243
staticintsched_max_workers=2;
4344

4445
externvoid
@@ -91,7 +92,7 @@ char *make_date_from_timestamp(TimestampTz ts)
9192
fsec_tfsec;
9293
constchar*tzn;
9394

94-
timestamp2tm(ts,&tz,&dt,&fsec,&tzn,NULL );/* TODO ERROR */
95+
timestamp2tm(ts,&tz,&dt,&fsec,&tzn,NULL );
9596
sprintf(str,"%04d-%02d-%02d %02d:%02d",dt.tm_year ,dt.tm_mon,
9697
dt.tm_mday,dt.tm_hour,dt.tm_min);
9798
returnstr;
@@ -180,13 +181,11 @@ char_array_t *readBasesToCheck(void)
180181
pfree(clean_value);
181182
returnresult;
182183
}
183-
elog(LOG,"clean value: %s [%d,%d]",clean_value,cv_len,nnames);
184184
names=makeCharArray();
185185
for(i=0;i<cv_len+1;i++)
186186
{
187187
if(clean_value[i]==0)
188188
{
189-
elog(LOG,"start position: %d",start_pos);
190189
ptr=clean_value+start_pos;
191190
if(strlen(ptr))pushCharArray(names,ptr);
192191
start_pos=i+1;
@@ -259,7 +258,6 @@ void parent_scheduler_main(Datum arg)
259258

260259
BackgroundWorkerInitializeConnection("postgres",NULL);
261260
names=readBasesToCheck();
262-
elog(LOG,"GOT NAMES");
263261
poll=initSchedulerManagerPool(names);
264262
destroyCharArray(names);
265263

@@ -374,6 +372,18 @@ void _PG_init(void)
374372
NULL,
375373
NULL
376374
);
375+
DefineCustomStringVariable(
376+
"schedule.transaction_state",
377+
"State of scheduler executor transaction",
378+
"If not under scheduler executor process the variable has no mean and has a value = 'undefined', possible values: progress, success, failure",
379+
&sched_transaction_state ,
380+
"undefined",
381+
PGC_INTERNAL,
382+
0,
383+
NULL,
384+
NULL,
385+
NULL
386+
);
377387
DefineCustomIntVariable(
378388
"schedule.max_workers",
379389
"How much workers can serve scheduler on one database",

‎src/scheduler_executor.c

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,12 @@ void executor_worker_main(Datum arg)
5858
dsm_segment*seg;
5959
job_t*job;
6060
inti;
61-
boolsuccess= true;
6261
executor_error_tEE;
6362
intret;
6463
char*error=NULL;
65-
booluse_pg_vars= true;
64+
/* bool use_pg_vars = true; */
65+
/* bool success = true; */
6666
schd_executor_status_tstatus;
67-
/* int rc = 0; */
6867

6968
EE.n=0;
7069
EE.errors=NULL;
@@ -127,6 +126,7 @@ void executor_worker_main(Datum arg)
127126
CHECK_FOR_INTERRUPTS();
128127
/* rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 0);
129128
ResetLatch(MyLatch); */
129+
SetConfigOption("schedule.transaction_state","running",PGC_INTERNAL,PGC_S_SESSION);
130130

131131
if(job->same_transaction)
132132
{
@@ -143,8 +143,9 @@ void executor_worker_main(Datum arg)
143143
ret=execute_spi(job->dosql[i],&error);
144144
if(ret<0)
145145
{
146-
success= false;
146+
/*success = false; */
147147
status=SchdExecutorError;
148+
SetConfigOption("schedule.transaction_state","failure",PGC_INTERNAL,PGC_S_SESSION);
148149
if(error)
149150
{
150151
push_executor_error(&EE,"error on %d: %s",i+1,error);
@@ -174,13 +175,15 @@ void executor_worker_main(Datum arg)
174175
STOP_SPI_SNAP();
175176
}
176177
status=SchdExecutorDone;
178+
SetConfigOption("schedule.transaction_state","success",PGC_INTERNAL,PGC_S_SESSION);
177179
}
178180
if(job->next_time_statement)
179181
{
180-
if(use_pg_vars)/* may be to define custom var is better */
182+
/*if(use_pg_vars)
181183
{
182184
set_pg_var(success, &EE);
183185
}
186+
*/
184187
shared->next_time=get_next_excution_time(job->next_time_statement,&EE);
185188
}
186189
pgstat_report_activity(STATE_RUNNING,"finish job processing");
@@ -261,7 +264,7 @@ void set_shared_message(schd_executor_share_t *shared, executor_error_t *ee)
261264
}
262265
else
263266
{
264-
memcpy(ptr,", ",2);
267+
memcpy(ptr,"; ",2);
265268
left-=2;
266269
ptr+=2;
267270
}
@@ -277,7 +280,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
277280
boolisnull;
278281

279282
START_SPI_SNAP();
280-
pgstat_report_activity(STATE_RUNNING,"finish job processing");
283+
pgstat_report_activity(STATE_RUNNING,"culc next time execution time");
281284
ret=execute_spi(sql,&error);
282285
if(ret<0)
283286
{
@@ -333,7 +336,6 @@ int executor_onrollback(job_t *job, executor_error_t *ee)
333336
{
334337
if(error)
335338
{
336-
elog(LOG,"EXECUTOR: onrollback error: %s",error);
337339
push_executor_error(ee,"onrollback error: %s",error);
338340
pfree(error);
339341
}
@@ -371,7 +373,7 @@ void set_pg_var(bool result, executor_error_t *ee)
371373
}
372374
else
373375
{
374-
push_executor_error(ee,"set variable: code: %d",ret);
376+
push_executor_error(ee,"set variable error code: %d",ret);
375377
}
376378
}
377379
}

‎src/scheduler_job.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
2525
intret,got,i;
2626
Oidargtypes[1]= {TEXTOID };
2727
Datumvalues[1];
28-
constchar*get_job_sql="select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances, cron.executor from schedule.at at, schedule.cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1 order by at.start_at";
28+
constchar*get_job_sql="select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances, cron.executor, cron.next_time_statement from schedule.at at, schedule.cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1 order by at.start_at";
2929

3030
*is_error=*n=0;
3131
START_SPI_SNAP();
@@ -48,6 +48,7 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
4848
jobs[i].max_instances=get_int_from_spi(i,5,1);
4949
jobs[i].node=_copy_string(nodename);
5050
jobs[i].executor=get_text_from_spi(i,6);
51+
jobs[i].next_time_statement=get_text_from_spi(i,7);
5152
}
5253
}
5354
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp