@@ -98,19 +98,24 @@ int checkSchedulerNamespace(void)
9898int get_scheduler_maxworkers (void )
9999{
100100const char * opt ;
101+ int var ;
101102
102- opt = GetConfigOption ("scheduler.max_workers" , true, false);
103+ opt = GetConfigOption ("schedule.max_workers" , true, false);
104+ /* opt = GetConfigOptionByName("schedule.max_workers", NULL); */
103105if (opt == NULL )
104106{
105107return 2 ;
106108}
107- return atoi (opt );
109+
110+ var = atoi (opt );
111+ /* pfree(opt); */
112+ return var ;
108113}
109114
110115char * get_scheduler_nodename (void )
111116{
112117const char * opt ;
113- opt = GetConfigOption ("scheduler .nodename" , true, false);
118+ opt = GetConfigOption ("schedule .nodename" , true, false);
114119
115120return _copy_string ((char * )(opt == NULL || strlen (opt )== 0 ?"master" :opt ));
116121}
@@ -140,13 +145,74 @@ scheduler_manager_ctx_t *initialize_scheduler_manager_context(char *dbname, dsm_
140145return ctx ;
141146}
142147
143- void refresh_scheduler_manager_context (scheduler_manager_ctx_t * ctx )
148+ int refresh_scheduler_manager_context (scheduler_manager_ctx_t * ctx )
144149{
145- /* TODO set new nodename , if changed kill all kids workers, change
146- max-workers resize slots
147- if less then was and all slots are buisy [ ??? ]
148- kill youngest (?)
149- */
150+ int rc = 0 ;
151+ int N ,i ,busy ;
152+ scheduler_manager_slot_t * * old ;
153+
154+ N = get_scheduler_maxworkers ();
155+ if (N != ctx -> slots_len )
156+ {
157+ elog (LOG ,"Change available workers number %d => %d" ,ctx -> slots_len ,N );
158+ }
159+
160+ if (N > ctx -> slots_len )
161+ {
162+ pgstat_report_activity (STATE_RUNNING ,"extend the number of workers" );
163+
164+ old = ctx -> slots ;
165+ ctx -> slots = worker_alloc (sizeof (scheduler_manager_slot_t * )* N );
166+ for (i = 0 ;i < N ;i ++ )
167+ {
168+ ctx -> slots [i ]= NULL ;
169+ }
170+ for (i = 0 ;i < ctx -> slots_len ;i ++ )
171+ {
172+ ctx -> slots [i ]= old [i ];
173+ }
174+ pfree (old );
175+ ctx -> free_slots += (N - ctx -> slots_len );
176+ ctx -> slots_len = N ;
177+ }
178+ else if (N < ctx -> slots_len )
179+ {
180+ pgstat_report_activity (STATE_RUNNING ,"shrink the number of workers" );
181+ busy = ctx -> slots_len - ctx -> free_slots ;
182+ if (N >=busy )
183+ {
184+ ctx -> slots = repalloc (ctx -> slots ,sizeof (scheduler_manager_slot_t * )* N );
185+ ctx -> slots_len = N ;
186+ ctx -> free_slots = N - busy ;
187+ }
188+ else
189+ {
190+ pgstat_report_activity (STATE_RUNNING ,"wait for some workers free slots" );
191+ while (!got_sigterm )
192+ {
193+ CHECK_FOR_INTERRUPTS ();
194+ scheduler_check_slots (ctx );
195+ busy = ctx -> slots_len - ctx -> free_slots ;
196+ if (N >=busy )
197+ {
198+ ctx -> slots = repalloc (ctx -> slots ,sizeof (scheduler_manager_slot_t * )* N );
199+ ctx -> slots_len = N ;
200+ ctx -> free_slots = N - busy ;
201+ break ;
202+ }
203+ if (rc )
204+ {
205+ if (rc & WL_POSTMASTER_DEATH )proc_exit (1 );
206+ if (got_sigterm || got_sighup )return 0 ;
207+ }
208+ rc = WaitLatch (MyLatch ,
209+ WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH ,500L );
210+ ResetLatch (MyLatch );
211+ }
212+ }
213+ }
214+
215+ return 1 ;
150216}
151217
152218void destroy_scheduler_manager_context (scheduler_manager_ctx_t * ctx )
@@ -218,7 +284,6 @@ scheduler_task_t *scheduler_get_active_tasks(scheduler_manager_ctx_t *ctx, int *
218284tupdesc = SPI_tuptable -> tupdesc ;
219285
220286tasks = worker_alloc (sizeof (scheduler_task_t )* processed );
221- elog (LOG ,"Found %d tasks" ,processed );
222287
223288for (i = 0 ;i < processed ;i ++ )
224289{
@@ -496,7 +561,6 @@ TimestampTz *scheduler_calc_next_task_time(scheduler_task_t *task, TimestampTz s
496561curr += SECS_PER_MINUTE ;
497562#endif
498563}
499- elog (LOG ,"made: %d" ,* ntimes );
500564for (i = 0 ;i < 5 ;i ++ )destroy_bit_array (& cron [i ],0 );
501565if (* ntimes == 0 )
502566{
@@ -1144,6 +1208,14 @@ void clean_at_table(scheduler_manager_ctx_t *ctx)
11441208STOP_SPI_SNAP ();
11451209}
11461210
1211+ void set_slots_stat_report (scheduler_manager_ctx_t * ctx )
1212+ {
1213+ char state [128 ];
1214+ snprintf (state ,128 ,"slots busy: %d, free: %d" ,
1215+ ctx -> slots_len - ctx -> free_slots ,ctx -> free_slots );
1216+ pgstat_report_activity (STATE_RUNNING ,state );
1217+ }
1218+
11471219void manager_worker_main (Datum arg )
11481220{
11491221char * database ;
@@ -1196,7 +1268,6 @@ void manager_worker_main(Datum arg)
11961268dsm_detach (seg );
11971269proc_exit (0 );
11981270}
1199- elog (LOG ,"ON" );
12001271SetCurrentStatementStartTimestamp ();
12011272pgstat_report_activity (STATE_RUNNING ,"initialize." );
12021273
@@ -1209,7 +1280,8 @@ void manager_worker_main(Datum arg)
12091280init_worker_mem_ctx ("WorkerMemoryContext" );
12101281ctx = initialize_scheduler_manager_context (database ,seg );
12111282clean_at_table (ctx );
1212- elog (LOG ,"Start main loop" );
1283+ set_slots_stat_report (ctx );
1284+
12131285while (!got_sigterm )
12141286{
12151287if (rc )
@@ -1220,20 +1292,22 @@ void manager_worker_main(Datum arg)
12201292got_sighup = false;
12211293ProcessConfigFile (PGC_SIGHUP );
12221294refresh_scheduler_manager_context (ctx );
1295+ set_slots_stat_report (ctx );
12231296}
12241297if (!got_sighup && !got_sigterm )
12251298{
12261299if (rc & WL_LATCH_SET )
12271300{
12281301scheduler_check_slots (ctx );
1302+ set_slots_stat_report (ctx );
12291303}
12301304else if (rc & WL_TIMEOUT )
12311305{
12321306scheduler_make_at_record (ctx );
12331307scheduler_vanish_expired_jobs (ctx );
12341308scheduler_start_jobs (ctx );
12351309scheduler_check_slots (ctx );
1236- pgstat_report_activity ( STATE_IDLE , "" );
1310+ set_slots_stat_report ( ctx );
12371311}
12381312}
12391313}