Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit115358b

Browse files
committed
Merge branch 'PGPROEE9_6_scheduler' into PGPROEE9_6
2 parents366c9ec +872a8be commit115358b

16 files changed

+1772
-479
lines changed

‎contrib/pgpro_scheduler/Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ else
2020
include $(top_builddir)/src/Makefile.global
2121
include $(top_srcdir)/contrib/contrib-global.mk
2222
endif
23+
24+
#check: temp-install
25+
#$(prove_check)

‎contrib/pgpro_scheduler/pgpro_scheduler--2.0.sql

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ SET search_path TO schedule;
55

66
CREATETYPEjob_status_tAS ENUM ('working','done','error');
77
CREATETYPEjob_at_status_tAS ENUM ('submitted','processing','done');
8+
CREATESEQUENCEschedule.at_jobs_submitted_id_seq;
89

910
CREATETABLEat_jobs_submitted(
10-
idSERIALPRIMARY KEY,
11+
idintPRIMARY KEY,
1112
nodetext,
1213
nametext,
1314
commentstext,
@@ -37,6 +38,8 @@ ALTER TABLE at_jobs_done ADD status boolean;
3738
ALTERTABLE at_jobs_done ADD reasontext;
3839
ALTERTABLE at_jobs_done ADD done_timetimestamp with time zone default now();
3940

41+
ALTERTABLE at_jobs_submitted ALTER idSET default nextval('schedule.at_jobs_submitted_id_seq');
42+
4043

4144
CREATETABLEcron(
4245
idSERIALPRIMARY KEY,
@@ -281,7 +284,12 @@ DECLARE
281284
cron_idINTEGER;
282285
BEGIN
283286
cron_id :=NEW.id;
284-
IF NOTNEW.activeORNEW.brokenORNEW.rule<>OLD.ruleORNEW.postpone<>OLD.postpone THEN
287+
IF NOTNEW.activeORNEW.brokenOR
288+
coalesce(NEW.rule<>OLD.rule, true)OR
289+
coalesce(NEW.postpone<>OLD.postpone, true)OR
290+
coalesce(NEW.start_date<>OLD.start_date, true)OR
291+
coalesce(NEW.end_date<>OLD.end_date, true)
292+
THEN
285293
DELETEFROM atWHERE cron= cron_idAND active= false;
286294
END IF;
287295
RETURN OLD;
@@ -1203,9 +1211,9 @@ BEGIN
12031211
END IF;
12041212

12051213
IF usename='___all___' THEN
1206-
sql_cmd :='SELECT * FROM log as l, cron as cronWHERE cron.id = l.cron';
1214+
sql_cmd :='SELECT * FROM log as lLEFT OUTER JOIN cronON cron.id = l.cron';
12071215
ELSE
1208-
sql_cmd :='SELECT * FROM log as l, cron as cronWHERE cron.executor ='''|| usename||''' AND cron.id = l.cron';
1216+
sql_cmd :='SELECT * FROM log as lLEFT OUTER JOIN cronON cron.executor ='''|| usename||''' AND cron.id = l.cron';
12091217
END IF;
12101218

12111219
FOR iiIN EXECUTE sql_cmd LOOP
@@ -1338,7 +1346,7 @@ CREATE VIEW all_job_status AS
13381346
attempt, resubmit_limit, postponeas max_wait_interval,
13391347
max_run_timeas max_duration, submit_time,
13401348
start_time, statusas is_success, reasonas error, done_time,
1341-
'processing'::job_at_status_t status
1349+
'done'::job_at_status_t status
13421350
FROMschedule.at_jobs_done
13431351
UNION
13441352
SELECT

‎contrib/pgpro_scheduler/src/char_array.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ char_array_t *makeCharArray(void)
2525

2626
char_array_t*sortCharArray(char_array_t*a)
2727
{
28+
if(a->n <=1)returna;
2829
qsort(a->data,a->n,sizeof(char*),__sort_char_string);
2930

3031
returna;

‎contrib/pgpro_scheduler/src/memutils.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ void *worker_alloc(Size size)
2929

3030
voiddelete_worker_mem_ctx(void)
3131
{
32+
MemoryContextSwitchTo(TopMemoryContext);
3233
MemoryContextDelete(SchedulerWorkerContext);
3334
}

‎contrib/pgpro_scheduler/src/pgpro_scheduler.c

Lines changed: 34 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ char_array_t *readBasesToCheck(void)
212212
pgstat_report_activity(STATE_RUNNING,"read configuration");
213213
result=makeCharArray();
214214

215-
value=GetConfigOption("schedule.database",1,0);
215+
value=GetConfigOption("schedule.database",true, false);
216216
if(!value||strlen(value)==0)
217217
{
218218
returnresult;
@@ -254,6 +254,7 @@ char_array_t *readBasesToCheck(void)
254254
pfree(clean_value);
255255
if(names->n==0)
256256
{
257+
destroyCharArray(names);
257258
returnresult;
258259
}
259260

@@ -264,45 +265,39 @@ char_array_t *readBasesToCheck(void)
264265
appendStringInfo(&sql,"'%s'",names->data[i]);
265266
if(i+1!=names->n)appendStringInfo(&sql,",");
266267
}
268+
destroyCharArray(names);
267269
appendStringInfo(&sql,")");
268-
SetCurrentStatementStartTimestamp();
269-
StartTransactionCommand();
270-
SPI_connect();
271-
PushActiveSnapshot(GetTransactionSnapshot());
270+
271+
START_SPI_SNAP();
272272

273273
ret=SPI_execute(sql.data, true,0);
274274
if (ret!=SPI_OK_SELECT)
275275
{
276-
SPI_finish();
277-
PopActiveSnapshot();
278-
CommitTransactionCommand();
276+
STOP_SPI_SNAP();
277+
elog(ERROR,"cannot select from pg_database");
279278
}
280-
destroyCharArray(names);
281279
processed=SPI_processed;
282280
if(processed==0)
283281
{
284-
SPI_finish();
285-
PopActiveSnapshot();
286-
CommitTransactionCommand();
282+
STOP_SPI_SNAP();
287283
returnresult;
288284
}
289285
for(i=0;i<processed;i++)
290286
{
291287
clean_value=SPI_getvalue(SPI_tuptable->vals[i],SPI_tuptable->tupdesc,1);
292288
pushCharArray(result,clean_value);
293289
}
294-
SPI_finish();
295-
PopActiveSnapshot();
296-
CommitTransactionCommand();
290+
STOP_SPI_SNAP();
297291
sortCharArray(result);
292+
298293
returnresult;
299294
}
300295

301296
voidparent_scheduler_main(Datumarg)
302297
{
303298
intrc=0,i;
304299
char_array_t*names=NULL;
305-
schd_managers_poll_t*poll;
300+
schd_managers_poll_t*pool;
306301
schd_manager_share_t*shared;
307302
boolrefresh= false;
308303

@@ -319,10 +314,10 @@ void parent_scheduler_main(Datum arg)
319314

320315
BackgroundWorkerInitializeConnection("postgres",NULL);
321316
names=readBasesToCheck();
322-
poll=initSchedulerManagerPool(names);
317+
pool=initSchedulerManagerPool(names);
323318
destroyCharArray(names);
324319

325-
set_supervisor_pgstatus(poll);
320+
set_supervisor_pgstatus(pool);
326321

327322
while(!got_sigterm)
328323
{
@@ -334,62 +329,62 @@ void parent_scheduler_main(Datum arg)
334329
ProcessConfigFile(PGC_SIGHUP);
335330
refresh= false;
336331
names=NULL;
337-
if(is_scheduler_enabled()!=poll->enabled)
332+
if(is_scheduler_enabled()!=pool->enabled)
338333
{
339-
if(poll->enabled)
334+
if(pool->enabled)
340335
{
341-
poll->enabled= false;
342-
stopAllManagers(poll);
343-
set_supervisor_pgstatus(poll);
336+
pool->enabled= false;
337+
stopAllManagers(pool);
338+
set_supervisor_pgstatus(pool);
344339
}
345340
else
346341
{
347342
refresh= true;
348-
poll->enabled= true;
343+
pool->enabled= true;
349344
names=readBasesToCheck();
350345
}
351346
}
352-
elseif(poll->enabled)
347+
elseif(pool->enabled)
353348
{
354349
names=readBasesToCheck();
355-
if(isBaseListChanged(names,poll))refresh= true;
350+
if(isBaseListChanged(names,pool))refresh= true;
356351
elsedestroyCharArray(names);
357352
}
358353

359354
if(refresh)
360355
{
361-
refreshManagers(names,poll);
362-
set_supervisor_pgstatus(poll);
356+
refreshManagers(names,pool);
357+
set_supervisor_pgstatus(pool);
363358
destroyCharArray(names);
364359
}
365360
}
366361
else
367362
{
368-
for(i=0;i<poll->n;i++)
363+
for(i=0;i<pool->n;i++)
369364
{
370-
shared=dsm_segment_address(poll->workers[i]->shared);
365+
shared=dsm_segment_address(pool->workers[i]->shared);
371366

372367
if(shared->setbychild)
373368
{
374-
/* elog(LOG, "got status change from: %s",poll->workers[i]->dbname); */
369+
/* elog(LOG, "got status change from: %s",pool->workers[i]->dbname); */
375370
shared->setbychild= false;
376371
if(shared->status==SchdManagerConnected)
377372
{
378-
poll->workers[i]->connected= true;
373+
pool->workers[i]->connected= true;
379374
}
380375
elseif(shared->status==SchdManagerQuit)
381376
{
382-
removeManagerFromPoll(poll,poll->workers[i]->dbname,1, true);
383-
set_supervisor_pgstatus(poll);
377+
removeManagerFromPoll(pool,pool->workers[i]->dbname,1, true);
378+
set_supervisor_pgstatus(pool);
384379
}
385380
elseif(shared->status==SchdManagerDie)
386381
{
387-
removeManagerFromPoll(poll,poll->workers[i]->dbname,1, false);
388-
set_supervisor_pgstatus(poll);
382+
removeManagerFromPoll(pool,pool->workers[i]->dbname,1, false);
383+
set_supervisor_pgstatus(pool);
389384
}
390385
else
391386
{
392-
elog(WARNING,"manager: %s set strange status: %d",poll->workers[i]->dbname,shared->status);
387+
elog(WARNING,"manager: %s set strange status: %d",pool->workers[i]->dbname,shared->status);
393388
}
394389
}
395390
}
@@ -399,7 +394,7 @@ void parent_scheduler_main(Datum arg)
399394
CHECK_FOR_INTERRUPTS();
400395
ResetLatch(MyLatch);
401396
}
402-
stopAllManagers(poll);
397+
stopAllManagers(pool);
403398
delete_worker_mem_ctx();
404399

405400
proc_exit(0);
@@ -413,7 +408,7 @@ pg_scheduler_startup(void)
413408
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |
414409
BGWORKER_BACKEND_DATABASE_CONNECTION;
415410
worker.bgw_start_time=BgWorkerStart_ConsistentState;
416-
worker.bgw_restart_time=BGW_NEVER_RESTART;
411+
worker.bgw_restart_time=10;
417412
worker.bgw_main=NULL;
418413
worker.bgw_notify_pid=0;
419414
worker.bgw_main_arg=Int32GetDatum(0);

‎contrib/pgpro_scheduler/src/sched_manager_poll.c

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ char *supervisor_state(schd_managers_poll_t *poll)
4343

4444
if(!poll->enabled)
4545
{
46-
status=palloc(sizeof(char)*9);
46+
status=worker_alloc(sizeof(char)*9);
4747
memcpy(status,"disabled",8);
4848
status[8]=0;
4949
returnstatus;
@@ -52,13 +52,13 @@ char *supervisor_state(schd_managers_poll_t *poll)
5252
len=dbnames ?strlen(dbnames):0;
5353
if(len==0)
5454
{
55-
status=palloc(sizeof(char)*26);
55+
status=worker_alloc(sizeof(char)*26);
5656
memcpy(status,"waiting databases to set",25);
5757
status[25]=0;
5858
}
5959
else
6060
{
61-
status=palloc(sizeof(char)* (len+10));
61+
status=worker_alloc(sizeof(char)* (len+10));
6262
memcpy(status,"work on: ",9);
6363
memcpy(status+9,dbnames,len);
6464
status[len+9]=0;
@@ -82,7 +82,7 @@ char *poll_dbnames(schd_managers_poll_t *poll)
8282
if(i< (poll->n-1))
8383
appendStringInfo(&string,", ");
8484
}
85-
out=palloc(sizeof(char)* (string.len+1));
85+
out=worker_alloc(sizeof(char)* (string.len+1));
8686
memcpy(out,string.data,string.len);
8787
out[string.len]=0;
8888
pfree(string.data);
@@ -297,29 +297,18 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
297297
schd_manager_t*man;
298298
schd_manager_share_t*shm_data;
299299
Sizesegsize;
300-
/*shm_toc_estimator e;
301-
shm_toc *toc; */
302300
dsm_segment*seg;
303301

304-
/*shm_toc_initialize_estimator(&e);
305-
shm_toc_estimate_chunk(&e, sizeof(schd_manager_share_t));
306-
shm_toc_estimate_keys(&e, 1);
307-
segsize = shm_toc_estimate(&e); */
308302
segsize= (Size)sizeof(schd_manager_share_t);
309303

310304
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler");
311305
seg=dsm_create(segsize,0);
312306

313-
man=palloc(sizeof(schd_manager_t));
314-
man->dbname=palloc(sizeof(char*)* (strlen(name)+1));
307+
man=worker_alloc(sizeof(schd_manager_t));
308+
man->dbname=worker_alloc(sizeof(char*)* (strlen(name)+1));
315309
man->connected= false;
316310
memcpy(man->dbname,name,strlen(name)+1);
317311
man->shared=seg;
318-
/*toc = shm_toc_create(PGPRO_SHM_TOC_MAGIC, dsm_segment_address(man->shared),
319-
segsize);
320-
321-
shm_data = shm_toc_allocate(toc, sizeof(schd_manager_share_t));
322-
shm_toc_insert(toc, 0, shm_data); */
323312
shm_data=dsm_segment_address(man->shared);
324313

325314
shm_data->setbyparent= true;
@@ -331,7 +320,7 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
331320
pos=poll->n++;
332321
poll->workers=poll->workers ?
333322
repalloc(poll->workers,sizeof(schd_manager_t*)*poll->n):
334-
palloc(sizeof(schd_manager_t*));
323+
worker_alloc(sizeof(schd_manager_t*));
335324
poll->workers[pos]=man;
336325
if(sort)_sortPollManagers(poll);
337326

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp