@@ -115,7 +115,7 @@ char *get_scheduler_nodename(void)
115
115
return _copy_string ((char * )(opt == NULL || strlen (opt )== 0 ?"master" :opt ));
116
116
}
117
117
118
- scheduler_manager_ctx_t * initialize_scheduler_manager_context (char * dbname )
118
+ scheduler_manager_ctx_t * initialize_scheduler_manager_context (char * dbname , dsm_segment * seg )
119
119
{
120
120
int i ;
121
121
scheduler_manager_ctx_t * ctx ;
@@ -126,6 +126,8 @@ scheduler_manager_ctx_t *initialize_scheduler_manager_context(char *dbname)
126
126
ctx -> nodename = get_scheduler_nodename ();
127
127
ctx -> database = _copy_string (dbname );
128
128
129
+ ctx -> seg = seg ;
130
+
129
131
ctx -> slots = worker_alloc (sizeof (scheduler_manager_slot_t * )* ctx -> slots_len );
130
132
for (i = 0 ;i < ctx -> slots_len ;i ++ )
131
133
{
@@ -169,10 +171,22 @@ void destroy_scheduler_manager_context(scheduler_manager_ctx_t *ctx)
169
171
pfree (ctx );
170
172
}
171
173
172
- void scheduler_manager_stop (scheduler_manager_ctx_t * ctx )
174
+ int scheduler_manager_stop (scheduler_manager_ctx_t * ctx )
173
175
{
176
+ int i ;
177
+ int onwork ;
178
+
179
+ onwork = ctx -> slots_len - ctx -> free_slots ;
180
+ if (onwork == 0 )return 0 ;
181
+
174
182
pgstat_report_activity (STATE_RUNNING ,"stop executors" );
175
- /* TODO stop worker but before stop all started kid workers */
183
+ for (i = 0 ;i < onwork ;i ++ )
184
+ {
185
+ elog (LOG ,"Schedule manager: terminate bgworker %d" ,
186
+ ctx -> slots [i ]-> pid );
187
+ TerminateBackgroundWorker (ctx -> slots [i ]-> handler );
188
+ }
189
+ return onwork ;
176
190
}
177
191
178
192
scheduler_task_t * scheduler_get_active_tasks (scheduler_manager_ctx_t * ctx ,int * nt )
@@ -588,6 +602,8 @@ int launch_executor_worker(scheduler_manager_ctx_t *ctx, scheduler_manager_slot_
588
602
shm_data -> start_at = item -> job -> start_at ;
589
603
shm_data -> message [0 ]= 0 ;
590
604
shm_data -> next_time = 0 ;
605
+ shm_data -> set_invalid = false;
606
+ shm_data -> set_invalid_reason [0 ]= 0 ;
591
607
592
608
worker .bgw_flags = BGWORKER_SHMEM_ACCESS |
593
609
BGWORKER_BACKEND_DATABASE_CONNECTION ;
@@ -808,12 +824,17 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
808
824
}
809
825
else if (toremove [i ].reason == RmDone )
810
826
{
827
+ shm_data = dsm_segment_address (item -> shared );
811
828
job_status = true;
829
+ if (shm_data -> message [0 ]!= 0 )
830
+ {
831
+ set_job_error (item -> job ,"%s" ,shm_data -> message );
832
+ }
812
833
}
813
834
else if (toremove [i ].reason == RmError )
814
835
{
815
836
shm_data = dsm_segment_address (item -> shared );
816
- if (strlen ( shm_data -> message ) > 0 )
837
+ if (shm_data -> message [ 0 ] != 0 )
817
838
{
818
839
set_job_error (item -> job ,"%s" ,shm_data -> message );
819
840
}
@@ -830,19 +851,23 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
830
851
if (removeJob )
831
852
{
832
853
START_SPI_SNAP ();
854
+ shm_data = dsm_segment_address (item -> shared );
855
+
856
+ if (shm_data -> set_invalid )
857
+ {
858
+ mark_job_broken (ctx ,item -> job -> cron_id ,shm_data -> set_invalid_reason );
859
+ }
833
860
if (item -> job -> next_time_statement )
834
861
{
835
- shm_data = dsm_segment_address (item -> shared );
836
862
if (shm_data -> next_time > 0 )
837
863
{
838
864
next_time = _round_timestamp_to_minute (shm_data -> next_time );
839
865
next_time_str = make_date_from_timestamp (next_time );
840
866
if (insert_at_record (ctx -> nodename ,item -> job -> cron_id ,next_time ,0 ,& error )< 0 )
841
867
{
842
- elog (ERROR ,"Cannot insert next time at record: %s" ,
843
- error ?error :"unknown error" );
868
+ manager_fatal_error (ctx ,0 ,"Cannot insert next time at record: %s" ,error ?error :"unknown error" );
844
869
}
845
- update_cron_texttime (item -> job -> cron_id ,next_time );
870
+ update_cron_texttime (ctx , item -> job -> cron_id ,next_time );
846
871
if (!item -> job -> error )
847
872
{
848
873
set_job_error (item -> job ,"set next exec time: %s" ,next_time_str );
@@ -868,7 +893,25 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx)
868
893
return 1 ;
869
894
}
870
895
871
- int update_cron_texttime (int cron_id ,TimestampTz next )
896
+ int mark_job_broken (scheduler_manager_ctx_t * ctx ,int cron_id ,char * reason )
897
+ {
898
+ Oid types [2 ]= {INT4OID ,TEXTOID };
899
+ Datum values [2 ];
900
+ char * error ;
901
+ char * sql = "update schedule.cron set reason = $2, broken = true where id = $1" ;
902
+ int ret ;
903
+
904
+ values [0 ]= Int32GetDatum (cron_id );
905
+ values [1 ]= CStringGetTextDatum (reason );
906
+ ret = execute_spi_sql_with_args (sql ,2 ,types ,values ,NULL ,& error );
907
+ if (ret < 0 )
908
+ {
909
+ manager_fatal_error (ctx ,0 ,"Cannot set cron %d broken: %s" ,cron_id ,error );
910
+ }
911
+ return ret ;
912
+ }
913
+
914
+ int update_cron_texttime (scheduler_manager_ctx_t * ctx ,int cron_id ,TimestampTz next )
872
915
{
873
916
Oid types [2 ]= {INT4OID ,TIMESTAMPTZOID };
874
917
Datum values [2 ];
@@ -889,7 +932,7 @@ int update_cron_texttime(int cron_id, TimestampTz next)
889
932
ret = execute_spi_sql_with_args (sql ,2 ,types ,values ,nulls ,& error );
890
933
if (ret < 0 )
891
934
{
892
- elog ( ERROR ,"Cannot update cron %d next time: %s" ,cron_id ,error );
935
+ manager_fatal_error ( ctx , 0 ,"Cannot update cron %d next time: %s" ,cron_id ,error );
893
936
}
894
937
895
938
return ret ;
@@ -1025,6 +1068,7 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
1025
1068
{
1026
1069
n_exec_dates = 0 ;
1027
1070
ntimes = 0 ;
1071
+ realloced = false;
1028
1072
1029
1073
next_times = scheduler_calc_next_task_time (& (tasks [i ]),
1030
1074
GetCurrentTimestamp (),timestamp_add_seconds (0 ,600 ),
@@ -1035,12 +1079,13 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
1035
1079
{
1036
1080
date1 = make_date_from_timestamp (start );
1037
1081
date2 = make_date_from_timestamp (stop );
1082
+
1038
1083
1039
1084
for (j = 0 ;j < n_exec_dates ;j ++ )
1040
1085
{
1041
1086
r1 = strcmp (date1 ,exec_dates [j ]);
1042
1087
r2 = strcmp (exec_dates [j ],date2 );
1043
- if (( r1 == 0 || r1 == -1 ) && ( r2 == 0 || r2 == -1 ) )
1088
+ if (r1 <= 0 && r2 <= 0 )
1044
1089
{
1045
1090
if (!realloced )
1046
1091
{
@@ -1069,7 +1114,7 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
1069
1114
{
1070
1115
if (insert_at_record (ctx -> nodename ,tasks [i ].id ,next_times [j ],tasks [i ].postpone ,& error )< 0 )
1071
1116
{
1072
- elog ( ERROR ,"Cannot insert AT task: %s" ,error ?error :"unknown error" );
1117
+ manager_fatal_error ( ctx , 0 ,"Cannot insert AT task: %s" ,error ?error :"unknown error" );
1073
1118
}
1074
1119
}
1075
1120
pfree (next_times );
@@ -1083,18 +1128,18 @@ int scheduler_make_at_record(scheduler_manager_ctx_t *ctx)
1083
1128
return ntasks ;
1084
1129
}
1085
1130
1086
- void clean_at_table (void )
1131
+ void clean_at_table (scheduler_manager_ctx_t * ctx )
1087
1132
{
1088
1133
char * error = NULL ;
1089
1134
1090
1135
START_SPI_SNAP ();
1091
1136
if (execute_spi ("truncate schedule.at" ,& error )< 0 )
1092
1137
{
1093
- elog ( ERROR ,"Cannot clean 'at' table: %s" ,error );
1138
+ manager_fatal_error ( ctx , 0 ,"Cannot clean 'at' table: %s" ,error );
1094
1139
}
1095
1140
if (execute_spi ("update schedule.cron set _next_exec_time = NULL where _next_exec_time is not NULL" ,& error )< 0 )
1096
1141
{
1097
- elog ( ERROR ,"Cannot clean cron _next time: %s" ,error );
1142
+ manager_fatal_error ( ctx , 0 ,"Cannot clean cron _next time: %s" ,error );
1098
1143
}
1099
1144
STOP_SPI_SNAP ();
1100
1145
}
@@ -1120,6 +1165,7 @@ void manager_worker_main(Datum arg)
1120
1165
1121
1166
if (shared -> status != SchdManagerInit && !(shared -> setbyparent ))
1122
1167
{
1168
+ dsm_detach (seg );
1123
1169
ereport (ERROR ,
1124
1170
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1125
1171
errmsg ("corrupted dynamic shared memory segment" )));
@@ -1150,6 +1196,7 @@ void manager_worker_main(Datum arg)
1150
1196
dsm_detach (seg );
1151
1197
proc_exit (0 );
1152
1198
}
1199
+ elog (LOG ,"ON" );
1153
1200
SetCurrentStatementStartTimestamp ();
1154
1201
pgstat_report_activity (STATE_RUNNING ,"initialize." );
1155
1202
@@ -1160,9 +1207,9 @@ void manager_worker_main(Datum arg)
1160
1207
pgstat_report_activity (STATE_RUNNING ,"initialize context" );
1161
1208
changeChildBgwState (shared ,SchdManagerConnected );
1162
1209
init_worker_mem_ctx ("WorkerMemoryContext" );
1163
- ctx = initialize_scheduler_manager_context (database );
1164
- clean_at_table ();
1165
-
1210
+ ctx = initialize_scheduler_manager_context (database , seg );
1211
+ clean_at_table (ctx );
1212
+ elog ( LOG , "Start main loop" );
1166
1213
while (!got_sigterm )
1167
1214
{
1168
1215
if (rc )
@@ -1194,12 +1241,35 @@ void manager_worker_main(Datum arg)
1194
1241
WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH ,1000L );
1195
1242
ResetLatch (MyLatch );
1196
1243
}
1244
+ scheduler_manager_stop (ctx );
1197
1245
delete_worker_mem_ctx ();
1198
- /* destroy_scheduler_manager_context(ctx); - no need any more */
1199
1246
changeChildBgwState (shared ,SchdManagerDie );
1200
1247
pfree (database );
1201
1248
dsm_detach (seg );
1202
1249
proc_exit (0 );
1203
1250
}
1204
1251
1252
+ void manager_fatal_error (scheduler_manager_ctx_t * ctx ,int ecode ,char * message , ...)
1253
+ {
1254
+ va_list arglist ;
1255
+ char buf [1024 ];
1256
+
1257
+ scheduler_manager_stop (ctx );
1258
+ changeChildBgwState ((schd_manager_share_t * )(dsm_segment_address (ctx -> seg )),SchdManagerDie );
1259
+ dsm_detach (ctx -> seg );
1260
+
1261
+ va_start (arglist ,message );
1262
+ vsnprintf (buf ,1024 ,message ,arglist );
1263
+ va_end (arglist );
1264
+
1265
+
1266
+ delete_worker_mem_ctx ();
1267
+ if (ecode == 0 )
1268
+ {
1269
+ ecode = ERRCODE_INTERNAL_ERROR ;
1270
+ }
1271
+
1272
+ ereport (ERROR , (errcode (ecode ),errmsg ("%s" ,buf )));
1273
+ }
1274
+
1205
1275