Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf0e7402

Browse files
author
Vladimir Ershov
committed
bug fixes
refresh manager when some guc changed
1 parent19ee000 commitf0e7402

File tree

4 files changed

+113
-16
lines changed

4 files changed

+113
-16
lines changed

‎pgpro_scheduler--1.0.sql

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,29 @@ $BODY$
737737
LANGUAGE plpgsql
738738
SECURITY DEFINER;
739739

740+
CREATEFUNCTIONschedule.get_cron() RETURNS SETOFschedule.cron_recAS
741+
$BODY$
742+
DECLARE
743+
iischedule.cron;
744+
ooschedule.cron_rec;
745+
is_superuserboolean;
746+
BEGIN
747+
EXECUTE'SELECT usesuper FROM pg_user WHERE usename = session_user'
748+
INTO is_superuser;
749+
IF NOT is_superuser THEN
750+
RAISE EXCEPTION'access denied: only superuser allowed';
751+
END IF;
752+
753+
FOR iiINSELECT*FROMschedule.cron LOOP
754+
oo :=schedule._make_cron_rec(ii);
755+
RETURN NEXT oo;
756+
END LOOP;
757+
RETURN;
758+
END
759+
$BODY$
760+
LANGUAGE plpgsql
761+
SECURITY DEFINER;
762+
740763
CREATEFUNCTIONschedule.get_user_owned_cron() RETURNS SETOFschedule.cron_recAS
741764
$BODY$
742765
DECLARE

‎src/pgpro_scheduler.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@ char_array_t *readBasesToCheck(void)
205205
if(i+1!=names->n)appendStringInfo(&sql,",");
206206
}
207207
appendStringInfo(&sql,")");
208-
elog(LOG,"SQL: %s",sql.data);
209208
SetCurrentStatementStartTimestamp();
210209
StartTransactionCommand();
211210
SPI_connect();

‎src/scheduler_manager.c

Lines changed: 88 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,24 @@ int checkSchedulerNamespace(void)
9898
intget_scheduler_maxworkers(void)
9999
{
100100
constchar*opt;
101+
intvar;
101102

102-
opt=GetConfigOption("scheduler.max_workers", true, false);
103+
opt=GetConfigOption("schedule.max_workers", true, false);
104+
/* opt = GetConfigOptionByName("schedule.max_workers", NULL); */
103105
if(opt==NULL)
104106
{
105107
return2;
106108
}
107-
returnatoi(opt);
109+
110+
var=atoi(opt);
111+
/* pfree(opt); */
112+
returnvar;
108113
}
109114

110115
char*get_scheduler_nodename(void)
111116
{
112117
constchar*opt;
113-
opt=GetConfigOption("scheduler.nodename", true, false);
118+
opt=GetConfigOption("schedule.nodename", true, false);
114119

115120
return_copy_string((char*)(opt==NULL||strlen(opt)==0 ?"master":opt));
116121
}
@@ -140,13 +145,74 @@ scheduler_manager_ctx_t *initialize_scheduler_manager_context(char *dbname, dsm_
140145
returnctx;
141146
}
142147

143-
voidrefresh_scheduler_manager_context(scheduler_manager_ctx_t*ctx)
148+
intrefresh_scheduler_manager_context(scheduler_manager_ctx_t*ctx)
144149
{
145-
/* TODO set new nodename , if changed kill all kids workers, change
146-
max-workers resize slots
147-
if less then was and all slots are buisy [ ??? ]
148-
kill youngest (?)
149-
*/
150+
intrc=0;
151+
intN,i,busy;
152+
scheduler_manager_slot_t**old;
153+
154+
N=get_scheduler_maxworkers();
155+
if(N!=ctx->slots_len)
156+
{
157+
elog(LOG,"Change available workers number %d => %d",ctx->slots_len,N);
158+
}
159+
160+
if(N>ctx->slots_len)
161+
{
162+
pgstat_report_activity(STATE_RUNNING,"extend the number of workers");
163+
164+
old=ctx->slots;
165+
ctx->slots=worker_alloc(sizeof(scheduler_manager_slot_t*)*N);
166+
for(i=0;i<N;i++)
167+
{
168+
ctx->slots[i]=NULL;
169+
}
170+
for(i=0;i<ctx->slots_len;i++)
171+
{
172+
ctx->slots[i]=old[i];
173+
}
174+
pfree(old);
175+
ctx->free_slots+= (N-ctx->slots_len);
176+
ctx->slots_len=N;
177+
}
178+
elseif(N<ctx->slots_len)
179+
{
180+
pgstat_report_activity(STATE_RUNNING,"shrink the number of workers");
181+
busy=ctx->slots_len-ctx->free_slots;
182+
if(N >=busy)
183+
{
184+
ctx->slots=repalloc(ctx->slots,sizeof(scheduler_manager_slot_t*)*N);
185+
ctx->slots_len=N;
186+
ctx->free_slots=N-busy;
187+
}
188+
else
189+
{
190+
pgstat_report_activity(STATE_RUNNING,"wait for some workers free slots");
191+
while(!got_sigterm)
192+
{
193+
CHECK_FOR_INTERRUPTS();
194+
scheduler_check_slots(ctx);
195+
busy=ctx->slots_len-ctx->free_slots;
196+
if(N >=busy)
197+
{
198+
ctx->slots=repalloc(ctx->slots,sizeof(scheduler_manager_slot_t*)*N);
199+
ctx->slots_len=N;
200+
ctx->free_slots=N-busy;
201+
break;
202+
}
203+
if(rc)
204+
{
205+
if(rc&WL_POSTMASTER_DEATH)proc_exit(1);
206+
if(got_sigterm||got_sighup)return0;
207+
}
208+
rc=WaitLatch(MyLatch,
209+
WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,500L);
210+
ResetLatch(MyLatch);
211+
}
212+
}
213+
}
214+
215+
return1;
150216
}
151217

152218
voiddestroy_scheduler_manager_context(scheduler_manager_ctx_t*ctx)
@@ -218,7 +284,6 @@ scheduler_task_t *scheduler_get_active_tasks(scheduler_manager_ctx_t *ctx, int *
218284
tupdesc=SPI_tuptable->tupdesc;
219285

220286
tasks=worker_alloc(sizeof(scheduler_task_t)*processed);
221-
elog(LOG,"Found %d tasks",processed);
222287

223288
for(i=0;i<processed;i++)
224289
{
@@ -496,7 +561,6 @@ TimestampTz *scheduler_calc_next_task_time(scheduler_task_t *task, TimestampTz s
496561
curr+=SECS_PER_MINUTE;
497562
#endif
498563
}
499-
elog(LOG,"made: %d",*ntimes);
500564
for(i=0;i<5 ;i++)destroy_bit_array(&cron[i],0);
501565
if(*ntimes==0)
502566
{
@@ -1144,6 +1208,14 @@ void clean_at_table(scheduler_manager_ctx_t *ctx)
11441208
STOP_SPI_SNAP();
11451209
}
11461210

1211+
voidset_slots_stat_report(scheduler_manager_ctx_t*ctx)
1212+
{
1213+
charstate[128];
1214+
snprintf(state,128,"slots busy: %d, free: %d",
1215+
ctx->slots_len-ctx->free_slots,ctx->free_slots);
1216+
pgstat_report_activity(STATE_RUNNING,state);
1217+
}
1218+
11471219
voidmanager_worker_main(Datumarg)
11481220
{
11491221
char*database;
@@ -1196,7 +1268,6 @@ void manager_worker_main(Datum arg)
11961268
dsm_detach(seg);
11971269
proc_exit(0);
11981270
}
1199-
elog(LOG,"ON");
12001271
SetCurrentStatementStartTimestamp();
12011272
pgstat_report_activity(STATE_RUNNING,"initialize.");
12021273

@@ -1209,7 +1280,8 @@ void manager_worker_main(Datum arg)
12091280
init_worker_mem_ctx("WorkerMemoryContext");
12101281
ctx=initialize_scheduler_manager_context(database,seg);
12111282
clean_at_table(ctx);
1212-
elog(LOG,"Start main loop");
1283+
set_slots_stat_report(ctx);
1284+
12131285
while(!got_sigterm)
12141286
{
12151287
if(rc)
@@ -1220,20 +1292,22 @@ void manager_worker_main(Datum arg)
12201292
got_sighup= false;
12211293
ProcessConfigFile(PGC_SIGHUP);
12221294
refresh_scheduler_manager_context(ctx);
1295+
set_slots_stat_report(ctx);
12231296
}
12241297
if(!got_sighup&& !got_sigterm)
12251298
{
12261299
if(rc&WL_LATCH_SET)
12271300
{
12281301
scheduler_check_slots(ctx);
1302+
set_slots_stat_report(ctx);
12291303
}
12301304
elseif(rc&WL_TIMEOUT)
12311305
{
12321306
scheduler_make_at_record(ctx);
12331307
scheduler_vanish_expired_jobs(ctx);
12341308
scheduler_start_jobs(ctx);
12351309
scheduler_check_slots(ctx);
1236-
pgstat_report_activity(STATE_IDLE,"");
1310+
set_slots_stat_report(ctx);
12371311
}
12381312
}
12391313
}

‎src/scheduler_manager.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ void manager_worker_main(Datum arg);
6868
intget_scheduler_maxworkers(void);
6969
char*get_scheduler_nodename(void);
7070
scheduler_manager_ctx_t*initialize_scheduler_manager_context(char*dbname,dsm_segment*seg);
71-
voidrefresh_scheduler_manager_context(scheduler_manager_ctx_t*ctx);
71+
intrefresh_scheduler_manager_context(scheduler_manager_ctx_t*ctx);
7272
voiddestroy_scheduler_manager_context(scheduler_manager_ctx_t*ctx);
7373
intscheduler_manager_stop(scheduler_manager_ctx_t*ctx);
7474
scheduler_task_t*scheduler_get_active_tasks(scheduler_manager_ctx_t*ctx,int*nt);
@@ -93,5 +93,6 @@ void clean_at_table(scheduler_manager_ctx_t *ctx);
9393
intupdate_cron_texttime(scheduler_manager_ctx_t*ctx,intcron_id,TimestampTznext);
9494
intmark_job_broken(scheduler_manager_ctx_t*ctx,intcron_id,char*reason);
9595
voidmanager_fatal_error(scheduler_manager_ctx_t*ctx,intecode,char*message, ...)pg_attribute_printf(3,4);
96+
voidset_slots_stat_report(scheduler_manager_ctx_t*ctx);
9697

9798
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp