Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit16b5a14

Browse files
author
Vladimir Ershov
committed
do not die
1 parent1bc1681 commit16b5a14

File tree

6 files changed

+296
-126
lines changed

6 files changed

+296
-126
lines changed

‎pgpro_scheduler--2.0.sql

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,28 @@ CREATE TABLE at_jobs_submitted(
2828
CREATEINDEXON at_jobs_submitted(at,submit_time);
2929
CREATEINDEXON at_jobs_submitted (last_start_available, node);
3030

31-
CREATETABLEat_jobs_process(
32-
start_timetimestamp with time zone default now()
33-
) INHERITS (at_jobs_submitted);
34-
35-
ALTERTABLE at_jobs_process ADDprimary key (id);
36-
CREATEINDEXat_jobs_process_node_at_idxon at_jobs_process (node, at);
37-
38-
CREATETABLEat_jobs_done(
39-
statusboolean,
40-
reasontext,
41-
done_timetimestamp with time zone default now()
42-
) INHERITS (at_jobs_process);
43-
44-
ALTERTABLE at_jobs_done ADDprimary key (id);
45-
CREATEINDEXat_jobs_done_node_at_idxon at_jobs_done (node, at);
31+
-- CREATE TABLE at_jobs_process(
32+
-- start_time timestamp with time zone default now()
33+
-- ) INHERITS (at_jobs_submitted);
34+
35+
CREATETABLEat_jobs_process (like at_jobs_submitted including all);
36+
ALTERTABLE at_jobs_process ADD start_timetimestamp with time zone default now();
37+
38+
-- ALTER TABLE at_jobs_process ADD primary key (id);
39+
CREATEINDEXat_jobs_process_node_at_idxon at_jobs_process (node, at);
40+
41+
-- CREATE TABLE at_jobs_done(
42+
-- status boolean,
43+
-- reason text,
44+
-- done_time timestamp with time zone default now()
45+
-- ) INHERITS (at_jobs_process);
46+
CREATETABLEat_jobs_done (like at_jobs_process including all);
47+
ALTERTABLE at_jobs_done ADD statusboolean;
48+
ALTERTABLE at_jobs_done ADD reasontext;
49+
ALTERTABLE at_jobs_done ADD done_timetimestamp with time zone default now();
50+
51+
--ALTER TABLE at_jobs_done ADD primary key (id);
52+
-- CREATE INDEX at_jobs_done_node_at_idx on at_jobs_done (node, at);
4653

4754
CREATETABLEcron(
4855
idSERIALPRIMARY KEY,

‎src/pgpro_scheduler.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ char *scheduler_nodename = NULL;
4747
char*scheduler_transaction_state=NULL;
4848
intscheduler_max_workers=2;
4949
intscheduler_at_max_workers=2;
50+
intscheduler_worker_job_limit=1;
5051
boolscheduler_service_enabled= false;
5152
char*scheduler_schema=NULL;
5253
/* Custom GUC done */
@@ -497,6 +498,20 @@ void _PG_init(void)
497498
NULL,
498499
NULL
499500
);
501+
DefineCustomIntVariable(
502+
"schedule.worker_job_limit",
503+
"How much job can worker serve before shutdown",
504+
NULL,
505+
&scheduler_worker_job_limit,
506+
1,
507+
1,
508+
20000,
509+
PGC_SUSET,
510+
0,
511+
NULL,
512+
NULL,
513+
NULL
514+
);
500515
pg_scheduler_startup();
501516
}
502517

‎src/scheduler_executor.c

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ handle_sigterm(SIGNAL_ARGS)
5656
errno=save_errno;
5757
}
5858

59+
intread_worker_job_limit(void)
60+
{
61+
constchar*opt;
62+
intvar;
63+
64+
opt=GetConfigOption("schedule.worker_job_limit", false, false);
65+
if(opt==NULL)return1;
66+
var=atoi(opt);
67+
returnvar;
68+
}
5969

6070
voidexecutor_worker_main(Datumarg)
6171
{
@@ -64,6 +74,9 @@ void executor_worker_main(Datum arg)
6474
intresult;
6575
int64jobs_done=0;
6676
int64worker_jobs_limit=1;
77+
intrc=0;
78+
schd_executor_status_tstatus;
79+
PGPROC*parent;
6780

6881
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler_executor");
6982
seg=dsm_attach(DatumGetInt32(arg));
@@ -72,6 +85,7 @@ void executor_worker_main(Datum arg)
7285
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
7386
errmsg("executor unable to map dynamic shared memory segment")));
7487
shared=dsm_segment_address(seg);
88+
parent=BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
7589

7690
if(shared->status!=SchdExecutorInit)
7791
{
@@ -84,38 +98,77 @@ void executor_worker_main(Datum arg)
8498
pgstat_report_activity(STATE_RUNNING,"initialize");
8599
init_worker_mem_ctx("ExecutorMemoryContext");
86100
BackgroundWorkerInitializeConnection(shared->database,NULL);
87-
/* TODO check latch, wait signals, die */
101+
worker_jobs_limit=read_worker_job_limit();
102+
103+
pqsignal(SIGTERM,handle_sigterm);
104+
pqsignal(SIGHUP,worker_spi_sighup);
105+
BackgroundWorkerUnblockSignals();
106+
88107
while(1)
89108
{
90-
result=do_one_job(shared);
91-
if(result<0)
109+
/* we need it if idle worker recieve SIGHUP an realize that it done
110+
too mach */
111+
status=SchdExecutorLimitReached;
112+
113+
if(got_sighup)
114+
{
115+
got_sighup= false;
116+
ProcessConfigFile(PGC_SIGHUP);
117+
worker_jobs_limit=read_worker_job_limit();
118+
}
119+
result=do_one_job(shared,&status);
120+
if(result>0)
121+
{
122+
if(++jobs_done >=worker_jobs_limit)
123+
{
124+
shared->worker_exit= true;
125+
shared->status=status;
126+
break;
127+
}
128+
else
129+
{
130+
shared->status=status;
131+
}
132+
SetLatch(&parent->procLatch);
133+
}
134+
elseif(result<0)
92135
{
93136
delete_worker_mem_ctx();
94137
dsm_detach(seg);
95138
proc_exit(0);
96139
}
97-
if(++jobs_done >=worker_jobs_limit)break;
98-
}
99140

100-
shared->worker_exit= true;
141+
pgstat_report_activity(STATE_IDLE,"waiting for a job");
142+
rc=WaitLatch(MyLatch,WL_LATCH_SET |WL_POSTMASTER_DEATH,0L);
143+
ResetLatch(MyLatch);
144+
if(rc&&rc&WL_POSTMASTER_DEATH)break;
145+
}
101146

102147
delete_worker_mem_ctx();
103148
dsm_detach(seg);
104149
proc_exit(0);
105150
}
106151

107-
intdo_one_job(schd_executor_share_t*shared)
152+
intdo_one_job(schd_executor_share_t*shared,schd_executor_status_t*status)
108153
{
109154
executor_error_tEE;
110155
char*error=NULL;
111-
schd_executor_status_tstatus;
112156
inti;
113157
job_t*job;
114158
intret;
115159

116160
EE.n=0;
117161
EE.errors=NULL;
118-
status=shared->status=SchdExecutorWork;
162+
if(shared->new_job)
163+
{
164+
shared->new_job= false;
165+
}
166+
else
167+
{
168+
return0;
169+
}
170+
171+
*status=shared->status=SchdExecutorWork;
119172
shared->message[0]=0;
120173

121174
pgstat_report_activity(STATE_RUNNING,"initialize job");
@@ -125,8 +178,8 @@ int do_one_job(schd_executor_share_t *shared)
125178
if(shared->message[0]==0)
126179
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
127180
"Cannot retrive job information");
128-
shared->status=SchdExecutorError;
129181
shared->worker_exit= true;
182+
*status=shared->status=SchdExecutorError;
130183

131184
return-1;
132185
}
@@ -146,14 +199,11 @@ int do_one_job(schd_executor_share_t *shared)
146199
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
147200
"Cannot set session auth: unknown error");
148201
}
202+
*status=shared->worker_exit= true;
149203
shared->status=SchdExecutorError;
150-
shared->worker_exit= true;
151204
return-2;
152205
}
153206

154-
pqsignal(SIGTERM,handle_sigterm);
155-
BackgroundWorkerUnblockSignals();
156-
157207
pgstat_report_activity(STATE_RUNNING,"process job");
158208
CHECK_FOR_INTERRUPTS();
159209
/* rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 0);
@@ -183,7 +233,7 @@ int do_one_job(schd_executor_share_t *shared)
183233
if(ret<0)
184234
{
185235
/* success = false; */
186-
status=SchdExecutorError;
236+
*status=SchdExecutorError;
187237
if(error)
188238
{
189239
push_executor_error(&EE,"error in command #%d: %s",
@@ -209,7 +259,7 @@ int do_one_job(schd_executor_share_t *shared)
209259
}
210260
}
211261
}
212-
if(status!=SchdExecutorError)
262+
if(*status!=SchdExecutorError)
213263
{
214264
if(job->same_transaction)
215265
{
@@ -219,8 +269,8 @@ int do_one_job(schd_executor_share_t *shared)
219269
{
220270
if(job->attempt >=job->resubmit_limit)
221271
{
222-
status=SchdExecutorError;
223-
#ifdefHAVE_INT64
272+
*status=SchdExecutorError;
273+
#ifdefHAVE_LONG_INT_64
224274
push_executor_error(&EE,"Cannot resubmit: limit reached (%ld)",job->resubmit_limit);
225275
#else
226276
push_executor_error(&EE,"Cannot resubmit: limit reached (%lld)",job->resubmit_limit);
@@ -229,12 +279,12 @@ int do_one_job(schd_executor_share_t *shared)
229279
}
230280
else
231281
{
232-
status=SchdExecutorResubmit;
282+
*status=SchdExecutorResubmit;
233283
}
234284
}
235285
else
236286
{
237-
status=SchdExecutorDone;
287+
*status=SchdExecutorDone;
238288
}
239289

240290
SetConfigOption("schedule.transaction_state","success",PGC_INTERNAL,PGC_S_SESSION);
@@ -255,8 +305,7 @@ int do_one_job(schd_executor_share_t *shared)
255305
{
256306
set_shared_message(shared,&EE);
257307
}
258-
shared->status=status;
259-
if(status==SchdExecutorResubmit)
308+
if(*status==SchdExecutorResubmit)
260309
{
261310
shared->next_time=timestamp_add_seconds(0,resubmit_current_job);
262311
resubmit_current_job=0;

‎src/scheduler_executor.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@ typedef enum {
1212
SchdExecutorWork,
1313
SchdExecutorDone,
1414
SchdExecutorResubmit,
15-
SchdExecutorError
15+
SchdExecutorError,
16+
SchdExecutorLimitReached
1617
}schd_executor_status_t;
1718

1819
typedefstruct {
20+
boolnew_job;
21+
1922
chardatabase[PGPRO_SCHEDULER_DBNAME_MAX];
2023
charnodename[PGPRO_SCHEDULER_NODENAME_MAX];
2124
charuser[NAMEDATALEN];
@@ -50,7 +53,8 @@ int executor_onrollback(job_t *job, executor_error_t *ee);
5053
voidset_pg_var(boolresulti,executor_error_t*ee);
5154
intpush_executor_error(executor_error_t*e,char*fmt, ...)pg_attribute_printf(2,3);
5255
intset_session_authorization(char*username,char**error);
53-
intdo_one_job(schd_executor_share_t*shared);
56+
intdo_one_job(schd_executor_share_t*shared,schd_executor_status_t*status);
57+
intread_worker_job_limit(void);
5458

5559
externDatumget_self_id(PG_FUNCTION_ARGS);
5660
externDatumresubmit(PG_FUNCTION_ARGS);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp