Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit963efe8

Browse files
author
Vladimir Ershov
committed
Merge commit '4ab60603d6e350bd036a7d172d5a3c01fa43157a' into PGPROEE9_6_scheduler
2 parentsa69f3b3 +4ab6060 commit963efe8

File tree

10 files changed

+298
-91
lines changed

10 files changed

+298
-91
lines changed

‎contrib/pgpro_scheduler/pgpro_scheduler--2.0.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ SET search_path TO schedule;
55

66
CREATETYPEjob_status_tAS ENUM ('working','done','error');
77
CREATETYPEjob_at_status_tAS ENUM ('submitted','processing','done');
8+
CREATESEQUENCEschedule.at_jobs_submitted_id_seq;
89

910
CREATETABLEat_jobs_submitted(
10-
idSERIALPRIMARY KEY,
11+
idintPRIMARY KEY,
1112
nodetext,
1213
nametext,
1314
commentstext,
@@ -37,6 +38,8 @@ ALTER TABLE at_jobs_done ADD status boolean;
3738
ALTERTABLE at_jobs_done ADD reasontext;
3839
ALTERTABLE at_jobs_done ADD done_timetimestamp with time zone default now();
3940

41+
ALTERTABLE at_jobs_submitted ALTER idSET default nextval('schedule.at_jobs_submitted_id_seq');
42+
4043

4144
CREATETABLEcron(
4245
idSERIALPRIMARY KEY,

‎contrib/pgpro_scheduler/src/char_array.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ char_array_t *makeCharArray(void)
2525

2626
char_array_t*sortCharArray(char_array_t*a)
2727
{
28+
if(a->n <=1)returna;
2829
qsort(a->data,a->n,sizeof(char*),__sort_char_string);
2930

3031
returna;

‎contrib/pgpro_scheduler/src/memutils.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ void *worker_alloc(Size size)
2929

3030
voiddelete_worker_mem_ctx(void)
3131
{
32+
MemoryContextSwitchTo(TopMemoryContext);
3233
MemoryContextDelete(SchedulerWorkerContext);
3334
}

‎contrib/pgpro_scheduler/src/pgpro_scheduler.c

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ char_array_t *readBasesToCheck(void)
212212
pgstat_report_activity(STATE_RUNNING,"read configuration");
213213
result=makeCharArray();
214214

215-
value=GetConfigOption("schedule.database",1,0);
215+
value=GetConfigOption("schedule.database",true, false);
216216
if(!value||strlen(value)==0)
217217
{
218218
returnresult;
@@ -254,6 +254,7 @@ char_array_t *readBasesToCheck(void)
254254
pfree(clean_value);
255255
if(names->n==0)
256256
{
257+
destroyCharArray(names);
257258
returnresult;
258259
}
259260

@@ -264,45 +265,39 @@ char_array_t *readBasesToCheck(void)
264265
appendStringInfo(&sql,"'%s'",names->data[i]);
265266
if(i+1!=names->n)appendStringInfo(&sql,",");
266267
}
268+
destroyCharArray(names);
267269
appendStringInfo(&sql,")");
268-
SetCurrentStatementStartTimestamp();
269-
StartTransactionCommand();
270-
SPI_connect();
271-
PushActiveSnapshot(GetTransactionSnapshot());
270+
271+
START_SPI_SNAP();
272272

273273
ret=SPI_execute(sql.data, true,0);
274274
if (ret!=SPI_OK_SELECT)
275275
{
276-
SPI_finish();
277-
PopActiveSnapshot();
278-
CommitTransactionCommand();
276+
STOP_SPI_SNAP();
277+
elog(ERROR,"cannot select from pg_database");
279278
}
280-
destroyCharArray(names);
281279
processed=SPI_processed;
282280
if(processed==0)
283281
{
284-
SPI_finish();
285-
PopActiveSnapshot();
286-
CommitTransactionCommand();
282+
STOP_SPI_SNAP();
287283
returnresult;
288284
}
289285
for(i=0;i<processed;i++)
290286
{
291287
clean_value=SPI_getvalue(SPI_tuptable->vals[i],SPI_tuptable->tupdesc,1);
292288
pushCharArray(result,clean_value);
293289
}
294-
SPI_finish();
295-
PopActiveSnapshot();
296-
CommitTransactionCommand();
290+
STOP_SPI_SNAP();
297291
sortCharArray(result);
292+
298293
returnresult;
299294
}
300295

301296
voidparent_scheduler_main(Datumarg)
302297
{
303298
intrc=0,i;
304299
char_array_t*names=NULL;
305-
schd_managers_poll_t*poll;
300+
schd_managers_poll_t*pool;
306301
schd_manager_share_t*shared;
307302
boolrefresh= false;
308303

@@ -319,10 +314,10 @@ void parent_scheduler_main(Datum arg)
319314

320315
BackgroundWorkerInitializeConnection("postgres",NULL);
321316
names=readBasesToCheck();
322-
poll=initSchedulerManagerPool(names);
317+
pool=initSchedulerManagerPool(names);
323318
destroyCharArray(names);
324319

325-
set_supervisor_pgstatus(poll);
320+
set_supervisor_pgstatus(pool);
326321

327322
while(!got_sigterm)
328323
{
@@ -334,62 +329,62 @@ void parent_scheduler_main(Datum arg)
334329
ProcessConfigFile(PGC_SIGHUP);
335330
refresh= false;
336331
names=NULL;
337-
if(is_scheduler_enabled()!=poll->enabled)
332+
if(is_scheduler_enabled()!=pool->enabled)
338333
{
339-
if(poll->enabled)
334+
if(pool->enabled)
340335
{
341-
poll->enabled= false;
342-
stopAllManagers(poll);
343-
set_supervisor_pgstatus(poll);
336+
pool->enabled= false;
337+
stopAllManagers(pool);
338+
set_supervisor_pgstatus(pool);
344339
}
345340
else
346341
{
347342
refresh= true;
348-
poll->enabled= true;
343+
pool->enabled= true;
349344
names=readBasesToCheck();
350345
}
351346
}
352-
elseif(poll->enabled)
347+
elseif(pool->enabled)
353348
{
354349
names=readBasesToCheck();
355-
if(isBaseListChanged(names,poll))refresh= true;
350+
if(isBaseListChanged(names,pool))refresh= true;
356351
elsedestroyCharArray(names);
357352
}
358353

359354
if(refresh)
360355
{
361-
refreshManagers(names,poll);
362-
set_supervisor_pgstatus(poll);
356+
refreshManagers(names,pool);
357+
set_supervisor_pgstatus(pool);
363358
destroyCharArray(names);
364359
}
365360
}
366361
else
367362
{
368-
for(i=0;i<poll->n;i++)
363+
for(i=0;i<pool->n;i++)
369364
{
370-
shared=dsm_segment_address(poll->workers[i]->shared);
365+
shared=dsm_segment_address(pool->workers[i]->shared);
371366

372367
if(shared->setbychild)
373368
{
374-
/* elog(LOG, "got status change from: %s",poll->workers[i]->dbname); */
369+
/* elog(LOG, "got status change from: %s",pool->workers[i]->dbname); */
375370
shared->setbychild= false;
376371
if(shared->status==SchdManagerConnected)
377372
{
378-
poll->workers[i]->connected= true;
373+
pool->workers[i]->connected= true;
379374
}
380375
elseif(shared->status==SchdManagerQuit)
381376
{
382-
removeManagerFromPoll(poll,poll->workers[i]->dbname,1, true);
383-
set_supervisor_pgstatus(poll);
377+
removeManagerFromPoll(pool,pool->workers[i]->dbname,1, true);
378+
set_supervisor_pgstatus(pool);
384379
}
385380
elseif(shared->status==SchdManagerDie)
386381
{
387-
removeManagerFromPoll(poll,poll->workers[i]->dbname,1, false);
388-
set_supervisor_pgstatus(poll);
382+
removeManagerFromPoll(pool,pool->workers[i]->dbname,1, false);
383+
set_supervisor_pgstatus(pool);
389384
}
390385
else
391386
{
392-
elog(WARNING,"manager: %s set strange status: %d",poll->workers[i]->dbname,shared->status);
387+
elog(WARNING,"manager: %s set strange status: %d",pool->workers[i]->dbname,shared->status);
393388
}
394389
}
395390
}
@@ -399,7 +394,7 @@ void parent_scheduler_main(Datum arg)
399394
CHECK_FOR_INTERRUPTS();
400395
ResetLatch(MyLatch);
401396
}
402-
stopAllManagers(poll);
397+
stopAllManagers(pool);
403398
delete_worker_mem_ctx();
404399

405400
proc_exit(0);

‎contrib/pgpro_scheduler/src/sched_manager_poll.c

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ char *supervisor_state(schd_managers_poll_t *poll)
4343

4444
if(!poll->enabled)
4545
{
46-
status=palloc(sizeof(char)*9);
46+
status=worker_alloc(sizeof(char)*9);
4747
memcpy(status,"disabled",8);
4848
status[8]=0;
4949
returnstatus;
@@ -52,13 +52,13 @@ char *supervisor_state(schd_managers_poll_t *poll)
5252
len=dbnames ?strlen(dbnames):0;
5353
if(len==0)
5454
{
55-
status=palloc(sizeof(char)*26);
55+
status=worker_alloc(sizeof(char)*26);
5656
memcpy(status,"waiting databases to set",25);
5757
status[25]=0;
5858
}
5959
else
6060
{
61-
status=palloc(sizeof(char)* (len+10));
61+
status=worker_alloc(sizeof(char)* (len+10));
6262
memcpy(status,"work on: ",9);
6363
memcpy(status+9,dbnames,len);
6464
status[len+9]=0;
@@ -82,7 +82,7 @@ char *poll_dbnames(schd_managers_poll_t *poll)
8282
if(i< (poll->n-1))
8383
appendStringInfo(&string,", ");
8484
}
85-
out=palloc(sizeof(char)* (string.len+1));
85+
out=worker_alloc(sizeof(char)* (string.len+1));
8686
memcpy(out,string.data,string.len);
8787
out[string.len]=0;
8888
pfree(string.data);
@@ -297,29 +297,18 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
297297
schd_manager_t*man;
298298
schd_manager_share_t*shm_data;
299299
Sizesegsize;
300-
/*shm_toc_estimator e;
301-
shm_toc *toc; */
302300
dsm_segment*seg;
303301

304-
/*shm_toc_initialize_estimator(&e);
305-
shm_toc_estimate_chunk(&e, sizeof(schd_manager_share_t));
306-
shm_toc_estimate_keys(&e, 1);
307-
segsize = shm_toc_estimate(&e); */
308302
segsize= (Size)sizeof(schd_manager_share_t);
309303

310304
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler");
311305
seg=dsm_create(segsize,0);
312306

313-
man=palloc(sizeof(schd_manager_t));
314-
man->dbname=palloc(sizeof(char*)* (strlen(name)+1));
307+
man=worker_alloc(sizeof(schd_manager_t));
308+
man->dbname=worker_alloc(sizeof(char*)* (strlen(name)+1));
315309
man->connected= false;
316310
memcpy(man->dbname,name,strlen(name)+1);
317311
man->shared=seg;
318-
/*toc = shm_toc_create(PGPRO_SHM_TOC_MAGIC, dsm_segment_address(man->shared),
319-
segsize);
320-
321-
shm_data = shm_toc_allocate(toc, sizeof(schd_manager_share_t));
322-
shm_toc_insert(toc, 0, shm_data); */
323312
shm_data=dsm_segment_address(man->shared);
324313

325314
shm_data->setbyparent= true;
@@ -331,7 +320,7 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
331320
pos=poll->n++;
332321
poll->workers=poll->workers ?
333322
repalloc(poll->workers,sizeof(schd_manager_t*)*poll->n):
334-
palloc(sizeof(schd_manager_t*));
323+
worker_alloc(sizeof(schd_manager_t*));
335324
poll->workers[pos]=man;
336325
if(sort)_sortPollManagers(poll);
337326

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp