Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb8b4a86

Browse files
author
Vladimir Ershov
committed
add GUC schema var - what scheme in use now
1 parent52e3f73 commitb8b4a86

File tree

4 files changed

+63
-50
lines changed

4 files changed

+63
-50
lines changed

‎README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ pgpro_scheduler это расширение PostgreSQL и не требует н
4141
***schedule.database** - строковая переменная, указывает с какими базам может
4242
работать планировщик. Что бы указать несколько баз, нужно перечислить их
4343
имена через запятую. По умолчанию - пустая строка.
44+
***schedule.scheme** - строковая переменная, указывает в какой`scheme`
45+
находятся служебные таблицы планировщика. Для изменения требуется
46+
перезагрузка. Обычно ее не надо менять. Может использоваться для работы
47+
на реплике, если используется foreign data wrapper. По умолчанию -
48+
schedule.
4449
***schedule.nodename** - строковая переменная, содержит название узла.
4550
По умолчанию - master. Если расширение используется на одной машине,
4651
то переменная не имеет смысла.

‎src/pgpro_scheduler.c

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,12 @@ volatile sig_atomic_t got_sighup = false;
4040
volatilesig_atomic_tgot_sigterm= false;
4141

4242
/* Custom GUC variables */
43-
staticchar*scheduler_databases="";
44-
staticchar*scheduler_nodename="master";
45-
staticchar*scheduler_transaction_state="undefined";
43+
staticchar*scheduler_databases=NULL;
44+
staticchar*scheduler_nodename=NULL;
45+
staticchar*scheduler_transaction_state=NULL;
4646
staticintscheduler_max_workers=2;
4747
staticboolscheduler_service_enabled= false;
48+
staticchar*scheduler_schema=NULL;
4849
/* Custom GUC done */
4950

5051
externvoid
@@ -398,7 +399,18 @@ pg_scheduler_startup(void)
398399

399400
void_PG_init(void)
400401
{
401-
RequestAddinShmemSpace(1000);
402+
DefineCustomStringVariable(
403+
"schedule.schema",
404+
"The name of scheduler schema",
405+
NULL,
406+
&scheduler_schema,
407+
"schedule",
408+
PGC_POSTMASTER,
409+
0,
410+
NULL,
411+
NULL,
412+
NULL
413+
);
402414
DefineCustomStringVariable(
403415
"schedule.database",
404416
"On which databases scheduler could be run",

‎src/scheduler_job.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
2525
intret,got,i;
2626
Oidargtypes[1]= {TEXTOID };
2727
Datumvalues[1];
28-
constchar*get_job_sql="select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances, cron.executor, cron.next_time_statement fromschedule.at at,schedule.cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1 order by at.start_at";
28+
constchar*get_job_sql="select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances, cron.executor, cron.next_time_statement from at at, cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1 order by at.start_at";
2929

3030
*is_error=*n=0;
3131
START_SPI_SNAP();
@@ -68,7 +68,7 @@ job_t *get_expired_jobs(char *nodename, int *n, int *is_error)
6868

6969
*n=*is_error=0;
7070
initStringInfo(&sql);
71-
appendStringInfo(&sql,"select start_at, last_start_available, cron, started, active fromschedule.at where last_start_available < 'now' and not active and node = '%s'",nodename);
71+
appendStringInfo(&sql,"select start_at, last_start_available, cron, started, active from at where last_start_available < 'now' and not active and node = '%s'",nodename);
7272
ret=SPI_execute(sql.data, true,0);
7373
if(ret==SPI_OK_SELECT)
7474
{
@@ -115,8 +115,8 @@ int move_job_to_log(job_t *j, bool status)
115115
charnulls[4]= {' ',' ',' ',' ' };
116116
Oidargtypes[4]= {BOOLOID,TEXTOID,INT4OID,TIMESTAMPTZOID };
117117
intret;
118-
constchar*del_sql="delete fromschedule.at where start_at = $1 and cron = $2";
119-
constchar*sql="insert intoschedule.log (start_at, last_start_available, retry, cron, node, started, status, finished, message) SELECT start_at, last_start_available, retry, cron, node, started, $1 as status, 'now'::timestamp as finished, $2 as message fromschedule.at where cron = $3 and start_at = $4";
118+
constchar*del_sql="delete from at where start_at = $1 and cron = $2";
119+
constchar*sql="insert into log (start_at, last_start_available, retry, cron, node, started, status, finished, message) SELECT start_at, last_start_available, retry, cron, node, started, $1 as status, 'now'::timestamp as finished, $2 as message from at where cron = $3 and start_at = $4";
120120

121121
/* in perl was this at first $status = 0 if $job->{spoiled}; skip so far */
122122

‎src/scheduler_manager.c

Lines changed: 38 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -44,55 +44,51 @@ extern volatile sig_atomic_t got_sigterm;
4444

4545
intcheckSchedulerNamespace(void)
4646
{
47-
constchar*sql="select count(*) from pg_namespace where nspname ='schedule'";
48-
intfound=0;
49-
intret;
50-
intntup;
51-
boolisnull;
47+
constchar*sql="select count(*) frompg_catalog.pg_namespace where nspname =$1";
48+
intcount=0;
49+
constchar*schema;
50+
Oidargtypes[1]= {TEXTOID };
51+
Datumvalues[1];
5252

53-
pgstat_report_activity(STATE_RUNNING,"initialize: check namespace");
5453
SetCurrentStatementStartTimestamp();
54+
pgstat_report_activity(STATE_RUNNING,"initialize: check namespace");
55+
56+
schema=GetConfigOption("schedule.schema", false, true);
57+
5558
StartTransactionCommand();
5659
SPI_connect();
5760
PushActiveSnapshot(GetTransactionSnapshot());
5861

59-
ret=SPI_execute(sql, true,0);
60-
if(ret==SPI_OK_SELECT&&SPI_processed==1)
62+
values[0]=CStringGetTextDatum(schema);
63+
count=select_count_with_args(sql,1,argtypes,values,NULL);
64+
65+
if(count==-1)
6166
{
62-
ntup=DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
63-
SPI_tuptable->tupdesc,1,&isnull)
64-
);
65-
if(!isnull&&ntup==1)
66-
{
67-
found=1;
68-
}
69-
elseif(isnull)
70-
{
71-
elog(LOG,"Scheduler manager: %s: cannot check namespace: count return null",
72-
MyBgworkerEntry->bgw_name);
73-
}
74-
elseif(ntup>1)
75-
{
76-
elog(LOG,"Scheduler manager: %s: cannot check namespace: found %d namespaces",
77-
MyBgworkerEntry->bgw_name,ntup);
78-
}
67+
elog(ERROR,"Scheduler manager: %s: cannot check namespace: sql error",
68+
MyBgworkerEntry->bgw_name);
7969
}
80-
elseif(ret!=SPI_OK_SELECT)
70+
elseif(count>1||count==0)
8171
{
82-
elog(LOG,"Scheduler manager: %s: cannot check namespace:error code %d",
83-
MyBgworkerEntry->bgw_name,ret);
72+
elog(LOG,"Scheduler manager: %s: cannot check namespace:found %d namespaces",
73+
MyBgworkerEntry->bgw_name,count);
8474
}
85-
elseif(SPI_processed!=1)
75+
elseif(count==-2)
8676
{
87-
elog(LOG,"Scheduler manager: %s: cannot check namespace: count return %ud tups",
88-
MyBgworkerEntry->bgw_name,
89-
(unsigned)SPI_processed);
77+
elog(LOG,"Scheduler manager: %s: cannot check namespace: count return null",
78+
MyBgworkerEntry->bgw_name);
9079
}
80+
elseif(count!=1)
81+
{
82+
elog(ERROR,"Scheduler manager: %s: cannot check namespace: unknown error %d",
83+
MyBgworkerEntry->bgw_name,count);
84+
}
85+
9186
SPI_finish();
9287
PopActiveSnapshot();
9388
CommitTransactionCommand();
89+
if(count)SetConfigOption("search_path","schedule",PGC_USERSET,PGC_S_SESSION);
9490

95-
returnfound;
91+
returncount;
9692
}
9793

9894
intget_scheduler_maxworkers(void)
@@ -269,7 +265,7 @@ scheduler_task_t *scheduler_get_active_tasks(scheduler_manager_ctx_t *ctx, int *
269265

270266
*nt=0;
271267
initStringInfo(&sql);
272-
appendStringInfo(&sql,"select id, rule, postpone, _next_exec_time, next_time_statement fromschedule.cron where active and not broken and (start_date <= 'now' or start_date is null) and (end_date <= 'now' or end_date is null) and node = '%s'",ctx->nodename);
268+
appendStringInfo(&sql,"select id, rule, postpone, _next_exec_time, next_time_statement from cron where active and not broken and (start_date <= 'now' or start_date is null) and (end_date <= 'now' or end_date is null) and node = '%s'",ctx->nodename);
273269

274270
pgstat_report_activity(STATE_RUNNING,"select 'at' tasks");
275271

@@ -590,7 +586,7 @@ int how_many_instances_on_work(scheduler_manager_ctx_t *ctx, int cron_id)
590586
intset_job_on_free_slot(scheduler_manager_ctx_t*ctx,job_t*job)
591587
{
592588
scheduler_manager_slot_t*item;
593-
constchar*sql="updateschedule.at set started = 'now'::timestamp with time zone, active = true where cron = $1 and start_at = $2";
589+
constchar*sql="update at set started = 'now'::timestamp with time zone, active = true where cron = $1 and start_at = $2";
594590
Datumvalues[2];
595591
Oidargtypes[2]= {INT4OID,TIMESTAMPTZOID};
596592
intret;
@@ -977,7 +973,7 @@ int mark_job_broken(scheduler_manager_ctx_t *ctx, int cron_id, char *reason)
977973
Oidtypes[2]= {INT4OID,TEXTOID };
978974
Datumvalues[2];
979975
char*error;
980-
char*sql="updateschedule.cron set reason = $2, broken = true where id = $1";
976+
char*sql="update cron set reason = $2, broken = true where id = $1";
981977
intret;
982978

983979
values[0]=Int32GetDatum(cron_id);
@@ -997,7 +993,7 @@ int update_cron_texttime(scheduler_manager_ctx_t *ctx, int cron_id, TimestampTz
997993
boolnulls[2]= {' ',' ' };
998994
char*error;
999995
intret;
1000-
char*sql="updateschedule.cron set _next_exec_time = $2 where id = $1";
996+
char*sql="update cron set _next_exec_time = $2 where id = $1";
1001997

1002998
values[0]=Int32GetDatum(cron_id);
1003999
if(next>0)
@@ -1073,9 +1069,9 @@ int insert_at_record(char *nodename, int cron_id, TimestampTz start_at, Timestam
10731069
Datumvalues[4];
10741070
charnulls[4]= {' ',' ',' ',' ' };
10751071
Oidargtypes[4];
1076-
char*insert_sql="insert intoschedule.at (start_at, last_start_available, node, retry, cron, active) values ($1, $2, $3, 0, $4, false)";
1077-
char*at_sql="select count(start_at) fromschedule.at where cron = $1 and start_at = $2";
1078-
char*log_sql="select count(start_at) fromschedule.log where cron = $1 and start_at = $2";
1072+
char*insert_sql="insert into at (start_at, last_start_available, node, retry, cron, active) values ($1, $2, $3, 0, $4, false)";
1073+
char*at_sql="select count(start_at) from at where cron = $1 and start_at = $2";
1074+
char*log_sql="select count(start_at) from log where cron = $1 and start_at = $2";
10791075
intcount,ret;
10801076

10811077
argtypes[0]=INT4OID;
@@ -1213,11 +1209,11 @@ void clean_at_table(scheduler_manager_ctx_t *ctx)
12131209
char*error=NULL;
12141210

12151211
START_SPI_SNAP();
1216-
if(execute_spi("truncateschedule.at",&error)<0)
1212+
if(execute_spi("truncate at",&error)<0)
12171213
{
12181214
manager_fatal_error(ctx,0,"Cannot clean 'at' table: %s",error);
12191215
}
1220-
if(execute_spi("updateschedule.cron set _next_exec_time = NULL where _next_exec_time is not NULL",&error)<0)
1216+
if(execute_spi("update cron set _next_exec_time = NULL where _next_exec_time is not NULL",&error)<0)
12211217
{
12221218
manager_fatal_error(ctx,0,"Cannot clean cron _next time: %s",error);
12231219
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp