Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc24a589

Browse files
author
Vladimir Ershov
committed
fix vanish process
1 parent359854c commitc24a589

File tree

4 files changed

+29
-10
lines changed

4 files changed

+29
-10
lines changed

‎src/scheduler_job.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@ job_t *get_at_job_for_process(MemoryContext mem, char *nodename, char **error)
171171
inti;
172172
char*oldpath;
173173

174-
constchar*get_job_sql="select * from at_jobs_submitted s where ((not exists ( select * from at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED";
174+
/* const char *get_job_sql = "select * from at_jobs_submitted s where ((not exists ( select * from at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED"; */
175+
constchar*get_job_sql="select * from schedule.at_jobs_submitted s where ((not exists ( select * from schedule.at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from schedule.at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL AND not exists ( select * from schedule.at_jobs_done d where d.id = any(s.depends_on) and d.status=false)) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED ";
175176
constchar*insert_sql="insert into at_jobs_process values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)";
176177
spi_response_t*r;
177178
spi_response_t*r2;

‎src/scheduler_manager.c

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include"utils/builtins.h"
2626
#include"utils/timestamp.h"
2727
#include<sys/time.h>
28+
#include"utils/lsyscache.h"
29+
#include"catalog/namespace.h"
2830

2931
#include"char_array.h"
3032
#include"sched_manager_poll.h"
@@ -1376,7 +1378,7 @@ int update_cron_texttime(scheduler_manager_ctx_t *ctx, int cron_id, TimestampTz
13761378
returnret;
13771379
}
13781380

1379-
intscheduler_vanish_expired_jobs(scheduler_manager_ctx_t*ctx,task_type_ttype)
1381+
intscheduler_vanish_expired_jobs(scheduler_manager_ctx_t*ctx,task_type_ttype,Oidat_reloid)
13801382
{
13811383
job_t*expired;
13821384
intnexpired=0;
@@ -1396,10 +1398,14 @@ int scheduler_vanish_expired_jobs(scheduler_manager_ctx_t *ctx, task_type_t type
13961398
pgstat_report_activity(STATE_RUNNING,"vanish expired tasks");
13971399

13981400
START_SPI_SNAP();
1401+
if(type==AtJob)
1402+
{
1403+
ts_hires= true;
1404+
scheduler_atjob_id_OID=get_atttype(at_reloid,1);
1405+
}
13991406
expired=type==CronJob ?
14001407
get_expired_cron_jobs(ctx->nodename,&nexpired,&is_error):
14011408
get_expired_at_jobs(ctx->nodename,&nexpired,&is_error);
1402-
if(type==AtJob)ts_hires= true;
14031409

14041410
if(is_error)
14051411
{
@@ -1760,6 +1766,7 @@ void manager_worker_main(Datum arg)
17601766
schd_manager_share_t*parent_shared;
17611767
MemoryContextold=NULL;
17621768
MemoryContextlongTerm;
1769+
Oidreloid;
17631770

17641771

17651772
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler");
@@ -1814,6 +1821,17 @@ void manager_worker_main(Datum arg)
18141821
longTerm=init_mem_ctx("long term context for slots");
18151822
ctx=initialize_scheduler_manager_context(longTerm,database,seg);
18161823

1824+
START_SPI_SNAP();
1825+
reloid=RangeVarGetRelid(makeRangeVarFromNameList(
1826+
stringToQualifiedNameList("schedule.at_jobs_submitted")),NoLock, true);
1827+
STOP_SPI_SNAP();
1828+
if(reloid==InvalidOid)
1829+
{
1830+
ereport(ERROR,
1831+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1832+
errmsg("scheduler manager cannot find jobs table")));
1833+
}
1834+
18171835
start_at_workers(ctx,shared);
18181836
clean_at_table(ctx);
18191837
set_slots_stat_report(ctx);
@@ -1850,8 +1868,8 @@ void manager_worker_main(Datum arg)
18501868
set_slots_stat_report(ctx);
18511869
/* if there are any expired jobs to get rid of */
18521870

1853-
scheduler_vanish_expired_jobs(ctx,AtJob);
1854-
scheduler_vanish_expired_jobs(ctx,CronJob);
1871+
scheduler_vanish_expired_jobs(ctx,AtJob,reloid);
1872+
scheduler_vanish_expired_jobs(ctx,CronJob,reloid);
18551873
}
18561874
}
18571875

‎src/scheduler_manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ void fill_cron_array_from_rule(Jsonb *J, const char *name, bit_array_t *ce, int
104104
boolis_cron_fit_timestamp(bit_array_t*cron,TimestampTztimestamp);
105105
char**get_dates_array_from_rule(scheduler_task_t*task,int*num);
106106
intget_integer_from_jsonbval(JsonbValue*ai,intdef);
107-
intscheduler_vanish_expired_jobs(scheduler_manager_ctx_t*ctx,task_type_ttype);
107+
intscheduler_vanish_expired_jobs(scheduler_manager_ctx_t*ctx,task_type_ttype,Oidat_reloid);
108108
inthow_many_instances_on_work(scheduler_manager_ctx_t*ctx,job_t*job);
109109
intinsert_at_record(char*nodename,intcron_id,TimestampTzstart_at,TimestampTzpostpone,char**error);
110110
intset_job_on_free_slot(scheduler_manager_ctx_t*ctx,job_t*job);

‎test/perl/runtest.pl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@
6565
"ALTER SYSTEM SET schedule.enabled = on",
6666
"SELECT pg_reload_conf();",
6767
"CREATE TABLE test_results( time_mark timestamp, commentary text )",
68-
"DROP ROLE IF EXISTS_pgpro_tester",
69-
"CREATE ROLE_pgpro_tester",
70-
"GRANT INSERT ON test_results TO_pgpro_tester",
68+
"DROP ROLE IF EXISTStester",
69+
"CREATE ROLEtester",
70+
"GRANT INSERT ON test_results TOtester",
7171
);
7272
map { __do_sql($dbh,$_) }@sql2;
7373
$dbh->disconnect();
@@ -87,7 +87,7 @@
8787
my$harness = TAP::Harness->new( \%args );
8888
my@tests =glob('t/*.t' );
8989
### @tests = ('t/jobMaxRunTime.t');
90-
$harness->runtests(@tests);
90+
$harness->runtests(@tests);
9191

9292

9393
sub__do_sql

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp