Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit17e9358

Browse files
author
Vladimir Ershov
committed
parallel workers
1 parent267ed06 commit17e9358

8 files changed

+129
-55
lines changed

‎pgpro_scheduler--2.0.sql

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,28 +28,15 @@ CREATE TABLE at_jobs_submitted(
2828
CREATEINDEXON at_jobs_submitted(at,submit_time);
2929
CREATEINDEXON at_jobs_submitted (last_start_available, node);
3030

31-
-- CREATE TABLE at_jobs_process(
32-
-- start_time timestamp with time zone default now()
33-
-- ) INHERITS (at_jobs_submitted);
34-
3531
CREATETABLEat_jobs_process (like at_jobs_submitted including all);
3632
ALTERTABLE at_jobs_process ADD start_timetimestamp with time zone default now();
37-
38-
-- ALTER TABLE at_jobs_process ADD primary key (id);
3933
CREATEINDEXat_jobs_process_node_at_idxon at_jobs_process (node, at);
4034

41-
-- CREATE TABLE at_jobs_done(
42-
-- status boolean,
43-
-- reason text,
44-
-- done_time timestamp with time zone default now()
45-
-- ) INHERITS (at_jobs_process);
4635
CREATETABLEat_jobs_done (like at_jobs_process including all);
4736
ALTERTABLE at_jobs_done ADD statusboolean;
4837
ALTERTABLE at_jobs_done ADD reasontext;
4938
ALTERTABLE at_jobs_done ADD done_timetimestamp with time zone default now();
5039

51-
--ALTER TABLE at_jobs_done ADD primary key (id);
52-
-- CREATE INDEX at_jobs_done_node_at_idx on at_jobs_done (node, at);
5340

5441
CREATETABLEcron(
5542
idSERIALPRIMARY KEY,

‎src/pgpro_scheduler.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ char *scheduler_databases = NULL;
4646
char*scheduler_nodename=NULL;
4747
char*scheduler_transaction_state=NULL;
4848
intscheduler_max_workers=2;
49-
intscheduler_at_max_workers=2;
49+
intscheduler_max_parallel_workers=2;
5050
intscheduler_worker_job_limit=1;
5151
boolscheduler_service_enabled= false;
5252
char*scheduler_schema=NULL;
@@ -496,10 +496,10 @@ void _PG_init(void)
496496
NULL
497497
);
498498
DefineCustomIntVariable(
499-
"schedule.at_max_workers",
499+
"schedule.max_parallel_workers",
500500
"How much workers can serve at jobs on one database",
501501
NULL,
502-
&scheduler_at_max_workers,
502+
&scheduler_max_parallel_workers,
503503
2,
504504
1,
505505
100,

‎src/scheduler_executor.c

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include"catalog/pg_authid.h"
1616
#include"utils/syscache.h"
1717
#include"access/htup_details.h"
18+
#include"utils/timeout.h"
1819

1920
#include"pgstat.h"
2021
#include"fmgr.h"
@@ -316,6 +317,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
316317
}
317318
destroy_job(job,1);
318319

320+
SetSessionAuthorization(BOOTSTRAP_SUPERUSERID, true);
321+
ResetAllOptions();
322+
319323
return1;
320324
}
321325

@@ -607,15 +611,13 @@ resubmit(PG_FUNCTION_ARGS)
607611

608612
voidat_executor_worker_main(Datumarg)
609613
{
610-
schd_executor_share_t*shared;
614+
schd_executor_share_state_t*shared;
611615
dsm_segment*seg;
612616
intresult;
613617
intrc=0;
614618
schd_executor_status_tstatus;
615619
boollets_sleep= false;
616620
/* PGPROC *parent; */
617-
doublebegin,elapsed;
618-
structtimevaltv;
619621

620622
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler_executor");
621623
seg=dsm_attach(DatumGetInt32(arg));
@@ -632,6 +634,7 @@ void at_executor_worker_main(Datum arg)
632634
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
633635
errmsg("executor corrupted dynamic shared memory segment")));
634636
}
637+
shared->start_at=GetCurrentTimestamp();
635638

636639
SetConfigOption("application_name","pgp-s at executor",PGC_USERSET,PGC_S_SESSION);
637640
pgstat_report_activity(STATE_RUNNING,"initialize");
@@ -644,18 +647,14 @@ void at_executor_worker_main(Datum arg)
644647

645648
while(1)
646649
{
650+
if(shared->stop_worker)break;
647651
if(got_sighup)
648652
{
649653
got_sighup= false;
650654
ProcessConfigFile(PGC_SIGHUP);
651655
}
652656
CHECK_FOR_INTERRUPTS();
653-
gettimeofday(&tv,NULL);
654-
begin= ((double)tv.tv_sec)*1000+ ((double)tv.tv_usec)/1000;
655657
result=process_one_job(shared,&status);
656-
gettimeofday(&tv,NULL);
657-
elapsed= ((double)tv.tv_sec)*1000+ ((double)tv.tv_usec)/1000-begin;
658-
elog(LOG,"job done %d = %f",result,elapsed);
659658

660659
if(result==0)
661660
{
@@ -680,42 +679,35 @@ void at_executor_worker_main(Datum arg)
680679
}
681680
}
682681

682+
if(shared->stop_worker)
683+
{
684+
elog(LOG,"at worker stopped by parent signal");
685+
}
686+
683687
delete_worker_mem_ctx();
684688
dsm_detach(seg);
685689
proc_exit(0);
686690
}
687691

688-
intprocess_one_job(schd_executor_share_t*shared,schd_executor_status_t*status)
692+
intprocess_one_job(schd_executor_share_state_t*shared,schd_executor_status_t*status)
689693
{
690694
char*error=NULL;
691695
job_t*job;
692696
intret;
693697
charbuff[512];
694-
doublebegin,elapsed;
695-
structtimevaltv;
696698

697699
*status=shared->status=SchdExecutorWork;
698-
shared->message[0]=0;
699700

700701
pgstat_report_activity(STATE_RUNNING,"initialize job");
701702
START_SPI_SNAP();
702703

703-
gettimeofday(&tv,NULL);
704-
begin= ((double)tv.tv_sec)*1000+ ((double)tv.tv_usec)/1000;
705-
706704
job=get_next_at_job_with_lock(shared->nodename,&error);
707705

708-
gettimeofday(&tv,NULL);
709-
elapsed= ((double)tv.tv_sec)*1000+ ((double)tv.tv_usec)/1000-begin;
710-
elog(LOG,"got jobs = %f",elapsed);
711-
712706
if(!job)
713707
{
714708
if(error)
715709
{
716710
shared->status=SchdExecutorIdling;
717-
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
718-
"Cannot get job: %s",error);
719711
elog(LOG,"AT EXECUTOR: ERROR: %s",error);
720712
pfree(error);
721713
ABORT_SPI_SNAP();
@@ -734,15 +726,11 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
734726
if(error)
735727
{
736728
set_at_job_done(job,error,0);
737-
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
738-
"Cannot set session auth: %s",error);
739729
pfree(error);
740730
}
741731
else
742732
{
743733
set_at_job_done(job,"Unknown set session auth error",0);
744-
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
745-
"Cannot set session auth: unknown error");
746734
}
747735
shared->status=SchdExecutorIdling;
748736
STOP_SPI_SNAP();
@@ -761,6 +749,7 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
761749
sprintf(buff,"%lld",job->timelimit*1000);
762750
#endif
763751
SetConfigOption("statement_timeout",buff,PGC_SUSET,PGC_S_OVERRIDE);
752+
enable_timeout_after(STATEMENT_TIMEOUT,StatementTimeout);
764753
}
765754

766755
if(job->sql_params_n>0)
@@ -771,6 +760,10 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
771760
{
772761
ret=execute_spi(job->dosql[0],&error);
773762
}
763+
if(job->timelimit)
764+
{
765+
disable_timeout(STATEMENT_TIMEOUT, false);
766+
}
774767
ResetAllOptions();
775768
SetConfigOption("enable_seqscan","off",PGC_USERSET,PGC_S_SESSION);
776769
SetSessionAuthorization(BOOTSTRAP_SUPERUSERID, true);

‎src/scheduler_executor.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ typedef struct {
4141
boolworker_exit;
4242
}schd_executor_share_t;
4343

44+
typedefstruct {
45+
chardatabase[PGPRO_SCHEDULER_DBNAME_MAX];
46+
charnodename[PGPRO_SCHEDULER_NODENAME_MAX];
47+
TimestampTzstart_at;
48+
49+
schd_executor_status_tstatus;
50+
51+
boolstop_worker;
52+
}schd_executor_share_state_t;
53+
4454
typedefstruct {
4555
intn;
4656
char**errors;
@@ -57,7 +67,7 @@ int set_session_authorization(char *username, char **error);
5767
intdo_one_job(schd_executor_share_t*shared,schd_executor_status_t*status);
5868
intread_worker_job_limit(void);
5969
voidat_executor_worker_main(Datumarg);
60-
intprocess_one_job(schd_executor_share_t*shared,schd_executor_status_t*status);
70+
intprocess_one_job(schd_executor_share_state_t*shared,schd_executor_status_t*status);
6171
Oidset_session_authorization_by_name(char*rolename,char**error);
6272

6373
externDatumget_self_id(PG_FUNCTION_ARGS);

‎src/scheduler_job.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ int set_at_job_done(job_t *job, char *error, int64 resubmit)
454454

455455
ret=SPI_execute_with_args(sql,n,argtypes,values,nulls, false,0);
456456

457+
457458
set_schema(oldpath, false);
458459
pfree(oldpath);
459460
if(this_error)pfree(this_error);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp