@@ -139,6 +139,13 @@ void executor_worker_main(Datum arg)
139139}
140140else if (result < 0 )
141141{
142+ if (result == -100 )
143+ {
144+ snprintf (shared -> message ,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX ,
145+ "Cannot allocate memory" );
146+ shared -> worker_exit = true;
147+ shared -> status = SchdExecutorError ;
148+ }
142149delete_worker_mem_ctx ();
143150dsm_detach (seg );
144151proc_exit (0 );
@@ -159,9 +166,11 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
159166{
160167executor_error_t EE ;
161168char * error = NULL ;
162- int i ;
169+ int i , ret ;
163170job_t * job ;
164171spi_response_t * r ;
172+ MemoryContext old ,mem ;
173+ char buffer [1024 ];
165174
166175EE .n = 0 ;
167176EE .errors = NULL ;
@@ -174,6 +183,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
174183return 0 ;
175184}
176185
186+ mem = init_mem_ctx ("executor" );
187+ old = MemoryContextSwitchTo (mem );
188+
177189* status = shared -> status = SchdExecutorWork ;
178190shared -> message [0 ]= 0 ;
179191
@@ -187,6 +199,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
187199shared -> worker_exit = true;
188200* status = shared -> status = SchdExecutorError ;
189201
202+ MemoryContextSwitchTo (old );
203+ MemoryContextDelete (mem );
204+
190205return -1 ;
191206}
192207current_job_id = job -> cron_id ;
@@ -207,6 +222,8 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
207222}
208223* status = shared -> worker_exit = true;
209224shared -> status = SchdExecutorError ;
225+ MemoryContextSwitchTo (old );
226+ MemoryContextDelete (mem );
210227return -2 ;
211228}
212229
@@ -230,30 +247,35 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
230247}
231248if (job -> type == AtJob && i == 0 && job -> sql_params_n > 0 )
232249{
233- r = execute_spi_params_prepared (job -> dosql [i ],job -> sql_params_n ,job -> sql_params );
250+ r = execute_spi_params_prepared (mem , job -> dosql [i ],job -> sql_params_n ,job -> sql_params );
234251}
235252else
236253{
237- r = execute_spi (job -> dosql [i ]);
254+ r = execute_spi (mem , job -> dosql [i ]);
238255}
256+ snprintf (buffer ,1024 ,"finalize: %s" ,job -> dosql [i ]);
257+ if (!r )return -100 ;/* cannot allocate memory */
258+ pgstat_report_activity (STATE_RUNNING ,buffer );
239259if (r -> retval < 0 )
240260{
241261/* success = false; */
242262* status = SchdExecutorError ;
243263if (r -> error )
244264{
245- push_executor_error (& EE ,"error in command #%d: %s" ,
265+ ret = push_executor_error (& EE ,"error in command #%d: %s" ,
246266i + 1 ,r -> error );
247267}
248268else
249269{
250- push_executor_error (& EE ,"error in command #%d: code: %d" ,
270+ ret = push_executor_error (& EE ,"error in command #%d: code: %d" ,
251271i + 1 ,r -> retval );
252272}
273+ if (ret < 0 )return -100 ;/* cannot alloc memory */
253274destroy_spi_data (r );
254275ABORT_SPI_SNAP ();
255276SetConfigOption ("schedule.transaction_state" ,"failure" ,PGC_INTERNAL ,PGC_S_SESSION );
256- executor_onrollback (job ,& EE );
277+ if (executor_onrollback (mem ,job ,& EE )== -14000 )
278+ return -100 ;/* cannot alloc memory */
257279
258280break ;
259281}
@@ -321,6 +343,8 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
321343
322344SetSessionAuthorization (BOOTSTRAP_SUPERUSERID , true);
323345ResetAllOptions ();
346+ MemoryContextSwitchTo (old );
347+ MemoryContextDelete (mem );
324348
325349return 1 ;
326350}
@@ -336,23 +360,24 @@ int set_session_authorization(char *username, char **error)
336360int rv ;
337361char * sql = "select oid, rolsuper from pg_catalog.pg_roles where rolname = $1" ;
338362char buff [1024 ];
363+ MemoryContext mem = CurrentMemoryContext ;
339364
340365values [0 ]= CStringGetTextDatum (username );
341366START_SPI_SNAP ();
342- r = execute_spi_sql_with_args (sql ,1 ,types ,values ,NULL );
367+ r = execute_spi_sql_with_args (mem , sql ,1 ,types ,values ,NULL );
343368
344369if (r -> retval < 0 )
345370{
346371rv = r -> retval ;
347- * error = _copy_string ( r -> error );
372+ * error = _mcopy_string ( mem , r -> error );
348373destroy_spi_data (r );
349374return rv ;
350375}
351376if (r -> n_rows == 0 )
352377{
353378STOP_SPI_SNAP ();
354379sprintf (buff ,"Cannot find user with name: %s" ,username );
355- * error = _copy_string ( buff );
380+ * error = _mcopy_string ( mem , buff );
356381destroy_spi_data (r );
357382
358383return -200 ;
@@ -415,7 +440,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
415440
416441START_SPI_SNAP ();
417442pgstat_report_activity (STATE_RUNNING ,"culc next time execution time" );
418- r = execute_spi (sql );
443+ r = execute_spi (CurrentMemoryContext , sql );
419444if (r -> retval < 0 )
420445{
421446if (r -> error )
@@ -460,7 +485,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
460485return ts ;
461486}
462487
463- int executor_onrollback (job_t * job ,executor_error_t * ee )
488+ int executor_onrollback (MemoryContext mem , job_t * job ,executor_error_t * ee )
464489{
465490int rv ;
466491spi_response_t * r ;
@@ -469,16 +494,18 @@ int executor_onrollback(job_t *job, executor_error_t *ee)
469494pgstat_report_activity (STATE_RUNNING ,"execure onrollback" );
470495
471496START_SPI_SNAP ();
472- r = execute_spi (job -> onrollback );
497+ r = execute_spi (mem , job -> onrollback );
473498if (r -> retval < 0 )
474499{
475500if (r -> error )
476501{
477- push_executor_error (ee ,"onrollback error: %s" ,r -> error );
502+ if (push_executor_error (ee ,"onrollback error: %s" ,r -> error )< 0 )
503+ return -14000 ;
478504}
479505else
480506{
481- push_executor_error (ee ,"onrollback error: unknown: %d" ,r -> retval );
507+ if (push_executor_error (ee ,"onrollback error: unknown: %d" ,r -> retval )< 0 )
508+ return -14000 ;
482509}
483510ABORT_SPI_SNAP ();
484511}
@@ -502,7 +529,7 @@ void set_pg_var(bool result, executor_error_t *ee)
502529
503530vals [0 ]= PointerGetDatum (cstring_to_text (result ?"success" :"failure" ));
504531
505- r = execute_spi_sql_with_args (sql ,1 ,argtypes ,vals ,NULL );
532+ r = execute_spi_sql_with_args (NULL , sql ,1 ,argtypes ,vals ,NULL );
506533if (r -> retval < 0 )
507534{
508535if (r -> error )
@@ -571,6 +598,10 @@ int push_executor_error(executor_error_t *e, char *fmt, ...)
571598{
572599e -> errors = repalloc (e -> errors ,sizeof (char * )* (e -> n + 1 ));
573600}
601+ if (e -> errors == NULL )
602+ {
603+ return -1 ;
604+ }
574605e -> errors [e -> n ]= worker_alloc (sizeof (char )* (len + 1 ));
575606memcpy (e -> errors [e -> n ],buf ,len + 1 );
576607
@@ -712,14 +743,16 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
712743int set_ret ;
713744char buff [512 ];
714745spi_response_t * r ;
746+ MemoryContext old ;
747+ MemoryContext mem = init_mem_ctx ("at job processor" );
748+ old = MemoryContextSwitchTo (mem );
715749
716750* status = shared -> status = SchdExecutorWork ;
717751
718752pgstat_report_activity (STATE_RUNNING ,"initialize at job" );
719753START_SPI_SNAP ();
720754
721- /* job = get_next_at_job_with_lock(shared->nodename, &error); */
722- job = get_at_job_for_process (shared -> nodename ,& error );
755+ job = get_at_job_for_process (mem ,shared -> nodename ,& error );
723756if (!job )
724757{
725758if (error )
@@ -765,7 +798,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
765798return -1 ;
766799}
767800STOP_SPI_SNAP ();
768- elog (LOG ,"JOB MOVED TO DONE" );
801+ MemoryContextSwitchTo (old );
802+ MemoryContextDelete (mem );
769803return 1 ;
770804}
771805
@@ -780,11 +814,11 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
780814
781815if (job -> sql_params_n > 0 )
782816{
783- r = execute_spi_params_prepared (job -> dosql [0 ],job -> sql_params_n ,job -> sql_params );
817+ r = execute_spi_params_prepared (mem , job -> dosql [0 ],job -> sql_params_n ,job -> sql_params );
784818}
785819else
786820{
787- r = execute_spi (job -> dosql [0 ]);
821+ r = execute_spi (mem , job -> dosql [0 ]);
788822}
789823if (job -> timelimit )
790824{
@@ -819,6 +853,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
819853if (set_ret > 0 )
820854{
821855STOP_SPI_SNAP ();
856+ MemoryContextSwitchTo (old );
857+ MemoryContextDelete (mem );
822858return 1 ;
823859}
824860if (set_error )
@@ -831,6 +867,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
831867elog (LOG ,"AT_EXECUTOR ERROR: set log: unknown error" );
832868}
833869ABORT_SPI_SNAP ();
870+ MemoryContextSwitchTo (old );
871+ MemoryContextDelete (mem );
834872
835873return -1 ;
836874}
@@ -846,7 +884,7 @@ Oid set_session_authorization_by_name(char *rolename, char **error)
846884if (!HeapTupleIsValid (roleTup ))
847885{
848886snprintf (buffer ,512 ,"There is no user name: %s" ,rolename );
849- * error = _copy_string ( buffer );
887+ * error = _mcopy_string ( NULL , buffer );
850888return InvalidOid ;
851889}
852890rform = (Form_pg_authid )GETSTRUCT (roleTup );