Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit77d2473

Browse files
author
Vladimir Ershov
committed
cancel job added
1 parent0b1a087 commit77d2473

File tree

3 files changed

+44
-6
lines changed

3 files changed

+44
-6
lines changed

‎pgpro_scheduler--2.0.sql

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ CREATE TABLE at_jobs_submitted(
2222
resubmit_limitbigint default100,
2323
postpone interval,
2424
max_run_timeinterval,
25+
canceledboolean default false,
2526
submit_timetimestamp with time zone default now()
2627
);
2728
CREATEINDEXat_jobs_submitted_node_at_idxon at_jobs_submitted (node, at);
@@ -161,6 +162,31 @@ CREATE FUNCTION resubmit(run_after interval default NULL)
161162
AS'MODULE_PATHNAME','resubmit'
162163
LANGUAGE C IMMUTABLE;
163164

165+
CREATEFUNCTIONcancel_job(job_idbigint) RETURNSbooleanAS
166+
$BODY$
167+
DECLARE
168+
s_countint;
169+
BEGIN
170+
EXECUTE'SELECT count(*) FROM at_jobs_submitted WHERE owner = session_user AND id = $1' INTO s_count USING job_id;
171+
IF s_count>0 THEN
172+
UPDATE at_jobs_submittedSET canceled= trueWHERE"id"= job_id;
173+
WITH moved_rowsAS (DELETEfrom ONLY at_jobs_submitted aWHEREa.id= job_id RETURNING a.*)INSERT INTO at_jobs_doneSELECT*,NULLas start_time, falseas status,'job was canceled'as reasonFROM moved_rows;
174+
RETURN true;
175+
ELSE
176+
EXECUTE'SELECT count(*) FROM at_jobs_process WHERE owner = session_user AND id = $1' INTO s_count USING job_id;
177+
IF s_count>0 THEN
178+
UPDATE at_jobs_processSET canceled= trueWHERE"id"= job_id;
179+
RETURN true;
180+
END IF;
181+
END IF;
182+
183+
RETURN false;
184+
END
185+
$BODY$
186+
LANGUAGE plpgsql
187+
SECURITY DEFINERset search_pathFROM CURRENT;
188+
189+
164190
CREATEFUNCTIONsubmit_job(
165191
querytext,
166192
paramstext[] defaultNULL,
@@ -1283,7 +1309,7 @@ CREATE VIEW job_status AS
12831309
id, node, name, comments, atas run_after,
12841310
do_sqlas query, params, depends_on, executoras run_as, attempt,
12851311
resubmit_limit, postponeas max_wait_interval,
1286-
max_run_timeas max_duration, submit_time,
1312+
max_run_timeas max_duration, submit_time, canceled,
12871313
start_time, statusas is_success, reasonas error, done_time,
12881314
'done'::job_at_status_t status
12891315
FROMschedule.at_jobs_donewhere owner=session_user
@@ -1292,7 +1318,7 @@ CREATE VIEW job_status AS
12921318
id, node, name, comments, atas run_after,
12931319
do_sqlas query, params, depends_on, executoras run_as, attempt,
12941320
resubmit_limit, postponeas max_wait_interval,
1295-
max_run_timeas max_duration, submit_time, start_time,
1321+
max_run_timeas max_duration, submit_time,canceled,start_time,
12961322
NULLas is_success,NULLas error,NULLas done_time,
12971323
'processing'::job_at_status_t status
12981324
FROM ONLYschedule.at_jobs_processwhere owner=session_user
@@ -1301,7 +1327,7 @@ CREATE VIEW job_status AS
13011327
id, node, name, comments, atas run_after,
13021328
do_sqlas query, params, depends_on, executoras run_as, attempt,
13031329
resubmit_limit, postponeas max_wait_interval,
1304-
max_run_timeas max_duration, submit_time,
1330+
max_run_timeas max_duration, submit_time,canceled,
13051331
NULLas start_time,NULLas is_success,NULLas error,
13061332
NULLas done_time,
13071333
'submitted'::job_at_status_t status

‎src/scheduler_job.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ job_t *_at_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit)
167167
Oidargtypes[2]= {TEXTOID,INT4OID };
168168
Datumvalues[2];
169169
/* const char *get_job_sql = "select id, at, last_start_available, max_run_time, executor from ONLY at_jobs_submitted where at <= 'now' and (last_start_available is NULL OR last_start_available > 'now') AND node = $1 order by at, submit_time limit $2"; */
170-
constchar*get_job_sql="select id, at, last_start_available, max_run_time, executor from ONLY at_jobs_submitted s where ((not exists ( select * from ONLY at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from ONLY at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 order by at, submit_time limit $2";
170+
constchar*get_job_sql="select id, at, last_start_available, max_run_time, executor from ONLY at_jobs_submitted s where ((not exists ( select * from ONLY at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from ONLY at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1and not canceledorder by at, submit_time limit $2";
171171

172172
*is_error=*n=0;
173173
START_SPI_SNAP();
@@ -366,9 +366,17 @@ int resubmit_at_job(job_t *j, TimestampTz next)
366366
intret;
367367
constchar*sql="WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_submitted SELECT id, node, name, comments, $2, do_sql, params, depends_on, executor, owner, last_start_available, attempt +1 , resubmit_limit, postpone, max_run_time, submit_time FROM moved_rows";
368368

369+
369370
values[0]=Int32GetDatum(j->cron_id);
370371
values[1]=TimestampTzGetDatum(next);
371-
ret=SPI_execute_with_args(sql,2,argtypes,values,NULL, false,0);
372+
if(select_count_with_args("SELECT count(*) FROM at_jobs_process WHERE NOT canceled and id = $1",1,argtypes,values,NULL))
373+
{
374+
ret=SPI_execute_with_args(sql,2,argtypes,values,NULL, false,0);
375+
}
376+
else
377+
{
378+
return-2;
379+
}
372380

373381
returnret>0 ?1:ret;
374382
}

‎src/scheduler_manager.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1081,7 +1081,11 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
10811081
{
10821082
if(item->job->type==AtJob)
10831083
{
1084-
resubmit_at_job(item->job,shm_data->next_time);
1084+
if(resubmit_at_job(item->job,shm_data->next_time)==-2)
1085+
{
1086+
set_job_error(item->job,"was canceled while processing");
1087+
move_job_to_log(item->job, false, true);
1088+
}
10851089
}
10861090
else
10871091
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp