Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc795670

Browse files
author
Vladimir Ershov
committed
resubmit && resubmit_limit
1 parent672905b commitc795670

11 files changed

+118
-13
lines changed

‎pgpro_scheduler--2.0.sql

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ CREATE TABLE at_jobs_submitted(
1717
executortext,
1818
ownertext,
1919
last_start_availabletimestamp with time zone,
20+
attemptbigint default0,
21+
resubmit_limitbigint default100,
2022
postpone interval,
2123
max_run_timeinterval,
2224
submit_timetimestamp with time zone default now()
@@ -153,8 +155,8 @@ CREATE FUNCTION get_self_id()
153155
AS'MODULE_PATHNAME','get_self_id'
154156
LANGUAGE C IMMUTABLE;
155157

156-
CREATEFUNCTIONresubmit()
157-
RETURNSboolean
158+
CREATEFUNCTIONresubmit(run_after interval defaultNULL)
159+
RETURNSbigint
158160
AS'MODULE_PATHNAME','resubmit'
159161
LANGUAGE C IMMUTABLE;
160162

@@ -168,7 +170,8 @@ CREATE FUNCTION submit_job(
168170
run_astext defaultNULL,
169171
depends_onbigint[] defaultNULL,
170172
nametext defaultNULL,
171-
commentstext defaultNULL
173+
commentstext defaultNULL,
174+
resubmit_limitbigint default100
172175
) RETURNSbigintAS
173176
$BODY$
174177
DECLARE
@@ -215,10 +218,12 @@ BEGIN
215218

216219
INSERT INTO at_jobs_submitted
217220
(node, at, do_sql, owner, executor, name, comments, max_run_time,
218-
postpone, last_start_available, depends_on, params)
221+
postpone, last_start_available, depends_on, params,
222+
attempt, resubmit_limit)
219223
VALUES
220224
(node, run_after, query,session_user, executor, name, comments,
221-
max_duration, max_wait_interval, last_avail, depends_on, params)
225+
max_duration, max_wait_interval, last_avail, depends_on, params,
226+
0, resubmit_limit)
222227
RETURNING id INTO job_id;
223228

224229
RETURN job_id;

‎src/pgpro_scheduler.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ void reload_db_role_config(char *dbname)
9292
CommitTransactionCommand();
9393
}
9494

95-
TimestampTztimestamp_add_seconds(TimestampTzto,intadd)
95+
TimestampTztimestamp_add_seconds(TimestampTzto,int64add)
9696
{
9797
if(to==0)to=GetCurrentTimestamp();
9898
#ifdefHAVE_INT64_TIMESTAMP

‎src/pgpro_scheduler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ int checkSchedulerNamespace(void);
3434
pid_tregisterManagerWorker(schd_manager_t*man);
3535

3636
voidreload_db_role_config(char*dbname);
37-
TimestampTztimestamp_add_seconds(TimestampTzto,intadd);
37+
TimestampTztimestamp_add_seconds(TimestampTzto,int64add);
3838
char*make_date_from_timestamp(TimestampTzts,boolhires);
3939
intget_integer_from_string(char*s,intstart,intlen);
4040
TimestampTzget_timestamp_from_string(char*str);

‎src/scheduler_executor.c

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ extern volatile sig_atomic_t got_sighup;
3636
externvolatilesig_atomic_tgot_sigterm;
3737

3838
staticint64current_job_id=-1;
39-
staticintresubmit_current_job=0;
39+
staticint64resubmit_current_job=0;
4040

4141
staticvoidhandle_sigterm(SIGNAL_ARGS);
4242

@@ -193,7 +193,24 @@ void executor_worker_main(Datum arg)
193193
{
194194
STOP_SPI_SNAP();
195195
}
196-
status=SchdExecutorDone;
196+
if(job->type==AtJob&&resubmit_current_job>0)
197+
{
198+
if(job->attempt >=job->resubmit_limit)
199+
{
200+
status=SchdExecutorError;
201+
push_executor_error(&EE,"Cannot resubmit: limit reached (%ld)",job->resubmit_limit);
202+
resubmit_current_job=0;
203+
}
204+
else
205+
{
206+
status=SchdExecutorResubmit;
207+
}
208+
}
209+
else
210+
{
211+
status=SchdExecutorDone;
212+
}
213+
197214
SetConfigOption("schedule.transaction_state","success",PGC_INTERNAL,PGC_S_SESSION);
198215
}
199216
if(job->next_time_statement)
@@ -218,6 +235,11 @@ void executor_worker_main(Datum arg)
218235
set_shared_message(shared,&EE);
219236
}
220237
shared->status=status;
238+
if(status==SchdExecutorResubmit)
239+
{
240+
shared->next_time=timestamp_add_seconds(0,resubmit_current_job);
241+
resubmit_current_job=0;
242+
}
221243

222244
delete_worker_mem_ctx();
223245
dsm_detach(seg);
@@ -481,9 +503,28 @@ PG_FUNCTION_INFO_V1(resubmit);
481503
Datum
482504
resubmit(PG_FUNCTION_ARGS)
483505
{
506+
Interval*interval;
507+
484508
if(current_job_id==-1)
485509
{
486510
elog(ERROR,"There is no active job in progress");
487511
}
488-
PG_RETURN_BOOL(true);
512+
if(PG_ARGISNULL(0))
513+
{
514+
resubmit_current_job=1;
515+
PG_RETURN_INT64(1);
516+
}
517+
interval=PG_GETARG_INTERVAL_P(0);
518+
#ifdefHAVE_INT64_TIMESTAMP
519+
resubmit_current_job=interval->time /1000000.0;
520+
#else
521+
resubmit_current_job=interval->time;
522+
#endif
523+
resubmit_current_job+=
524+
(DAYS_PER_YEAR*SECS_PER_DAY)* (interval->month /MONTHS_PER_YEAR);
525+
resubmit_current_job+=
526+
(DAYS_PER_MONTH*SECS_PER_DAY)* (interval->month %MONTHS_PER_YEAR);
527+
resubmit_current_job+=SECS_PER_DAY*interval->day;
528+
529+
PG_RETURN_INT64(resubmit_current_job);
489530
}

‎src/scheduler_executor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ typedef enum {
1111
SchdExecutorInit,
1212
SchdExecutorWork,
1313
SchdExecutorDone,
14+
SchdExecutorResubmit,
1415
SchdExecutorError
1516
}schd_executor_status_t;
1617

‎src/scheduler_job.c

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ job_t *init_scheduler_job(job_t *j, unsigned char type)
2525
job_t*get_at_job(intcron_id,char*nodename,char**perror)
2626
{
2727
job_t*j;
28-
constchar*sql="select last_start_available, array_append('{}'::text[], do_sql)::text[], executor, postpone, max_run_time as time_limit, at, params, depends_on from ONLY at_jobs_process where node = $1 and id = $2";
28+
constchar*sql="select last_start_available, array_append('{}'::text[], do_sql)::text[], executor, postpone, max_run_time as time_limit, at, params, depends_on, attempt, resubmit_limit from ONLY at_jobs_process where node = $1 and id = $2";
2929
Oidargtypes[2]= {TEXTOID,INT4OID};
3030
Datumargs[2];
3131
intret;
@@ -69,6 +69,8 @@ job_t *get_at_job(int cron_id, char *nodename, char **perror)
6969
j->start_at=get_timestamp_from_spi(0,6,0);
7070
j->sql_params=get_textarray_from_spi(0,7,&j->sql_params_n);
7171
j->depends_on=get_int64array_from_spi(0,8,&j->depends_on_n);
72+
j->attempt=get_int64_from_spi(0,9,0);
73+
j->resubmit_limit=get_int64_from_spi(0,10,0);
7274

7375
STOP_SPI_SNAP();
7476

@@ -333,7 +335,7 @@ int _at_move_job_to_log(job_t *j, bool status, bool process)
333335
{
334336
Datumvalues[3];
335337
charnulls[3]= {' ',' ',' ' };
336-
Oidargtypes[4]= {INT4OID,BOOLOID,TEXTOID };
338+
Oidargtypes[3]= {INT4OID,BOOLOID,TEXTOID };
337339
intret;
338340
constchar*sql_process="WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, $2 as status, $3 as reason FROM moved_rows";
339341
constchar*sql_submitted="WITH moved_rows AS (DELETE from ONLY at_jobs_submitted a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, NULL as start_time, $2 as status, $3 as reason FROM moved_rows";
@@ -356,6 +358,20 @@ int _at_move_job_to_log(job_t *j, bool status, bool process)
356358
returnret>0 ?1:ret;
357359
}
358360

361+
intresubmit_at_job(job_t*j,TimestampTznext)
362+
{
363+
Datumvalues[2];
364+
Oidargtypes[2]= {INT4OID,TIMESTAMPTZOID };
365+
intret;
366+
constchar*sql="WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_submitted SELECT id, node, name, comments, $2, do_sql, params, depends_on, executor, owner, last_start_available, attempt +1 , resubmit_limit, postpone, max_run_time, submit_time FROM moved_rows";
367+
368+
values[0]=Int32GetDatum(j->cron_id);
369+
values[1]=TimestampTzGetDatum(next);
370+
ret=SPI_execute_with_args(sql,2,argtypes,values,NULL, false,0);
371+
372+
returnret>0 ?1:ret;
373+
}
374+
359375
int_cron_move_job_to_log(job_t*j,boolstatus)
360376
{
361377
Datumvalues[4];

‎src/scheduler_job.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ typedef struct {
3636
char*onrollback;
3737
char*next_time_statement;
3838
boolis_active;
39+
int64attempt;
40+
int64resubmit_limit;
3941
char*error;
4042
}job_t;
4143

@@ -52,6 +54,7 @@ job_t *get_at_job(int cron_id, char *nodename, char **perror);
5254
job_t*get_cron_job(intcron_id,TimestampTzstart_at,char*nodename,char**perror);
5355
int_cron_move_job_to_log(job_t*j,boolstatus);
5456
int_at_move_job_to_log(job_t*j,boolstatus,boolprocessed);
57+
intresubmit_at_job(job_t*j,TimestampTznext);
5558

5659
#endif
5760

‎src/scheduler_manager.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -972,6 +972,12 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
972972
toremove[nremove].reason=shm_data->status==SchdExecutorDone ?RmDone:RmError;
973973
nremove++;
974974
}
975+
elseif(shm_data->status==SchdExecutorResubmit)
976+
{
977+
toremove[nremove].pos=i;
978+
toremove[nremove].reason=RmDoneResubmit;
979+
nremove++;
980+
}
975981
}
976982
}
977983
if(nremove)
@@ -996,6 +1002,11 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
9961002
item->wait_worker_to_die= true;
9971003
}
9981004
}
1005+
elseif(toremove[i].reason==RmDoneResubmit)
1006+
{
1007+
removeJob= true;
1008+
job_status= true;
1009+
}
9991010
elseif(toremove[i].reason==RmWaitWorker)/* wait worker to die */
10001011
{
10011012
if(GetBackgroundWorkerPid(item->handler,&tmppid)==BGWH_STARTED)
@@ -1066,7 +1077,22 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
10661077
}
10671078
}
10681079
}
1069-
move_job_to_log(item->job,job_status, true);
1080+
if(toremove[i].reason==RmDoneResubmit)
1081+
{
1082+
if(item->job->type==AtJob)
1083+
{
1084+
resubmit_at_job(item->job,shm_data->next_time);
1085+
}
1086+
else
1087+
{
1088+
set_job_error(item->job,"cannot resubmit Cron job");
1089+
move_job_to_log(item->job, false, true);
1090+
}
1091+
}
1092+
else
1093+
{
1094+
move_job_to_log(item->job,job_status, true);
1095+
}
10701096
STOP_SPI_SNAP();
10711097

10721098
last=p->len-p->free-1;

‎src/scheduler_manager.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ typedef enum {
2828
RmTimeout,
2929
RmWaitWorker,
3030
RmError,
31+
RmDoneResubmit,
3132
RmDone
3233
}schd_remove_reason_t;
3334

‎src/scheduler_spi_utils.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,17 @@ int get_int_from_spi(int row_n, int pos, int def)
230230
return (int)DatumGetInt32(datum);
231231
}
232232

233+
int64get_int64_from_spi(introw_n,intpos,intdef)
234+
{
235+
Datumdatum;
236+
boolis_null;
237+
238+
datum=SPI_getbinval(SPI_tuptable->vals[row_n],SPI_tuptable->tupdesc,
239+
pos,&is_null);
240+
if(is_null)returndef;
241+
return (int64)DatumGetInt64(datum);
242+
}
243+
233244
Datumselect_onedatumvalue_sql(constchar*sql,bool*is_null)
234245
{
235246
intret;

‎src/scheduler_spi_utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ void ABORT_SPI_SNAP(void);
1818
char*_copy_string(char*str);
1919
TimestampTzget_timestamp_from_spi(introw_n,intpos,TimestampTzdef);
2020
intget_int_from_spi(introw_n,intpos,intdef);
21+
int64get_int64_from_spi(introw_n,intpos,intdef);
2122
intselect_oneintvalue_sql(constchar*sql,intd);
2223
Datumselect_onedatumvalue_sql(constchar*sql,bool*is_null);
2324
intselect_count_with_args(constchar*sql,intn,Oid*argtypes,Datum*values,char*nulls);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp