Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 19ee000

Browse files
author
Vladimir Ershov
committed
proper exit procedure
set job broken; fatal manager error processing
1 parent 73569c8, commit 19ee000

File tree

7 files changed

+136
-36
lines changed

7 files changed

+136
-36
lines changed

‎src/pgpro_scheduler.c

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -287,8 +287,6 @@ void parent_scheduler_main(Datum arg)
287287
{
288288
for(i=0;i<poll->n;i++)
289289
{
290-
/* toc = shm_toc_attach(PGPRO_SHM_TOC_MAGIC, dsm_segment_address(poll->workers[i]->shared));
291-
shared = shm_toc_lookup(toc, 0); */
292290
shared=dsm_segment_address(poll->workers[i]->shared);
293291

294292
if(shared->setbychild)
@@ -301,7 +299,12 @@ void parent_scheduler_main(Datum arg)
301299
}
302300
elseif(shared->status==SchdManagerQuit)
303301
{
304-
removeManagerFromPoll(poll,poll->workers[i]->dbname,1);
302+
removeManagerFromPoll(poll,poll->workers[i]->dbname,1, true);
303+
set_supervisor_pgstatus(poll);
304+
}
305+
elseif(shared->status==SchdManagerDie)
306+
{
307+
removeManagerFromPoll(poll,poll->workers[i]->dbname,1, false);
305308
set_supervisor_pgstatus(poll);
306309
}
307310
else
@@ -320,7 +323,7 @@ void parent_scheduler_main(Datum arg)
320323
stopAllManagers(poll);
321324
delete_worker_mem_ctx();
322325

323-
proc_exit(1);
326+
proc_exit(0);
324327
}
325328

326329
void

‎src/sched_manager_poll.c

Lines changed: 22 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -88,8 +88,15 @@ void changeChildBgwState(schd_manager_share_t *s, schd_manager_status_t status)
8888
s->setbychild= true;
8989

9090
parent=BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
91-
if(parent)SetLatch(&parent->procLatch);
92-
elog(LOG,"set LATCH %d - %d" ,MyBgworkerEntry->bgw_notify_pid,status);
91+
if(parent)
92+
{
93+
SetLatch(&parent->procLatch);
94+
elog(LOG,"set LATCH to %d - status = %d" ,MyBgworkerEntry->bgw_notify_pid,status);
95+
}
96+
else
97+
{
98+
elog(LOG,"unable to set LATCH to %d",MyBgworkerEntry->bgw_notify_pid);
99+
}
93100
}
94101

95102
intstopAllManagers(schd_managers_poll_t*poll)
@@ -98,6 +105,8 @@ int stopAllManagers(schd_managers_poll_t *poll)
98105
PGPROC*child;
99106
schd_manager_share_t*shared;
100107

108+
elog(LOG,"Stop all managers");
109+
101110
for(i=0;i<poll->n;i++)
102111
{
103112
shared=dsm_segment_address(poll->workers[i]->shared);
@@ -108,7 +117,10 @@ int stopAllManagers(schd_managers_poll_t *poll)
108117
{
109118
elog(LOG,"cannot get PGRPOC of %s scheduler",poll->workers[i]->dbname);
110119
}
111-
SetLatch(&child->procLatch);
120+
else
121+
{
122+
SetLatch(&child->procLatch);
123+
}
112124
}
113125

114126
/* MAYBE: WAIT? */
@@ -212,7 +224,7 @@ void _sortPollManagers(schd_managers_poll_t *poll)
212224
qsort(poll->workers,poll->n,sizeof(schd_manager_t*),__cmp_managers);
213225
}
214226

215-
intremoveManagerFromPoll(schd_managers_poll_t*poll,char*name,charsort)
227+
intremoveManagerFromPoll(schd_managers_poll_t*poll,char*name,charsort,boolstop_worker)
216228
{
217229
intfound=0;
218230
inti;
@@ -230,8 +242,11 @@ int removeManagerFromPoll(schd_managers_poll_t *poll, char *name, char sort)
230242
if(found==0)return0;
231243
mng=poll->workers[i];
232244

233-
elog(LOG,"Stop scheduler manager for %s",mng->dbname);
234-
TerminateBackgroundWorker(mng->handler);
245+
if(stop_worker)
246+
{
247+
elog(LOG,"Stop scheduler manager for %s",mng->dbname);
248+
TerminateBackgroundWorker(mng->handler);
249+
}
235250

236251
if(poll->n==1)
237252
{
@@ -357,7 +372,7 @@ int refreshManagers(char_array_t *names, schd_managers_poll_t *poll)
357372
{
358373
for(i=0;i<delete->n;i++)
359374
{
360-
removeManagerFromPoll(poll,delete->data[i],0);
375+
removeManagerFromPoll(poll,delete->data[i],0, true);
361376
}
362377
}
363378
if(new->n)

‎src/sched_manager_poll.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,7 @@ void destroyManagerRecord(schd_manager_t *man);
4141
intstopAllManagers(schd_managers_poll_t*poll);
4242
intisBaseListChanged(char_array_t*names,schd_managers_poll_t*pool);
4343
void_sortPollManagers(schd_managers_poll_t*poll);
44-
intremoveManagerFromPoll(schd_managers_poll_t*poll,char*name,charsort);
44+
intremoveManagerFromPoll(schd_managers_poll_t*poll,char*name,charsort,boolstop_worker);
4545
intaddManagerToPoll(schd_managers_poll_t*poll,char*name,intsort);
4646
intrefreshManagers(char_array_t*names,schd_managers_poll_t*poll);
4747
char*poll_dbnames(schd_managers_poll_t*poll);

‎src/scheduler_executor.c

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -185,6 +185,11 @@ void executor_worker_main(Datum arg)
185185
}
186186
*/
187187
shared->next_time=get_next_excution_time(job->next_time_statement,&EE);
188+
if(shared->next_time==0)
189+
{
190+
shared->set_invalid= true;
191+
sprintf(shared->set_invalid_reason,"unable to execute next time statement");
192+
}
188193
}
189194
pgstat_report_activity(STATE_RUNNING,"finish job processing");
190195

‎src/scheduler_executor.h

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,9 @@ typedef struct {
2828
charmessage[PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX];
2929

3030
TimestampTznext_time;
31+
32+
boolset_invalid;
33+
charset_invalid_reason[PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX];
3134
}schd_executor_share_t;
3235

3336
typedefstruct {
@@ -41,7 +44,7 @@ void set_shared_message(schd_executor_share_t *shared, executor_error_t *ee);
4144
TimestampTzget_next_excution_time(char*sql,executor_error_t*ee);
4245
intexecutor_onrollback(job_t*job,executor_error_t*ee);
4346
voidset_pg_var(boolresulti,executor_error_t*ee);
44-
intpush_executor_error(executor_error_t*e,char*fmt, ...)__attribute__ ((format (gnu_printf,2,3)));
47+
intpush_executor_error(executor_error_t*e,char*fmt, ...)pg_attribute_printf(2,3);
4548
intset_session_authorization(char*username,char**error);
4649

4750

‎src/scheduler_manager.c

Lines changed: 89 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -115,7 +115,7 @@ char *get_scheduler_nodename(void)
115115
return_copy_string((char*)(opt==NULL||strlen(opt)==0 ?"master":opt));
116116
}
117117

118-
scheduler_manager_ctx_t*initialize_scheduler_manager_context(char*dbname)
118+
scheduler_manager_ctx_t*initialize_scheduler_manager_context(char*dbname,dsm_segment*seg)
119119
{
120120
inti;
121121
scheduler_manager_ctx_t*ctx;
@@ -126,6 +126,8 @@ scheduler_manager_ctx_t *initialize_scheduler_manager_context(char *dbname)
126126
ctx->nodename=get_scheduler_nodename();
127127
ctx->database=_copy_string(dbname);
128128

129+
ctx->seg=seg;
130+
129131
ctx->slots=worker_alloc(sizeof(scheduler_manager_slot_t*)*ctx->slots_len);
130132
for(i=0;i<ctx->slots_len;i++)
131133
{
@@ -169,10 +171,22 @@ void destroy_scheduler_manager_context(scheduler_manager_ctx_t *ctx)
169171
pfree(ctx);
170172
}
171173

172-
voidscheduler_manager_stop(scheduler_manager_ctx_t*ctx)
174+
intscheduler_manager_stop(scheduler_manager_ctx_t*ctx)
173175
{
176+
inti;
177+
intonwork;
178+
179+
onwork=ctx->slots_len-ctx->free_slots;
180+
if(onwork==0)return0;
181+
174182
pgstat_report_activity(STATE_RUNNING,"stop executors");
175-
/* TODO stop worker but before stop all started kid workers */
183+
for(i=0;i<onwork;i++)
184+
{
185+
elog(LOG,"Schedule manager: terminate bgworker %d",
186+
ctx->slots[i]->pid);
187+
TerminateBackgroundWorker(ctx->slots[i]->handler);
188+
}
189+
returnonwork;
176190
}
177191

178192
scheduler_task_t*scheduler_get_active_tasks(scheduler_manager_ctx_t*ctx,int*nt)
@@ -588,6 +602,8 @@ int launch_executor_worker(scheduler_manager_ctx_t *ctx, scheduler_manager_slot_
588602
shm_data->start_at=item->job->start_at;
589603
shm_data->message[0]=0;
590604
shm_data->next_time=0;
605+
shm_data->set_invalid= false;
606+
shm_data->set_invalid_reason[0]=0;
591607

592608
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |
593609
BGWORKER_BACKEND_DATABASE_CONNECTION;
@@ -808,12 +824,17 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
808824
}
809825
elseif(toremove[i].reason==RmDone)
810826
{
827+
shm_data=dsm_segment_address(item->shared);
811828
job_status= true;
829+
if(shm_data->message[0]!=0)
830+
{
831+
set_job_error(item->job,"%s",shm_data->message);
832+
}
812833
}
813834
elseif(toremove[i].reason==RmError)
814835
{
815836
shm_data=dsm_segment_address(item->shared);
816-
if(strlen(shm_data->message)>0)
837+
if(shm_data->message[0]!=0)
817838
{
818839
set_job_error(item->job,"%s",shm_data->message);
819840
}
@@ -830,19 +851,23 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
830851
if(removeJob)
831852
{
832853
START_SPI_SNAP();
854+
shm_data=dsm_segment_address(item->shared);
855+
856+
if(shm_data->set_invalid)
857+
{
858+
mark_job_broken(ctx,item->job->cron_id,shm_data->set_invalid_reason);
859+
}
833860
if(item->job->next_time_statement)
834861
{
835-
shm_data=dsm_segment_address(item->shared);
836862
if(shm_data->next_time>0)
837863
{
838864
next_time=_round_timestamp_to_minute(shm_data->next_time);
839865
next_time_str=make_date_from_timestamp(next_time);
840866
if(insert_at_record(ctx->nodename,item->job->cron_id,next_time,0,&error)<0)
841867
{
842-
elog(ERROR,"Cannot insert next time at record: %s",
843-
error ?error:"unknown error");
868+
manager_fatal_error(ctx,0,"Cannot insert next time at record: %s",error ?error:"unknown error");
844869
}
845-
update_cron_texttime(item->job->cron_id,next_time);
870+
update_cron_texttime(ctx,item->job->cron_id,next_time);
846871
if(!item->job->error)
847872
{
848873
set_job_error(item->job,"set next exec time: %s",next_time_str);
@@ -868,7 +893,25 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
868893
return1;
869894
}
870895

871-
intupdate_cron_texttime(intcron_id,TimestampTznext)
896+
intmark_job_broken(scheduler_manager_ctx_t*ctx,intcron_id,char*reason)
897+
{
898+
Oidtypes[2]= {INT4OID,TEXTOID };
899+
Datumvalues[2];
900+
char*error;
901+
char*sql="update schedule.cron set reason = $2, broken = true where id = $1";
902+
intret;
903+
904+
values[0]=Int32GetDatum(cron_id);
905+
values[1]=CStringGetTextDatum(reason);
906+
ret=execute_spi_sql_with_args(sql,2,types,values,NULL,&error);
907+
if(ret<0)
908+
{
909+
manager_fatal_error(ctx,0,"Cannot set cron %d broken: %s",cron_id,error);
910+
}
911+
returnret;
912+
}
913+
914+
intupdate_cron_texttime(scheduler_manager_ctx_t*ctx,intcron_id,TimestampTznext)
872915
{
873916
Oidtypes[2]= {INT4OID,TIMESTAMPTZOID };
874917
Datumvalues[2];
@@ -889,7 +932,7 @@ int update_cron_texttime(int cron_id, TimestampTz next)
889932
ret=execute_spi_sql_with_args(sql,2,types,values,nulls,&error);
890933
if(ret<0)
891934
{
892-
elog(ERROR,"Cannot update cron %d next time: %s",cron_id,error);
935+
manager_fatal_error(ctx,0,"Cannot update cron %d next time: %s",cron_id,error);
893936
}
894937

895938
returnret;
@@ -1025,6 +1068,7 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10251068
{
10261069
n_exec_dates=0;
10271070
ntimes=0;
1071+
realloced= false;
10281072

10291073
next_times=scheduler_calc_next_task_time(&(tasks[i]),
10301074
GetCurrentTimestamp(),timestamp_add_seconds(0,600),
@@ -1035,12 +1079,13 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10351079
{
10361080
date1=make_date_from_timestamp(start);
10371081
date2=make_date_from_timestamp(stop);
1082+
10381083

10391084
for(j=0;j<n_exec_dates;j++)
10401085
{
10411086
r1=strcmp(date1,exec_dates[j]);
10421087
r2=strcmp(exec_dates[j],date2);
1043-
if((r1==0||r1==-1)&&(r2==0||r2==-1))
1088+
if(r1<=0&&r2<=0)
10441089
{
10451090
if(!realloced)
10461091
{
@@ -1069,7 +1114,7 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10691114
{
10701115
if(insert_at_record(ctx->nodename,tasks[i].id,next_times[j],tasks[i].postpone,&error)<0)
10711116
{
1072-
elog(ERROR,"Cannot insert AT task: %s",error ?error:"unknown error");
1117+
manager_fatal_error(ctx,0,"Cannot insert AT task: %s",error ?error:"unknown error");
10731118
}
10741119
}
10751120
pfree(next_times);
@@ -1083,18 +1128,18 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10831128
returnntasks;
10841129
}
10851130

1086-
voidclean_at_table(void)
1131+
voidclean_at_table(scheduler_manager_ctx_t*ctx)
10871132
{
10881133
char*error=NULL;
10891134

10901135
START_SPI_SNAP();
10911136
if(execute_spi("truncate schedule.at",&error)<0)
10921137
{
1093-
elog(ERROR,"Cannot clean 'at' table: %s",error);
1138+
manager_fatal_error(ctx,0,"Cannot clean 'at' table: %s",error);
10941139
}
10951140
if(execute_spi("update schedule.cron set _next_exec_time = NULL where _next_exec_time is not NULL",&error)<0)
10961141
{
1097-
elog(ERROR,"Cannot clean cron _next time: %s",error);
1142+
manager_fatal_error(ctx,0,"Cannot clean cron _next time: %s",error);
10981143
}
10991144
STOP_SPI_SNAP();
11001145
}
@@ -1120,6 +1165,7 @@ void manager_worker_main(Datum arg)
11201165

11211166
if(shared->status!=SchdManagerInit&& !(shared->setbyparent))
11221167
{
1168+
dsm_detach(seg);
11231169
ereport(ERROR,
11241170
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
11251171
errmsg("corrupted dynamic shared memory segment")));
@@ -1150,6 +1196,7 @@ void manager_worker_main(Datum arg)
11501196
dsm_detach(seg);
11511197
proc_exit(0);
11521198
}
1199+
elog(LOG,"ON");
11531200
SetCurrentStatementStartTimestamp();
11541201
pgstat_report_activity(STATE_RUNNING,"initialize.");
11551202

@@ -1160,9 +1207,9 @@ void manager_worker_main(Datum arg)
11601207
pgstat_report_activity(STATE_RUNNING,"initialize context");
11611208
changeChildBgwState(shared,SchdManagerConnected);
11621209
init_worker_mem_ctx("WorkerMemoryContext");
1163-
ctx=initialize_scheduler_manager_context(database);
1164-
clean_at_table();
1165-
1210+
ctx=initialize_scheduler_manager_context(database,seg);
1211+
clean_at_table(ctx);
1212+
elog(LOG,"Start main loop");
11661213
while(!got_sigterm)
11671214
{
11681215
if(rc)
@@ -1194,12 +1241,35 @@ void manager_worker_main(Datum arg)
11941241
WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,1000L);
11951242
ResetLatch(MyLatch);
11961243
}
1244+
scheduler_manager_stop(ctx);
11971245
delete_worker_mem_ctx();
1198-
/* destroy_scheduler_manager_context(ctx); - no need any more */
11991246
changeChildBgwState(shared,SchdManagerDie);
12001247
pfree(database);
12011248
dsm_detach(seg);
12021249
proc_exit(0);
12031250
}
12041251

1252+
voidmanager_fatal_error(scheduler_manager_ctx_t*ctx,intecode,char*message, ...)
1253+
{
1254+
va_listarglist;
1255+
charbuf[1024];
1256+
1257+
scheduler_manager_stop(ctx);
1258+
changeChildBgwState((schd_manager_share_t*)(dsm_segment_address(ctx->seg)),SchdManagerDie);
1259+
dsm_detach(ctx->seg);
1260+
1261+
va_start(arglist,message);
1262+
vsnprintf(buf,1024,message,arglist);
1263+
va_end(arglist);
1264+
1265+
1266+
delete_worker_mem_ctx();
1267+
if(ecode==0)
1268+
{
1269+
ecode=ERRCODE_INTERNAL_ERROR;
1270+
}
1271+
1272+
ereport(ERROR, (errcode(ecode),errmsg("%s",buf)));
1273+
}
1274+
12051275

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp