@@ -115,7 +115,7 @@ char *get_scheduler_nodename(void)
115115return _copy_string ((char * )(opt == NULL || strlen (opt )== 0 ?"master" :opt ));
116116}
117117
118- scheduler_manager_ctx_t * initialize_scheduler_manager_context (char * dbname )
118+ scheduler_manager_ctx_t * initialize_scheduler_manager_context (char * dbname , dsm_segment * seg )
119119{
120120int i ;
121121scheduler_manager_ctx_t * ctx ;
@@ -126,6 +126,8 @@ scheduler_manager_ctx_t *initialize_scheduler_manager_context(char *dbname)
126126ctx -> nodename = get_scheduler_nodename ();
127127ctx -> database = _copy_string (dbname );
128128
129+ ctx -> seg = seg ;
130+
129131ctx -> slots = worker_alloc (sizeof (scheduler_manager_slot_t * )* ctx -> slots_len );
130132for (i = 0 ;i < ctx -> slots_len ;i ++ )
131133{
@@ -169,10 +171,22 @@ void destroy_scheduler_manager_context(scheduler_manager_ctx_t *ctx)
169171pfree (ctx );
170172}
171173
172- void scheduler_manager_stop (scheduler_manager_ctx_t * ctx )
174+ int scheduler_manager_stop (scheduler_manager_ctx_t * ctx )
173175{
176+ int i ;
177+ int onwork ;
178+
179+ onwork = ctx -> slots_len - ctx -> free_slots ;
180+ if (onwork == 0 )return 0 ;
181+
174182pgstat_report_activity (STATE_RUNNING ,"stop executors" );
175- /* TODO stop worker but before stop all started kid workers */
183+ for (i = 0 ;i < onwork ;i ++ )
184+ {
185+ elog (LOG ,"Schedule manager: terminate bgworker %d" ,
186+ ctx -> slots [i ]-> pid );
187+ TerminateBackgroundWorker (ctx -> slots [i ]-> handler );
188+ }
189+ return onwork ;
176190}
177191
178192scheduler_task_t * scheduler_get_active_tasks (scheduler_manager_ctx_t * ctx ,int * nt )
@@ -588,6 +602,8 @@ int launch_executor_worker(scheduler_manager_ctx_t *ctx, scheduler_manager_slot_
588602shm_data -> start_at = item -> job -> start_at ;
589603shm_data -> message [0 ]= 0 ;
590604shm_data -> next_time = 0 ;
605+ shm_data -> set_invalid = false;
606+ shm_data -> set_invalid_reason [0 ]= 0 ;
591607
592608worker .bgw_flags = BGWORKER_SHMEM_ACCESS |
593609BGWORKER_BACKEND_DATABASE_CONNECTION ;
@@ -808,12 +824,17 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
808824}
809825else if (toremove [i ].reason == RmDone )
810826{
827+ shm_data = dsm_segment_address (item -> shared );
811828job_status = true;
829+ if (shm_data -> message [0 ]!= 0 )
830+ {
831+ set_job_error (item -> job ,"%s" ,shm_data -> message );
832+ }
812833}
813834else if (toremove [i ].reason == RmError )
814835{
815836shm_data = dsm_segment_address (item -> shared );
816- if (strlen ( shm_data -> message ) > 0 )
837+ if (shm_data -> message [ 0 ] != 0 )
817838{
818839set_job_error (item -> job ,"%s" ,shm_data -> message );
819840}
@@ -830,19 +851,23 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
830851if (removeJob )
831852{
832853START_SPI_SNAP ();
854+ shm_data = dsm_segment_address (item -> shared );
855+
856+ if (shm_data -> set_invalid )
857+ {
858+ mark_job_broken (ctx ,item -> job -> cron_id ,shm_data -> set_invalid_reason );
859+ }
833860if (item -> job -> next_time_statement )
834861{
835- shm_data = dsm_segment_address (item -> shared );
836862if (shm_data -> next_time > 0 )
837863{
838864next_time = _round_timestamp_to_minute (shm_data -> next_time );
839865next_time_str = make_date_from_timestamp (next_time );
840866if (insert_at_record (ctx -> nodename ,item -> job -> cron_id ,next_time ,0 ,& error )< 0 )
841867{
842- elog (ERROR ,"Cannot insert next time at record: %s" ,
843- error ?error :"unknown error" );
868+ manager_fatal_error (ctx ,0 ,"Cannot insert next time at record: %s" ,error ?error :"unknown error" );
844869}
845- update_cron_texttime (item -> job -> cron_id ,next_time );
870+ update_cron_texttime (ctx , item -> job -> cron_id ,next_time );
846871if (!item -> job -> error )
847872{
848873set_job_error (item -> job ,"set next exec time: %s" ,next_time_str );
@@ -868,7 +893,25 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
868893return 1 ;
869894}
870895
871- int update_cron_texttime (int cron_id ,TimestampTz next )
896+ int mark_job_broken (scheduler_manager_ctx_t * ctx ,int cron_id ,char * reason )
897+ {
898+ Oid types [2 ]= {INT4OID ,TEXTOID };
899+ Datum values [2 ];
900+ char * error ;
901+ char * sql = "update schedule.cron set reason = $2, broken = true where id = $1" ;
902+ int ret ;
903+
904+ values [0 ]= Int32GetDatum (cron_id );
905+ values [1 ]= CStringGetTextDatum (reason );
906+ ret = execute_spi_sql_with_args (sql ,2 ,types ,values ,NULL ,& error );
907+ if (ret < 0 )
908+ {
909+ manager_fatal_error (ctx ,0 ,"Cannot set cron %d broken: %s" ,cron_id ,error );
910+ }
911+ return ret ;
912+ }
913+
914+ int update_cron_texttime (scheduler_manager_ctx_t * ctx ,int cron_id ,TimestampTz next )
872915{
873916Oid types [2 ]= {INT4OID ,TIMESTAMPTZOID };
874917Datum values [2 ];
@@ -889,7 +932,7 @@ int update_cron_texttime(int cron_id, TimestampTz next)
889932ret = execute_spi_sql_with_args (sql ,2 ,types ,values ,nulls ,& error );
890933if (ret < 0 )
891934{
892- elog ( ERROR ,"Cannot update cron %d next time: %s" ,cron_id ,error );
935+ manager_fatal_error ( ctx , 0 ,"Cannot update cron %d next time: %s" ,cron_id ,error );
893936}
894937
895938return ret ;
@@ -1025,6 +1068,7 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10251068{
10261069n_exec_dates = 0 ;
10271070ntimes = 0 ;
1071+ realloced = false;
10281072
10291073next_times = scheduler_calc_next_task_time (& (tasks [i ]),
10301074GetCurrentTimestamp (),timestamp_add_seconds (0 ,600 ),
@@ -1035,12 +1079,13 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10351079{
10361080date1 = make_date_from_timestamp (start );
10371081date2 = make_date_from_timestamp (stop );
1082+
10381083
10391084for (j = 0 ;j < n_exec_dates ;j ++ )
10401085{
10411086r1 = strcmp (date1 ,exec_dates [j ]);
10421087r2 = strcmp (exec_dates [j ],date2 );
1043- if (( r1 == 0 || r1 == -1 ) && ( r2 == 0 || r2 == -1 ) )
1088+ if (r1 <= 0 && r2 <= 0 )
10441089{
10451090if (!realloced )
10461091{
@@ -1069,7 +1114,7 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10691114{
10701115if (insert_at_record (ctx -> nodename ,tasks [i ].id ,next_times [j ],tasks [i ].postpone ,& error )< 0 )
10711116{
1072- elog ( ERROR ,"Cannot insert AT task: %s" ,error ?error :"unknown error" );
1117+ manager_fatal_error ( ctx , 0 ,"Cannot insert AT task: %s" ,error ?error :"unknown error" );
10731118}
10741119}
10751120pfree (next_times );
@@ -1083,18 +1128,18 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
10831128return ntasks ;
10841129}
10851130
1086- void clean_at_table (void )
1131+ void clean_at_table (scheduler_manager_ctx_t * ctx )
10871132{
10881133char * error = NULL ;
10891134
10901135START_SPI_SNAP ();
10911136if (execute_spi ("truncate schedule.at" ,& error )< 0 )
10921137{
1093- elog ( ERROR ,"Cannot clean 'at' table: %s" ,error );
1138+ manager_fatal_error ( ctx , 0 ,"Cannot clean 'at' table: %s" ,error );
10941139}
10951140if (execute_spi ("update schedule.cron set _next_exec_time = NULL where _next_exec_time is not NULL" ,& error )< 0 )
10961141{
1097- elog ( ERROR ,"Cannot clean cron _next time: %s" ,error );
1142+ manager_fatal_error ( ctx , 0 ,"Cannot clean cron _next time: %s" ,error );
10981143}
10991144STOP_SPI_SNAP ();
11001145}
@@ -1120,6 +1165,7 @@ void manager_worker_main(Datum arg)
11201165
11211166if (shared -> status != SchdManagerInit && !(shared -> setbyparent ))
11221167{
1168+ dsm_detach (seg );
11231169ereport (ERROR ,
11241170(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
11251171errmsg ("corrupted dynamic shared memory segment" )));
@@ -1150,6 +1196,7 @@ void manager_worker_main(Datum arg)
11501196dsm_detach (seg );
11511197proc_exit (0 );
11521198}
1199+ elog (LOG ,"ON" );
11531200SetCurrentStatementStartTimestamp ();
11541201pgstat_report_activity (STATE_RUNNING ,"initialize." );
11551202
@@ -1160,9 +1207,9 @@ void manager_worker_main(Datum arg)
11601207pgstat_report_activity (STATE_RUNNING ,"initialize context" );
11611208changeChildBgwState (shared ,SchdManagerConnected );
11621209init_worker_mem_ctx ("WorkerMemoryContext" );
1163- ctx = initialize_scheduler_manager_context (database );
1164- clean_at_table ();
1165-
1210+ ctx = initialize_scheduler_manager_context (database , seg );
1211+ clean_at_table (ctx );
1212+ elog ( LOG , "Start main loop" );
11661213while (!got_sigterm )
11671214{
11681215if (rc )
@@ -1194,12 +1241,35 @@ void manager_worker_main(Datum arg)
11941241WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH ,1000L );
11951242ResetLatch (MyLatch );
11961243}
1244+ scheduler_manager_stop (ctx );
11971245delete_worker_mem_ctx ();
1198- /* destroy_scheduler_manager_context(ctx); - no need any more */
11991246changeChildBgwState (shared ,SchdManagerDie );
12001247pfree (database );
12011248dsm_detach (seg );
12021249proc_exit (0 );
12031250}
12041251
1252+ void manager_fatal_error (scheduler_manager_ctx_t * ctx ,int ecode ,char * message , ...)
1253+ {
1254+ va_list arglist ;
1255+ char buf [1024 ];
1256+
1257+ scheduler_manager_stop (ctx );
1258+ changeChildBgwState ((schd_manager_share_t * )(dsm_segment_address (ctx -> seg )),SchdManagerDie );
1259+ dsm_detach (ctx -> seg );
1260+
1261+ va_start (arglist ,message );
1262+ vsnprintf (buf ,1024 ,message ,arglist );
1263+ va_end (arglist );
1264+
1265+
1266+ delete_worker_mem_ctx ();
1267+ if (ecode == 0 )
1268+ {
1269+ ecode = ERRCODE_INTERNAL_ERROR ;
1270+ }
1271+
1272+ ereport (ERROR , (errcode (ecode ),errmsg ("%s" ,buf )));
1273+ }
1274+
12051275