15
15
#include "catalog/pg_authid.h"
16
16
#include "utils/syscache.h"
17
17
#include "access/htup_details.h"
18
+ #include "utils/timeout.h"
18
19
19
20
#include "pgstat.h"
20
21
#include "fmgr.h"
@@ -316,6 +317,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
316
317
}
317
318
destroy_job (job ,1 );
318
319
320
+ SetSessionAuthorization (BOOTSTRAP_SUPERUSERID , true);
321
+ ResetAllOptions ();
322
+
319
323
return 1 ;
320
324
}
321
325
@@ -607,15 +611,13 @@ resubmit(PG_FUNCTION_ARGS)
607
611
608
612
void at_executor_worker_main (Datum arg )
609
613
{
610
- schd_executor_share_t * shared ;
614
+ schd_executor_share_state_t * shared ;
611
615
dsm_segment * seg ;
612
616
int result ;
613
617
int rc = 0 ;
614
618
schd_executor_status_t status ;
615
619
bool lets_sleep = false;
616
620
/* PGPROC *parent; */
617
- double begin ,elapsed ;
618
- struct timeval tv ;
619
621
620
622
CurrentResourceOwner = ResourceOwnerCreate (NULL ,"pgpro_scheduler_executor" );
621
623
seg = dsm_attach (DatumGetInt32 (arg ));
@@ -632,6 +634,7 @@ void at_executor_worker_main(Datum arg)
632
634
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
633
635
errmsg ("executor corrupted dynamic shared memory segment" )));
634
636
}
637
+ shared -> start_at = GetCurrentTimestamp ();
635
638
636
639
SetConfigOption ("application_name" ,"pgp-s at executor" ,PGC_USERSET ,PGC_S_SESSION );
637
640
pgstat_report_activity (STATE_RUNNING ,"initialize" );
@@ -644,18 +647,14 @@ void at_executor_worker_main(Datum arg)
644
647
645
648
while (1 )
646
649
{
650
+ if (shared -> stop_worker )break ;
647
651
if (got_sighup )
648
652
{
649
653
got_sighup = false;
650
654
ProcessConfigFile (PGC_SIGHUP );
651
655
}
652
656
CHECK_FOR_INTERRUPTS ();
653
- gettimeofday (& tv ,NULL );
654
- begin = ((double )tv .tv_sec )* 1000 + ((double )tv .tv_usec )/1000 ;
655
657
result = process_one_job (shared ,& status );
656
- gettimeofday (& tv ,NULL );
657
- elapsed = ((double )tv .tv_sec )* 1000 + ((double )tv .tv_usec )/1000 - begin ;
658
- elog (LOG ,"job done %d = %f" ,result ,elapsed );
659
658
660
659
if (result == 0 )
661
660
{
@@ -680,42 +679,35 @@ void at_executor_worker_main(Datum arg)
680
679
}
681
680
}
682
681
682
+ if (shared -> stop_worker )
683
+ {
684
+ elog (LOG ,"at worker stopped by parent signal" );
685
+ }
686
+
683
687
delete_worker_mem_ctx ();
684
688
dsm_detach (seg );
685
689
proc_exit (0 );
686
690
}
687
691
688
- int process_one_job (schd_executor_share_t * shared ,schd_executor_status_t * status )
692
+ int process_one_job (schd_executor_share_state_t * shared ,schd_executor_status_t * status )
689
693
{
690
694
char * error = NULL ;
691
695
job_t * job ;
692
696
int ret ;
693
697
char buff [512 ];
694
- double begin ,elapsed ;
695
- struct timeval tv ;
696
698
697
699
* status = shared -> status = SchdExecutorWork ;
698
- shared -> message [0 ]= 0 ;
699
700
700
701
pgstat_report_activity (STATE_RUNNING ,"initialize job" );
701
702
START_SPI_SNAP ();
702
703
703
- gettimeofday (& tv ,NULL );
704
- begin = ((double )tv .tv_sec )* 1000 + ((double )tv .tv_usec )/1000 ;
705
-
706
704
job = get_next_at_job_with_lock (shared -> nodename ,& error );
707
705
708
- gettimeofday (& tv ,NULL );
709
- elapsed = ((double )tv .tv_sec )* 1000 + ((double )tv .tv_usec )/1000 - begin ;
710
- elog (LOG ,"got jobs = %f" ,elapsed );
711
-
712
706
if (!job )
713
707
{
714
708
if (error )
715
709
{
716
710
shared -> status = SchdExecutorIdling ;
717
- snprintf (shared -> message ,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX ,
718
- "Cannot get job: %s" ,error );
719
711
elog (LOG ,"AT EXECUTOR: ERROR: %s" ,error );
720
712
pfree (error );
721
713
ABORT_SPI_SNAP ();
@@ -734,15 +726,11 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
734
726
if (error )
735
727
{
736
728
set_at_job_done (job ,error ,0 );
737
- snprintf (shared -> message ,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX ,
738
- "Cannot set session auth: %s" ,error );
739
729
pfree (error );
740
730
}
741
731
else
742
732
{
743
733
set_at_job_done (job ,"Unknown set session auth error" ,0 );
744
- snprintf (shared -> message ,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX ,
745
- "Cannot set session auth: unknown error" );
746
734
}
747
735
shared -> status = SchdExecutorIdling ;
748
736
STOP_SPI_SNAP ();
@@ -761,6 +749,7 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
761
749
sprintf (buff ,"%lld" ,job -> timelimit * 1000 );
762
750
#endif
763
751
SetConfigOption ("statement_timeout" ,buff ,PGC_SUSET ,PGC_S_OVERRIDE );
752
+ enable_timeout_after (STATEMENT_TIMEOUT ,StatementTimeout );
764
753
}
765
754
766
755
if (job -> sql_params_n > 0 )
@@ -771,6 +760,10 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
771
760
{
772
761
ret = execute_spi (job -> dosql [0 ],& error );
773
762
}
763
+ if (job -> timelimit )
764
+ {
765
+ disable_timeout (STATEMENT_TIMEOUT , false);
766
+ }
774
767
ResetAllOptions ();
775
768
SetConfigOption ("enable_seqscan" ,"off" ,PGC_USERSET ,PGC_S_SESSION );
776
769
SetSessionAuthorization (BOOTSTRAP_SUPERUSERID , true);