Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1bc1681

Browse files
author
Vladimir Ershov
committed
a step toward reusable workers
1 parent8e8475a commit1bc1681

File tree

4 files changed

+138
-56
lines changed

4 files changed

+138
-56
lines changed

‎src/scheduler_executor.c‎

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,9 @@ void executor_worker_main(Datum arg)
6161
{
6262
schd_executor_share_t*shared;
6363
dsm_segment*seg;
64-
job_t*job;
65-
inti;
66-
executor_error_tEE;
67-
intret;
68-
char*error=NULL;
69-
/* bool use_pg_vars = true; */
70-
/* bool success = true; */
71-
schd_executor_status_tstatus;
72-
73-
EE.n=0;
74-
EE.errors=NULL;
64+
intresult;
65+
int64jobs_done=0;
66+
int64worker_jobs_limit=1;
7567

7668
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler_executor");
7769
seg=dsm_attach(DatumGetInt32(arg));
@@ -87,13 +79,44 @@ void executor_worker_main(Datum arg)
8779
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8880
errmsg("executor corrupted dynamic shared memory segment")));
8981
}
90-
status=shared->status=SchdExecutorWork;
91-
shared->message[0]=0;
9282

9383
SetConfigOption("application_name","pgp-s executor",PGC_USERSET,PGC_S_SESSION);
9484
pgstat_report_activity(STATE_RUNNING,"initialize");
9585
init_worker_mem_ctx("ExecutorMemoryContext");
9686
BackgroundWorkerInitializeConnection(shared->database,NULL);
87+
/* TODO check latch, wait signals, die */
88+
while(1)
89+
{
90+
result=do_one_job(shared);
91+
if(result<0)
92+
{
93+
delete_worker_mem_ctx();
94+
dsm_detach(seg);
95+
proc_exit(0);
96+
}
97+
if(++jobs_done >=worker_jobs_limit)break;
98+
}
99+
100+
shared->worker_exit= true;
101+
102+
delete_worker_mem_ctx();
103+
dsm_detach(seg);
104+
proc_exit(0);
105+
}
106+
107+
intdo_one_job(schd_executor_share_t*shared)
108+
{
109+
executor_error_tEE;
110+
char*error=NULL;
111+
schd_executor_status_tstatus;
112+
inti;
113+
job_t*job;
114+
intret;
115+
116+
EE.n=0;
117+
EE.errors=NULL;
118+
status=shared->status=SchdExecutorWork;
119+
shared->message[0]=0;
97120

98121
pgstat_report_activity(STATE_RUNNING,"initialize job");
99122
job=initializeExecutorJob(shared);
@@ -103,9 +126,9 @@ void executor_worker_main(Datum arg)
103126
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
104127
"Cannot retrive job information");
105128
shared->status=SchdExecutorError;
106-
delete_worker_mem_ctx();
107-
dsm_detach(seg);
108-
proc_exit(0);
129+
shared->worker_exit= true;
130+
131+
return-1;
109132
}
110133
current_job_id=job->cron_id;
111134
pgstat_report_activity(STATE_RUNNING,"job initialized");
@@ -124,9 +147,8 @@ void executor_worker_main(Datum arg)
124147
"Cannot set session auth: unknown error");
125148
}
126149
shared->status=SchdExecutorError;
127-
delete_worker_mem_ctx();
128-
dsm_detach(seg);
129-
proc_exit(0);
150+
shared->worker_exit= true;
151+
return-2;
130152
}
131153

132154
pqsignal(SIGTERM,handle_sigterm);
@@ -198,7 +220,11 @@ void executor_worker_main(Datum arg)
198220
if(job->attempt >=job->resubmit_limit)
199221
{
200222
status=SchdExecutorError;
223+
#ifdefHAVE_INT64
201224
push_executor_error(&EE,"Cannot resubmit: limit reached (%ld)",job->resubmit_limit);
225+
#else
226+
push_executor_error(&EE,"Cannot resubmit: limit reached (%lld)",job->resubmit_limit);
227+
#endif
202228
resubmit_current_job=0;
203229
}
204230
else
@@ -215,11 +241,6 @@ void executor_worker_main(Datum arg)
215241
}
216242
if(job->next_time_statement)
217243
{
218-
/*if(use_pg_vars)
219-
{
220-
set_pg_var(success, &EE);
221-
}
222-
*/
223244
shared->next_time=get_next_excution_time(job->next_time_statement,&EE);
224245
if(shared->next_time==0)
225246
{
@@ -240,11 +261,11 @@ void executor_worker_main(Datum arg)
240261
shared->next_time=timestamp_add_seconds(0,resubmit_current_job);
241262
resubmit_current_job=0;
242263
}
264+
destroy_job(job,1);
243265

244-
delete_worker_mem_ctx();
245-
dsm_detach(seg);
246-
proc_exit(0);
266+
return1;
247267
}
268+
248269

249270
intset_session_authorization(char*username,char**error)
250271
{

‎src/scheduler_executor.h‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ typedef struct {
3333

3434
boolset_invalid;
3535
charset_invalid_reason[PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX];
36+
37+
boolworker_exit;
3638
}schd_executor_share_t;
3739

3840
typedefstruct {
@@ -48,6 +50,7 @@ int executor_onrollback(job_t *job, executor_error_t *ee);
4850
voidset_pg_var(boolresulti,executor_error_t*ee);
4951
intpush_executor_error(executor_error_t*e,char*fmt, ...)pg_attribute_printf(2,3);
5052
intset_session_authorization(char*username,char**error);
53+
intdo_one_job(schd_executor_share_t*shared);
5154

5255
externDatumget_self_id(PG_FUNCTION_ARGS);
5356
externDatumresubmit(PG_FUNCTION_ARGS);

‎src/scheduler_manager.c‎

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,9 @@ int set_job_on_free_slot(scheduler_manager_ctx_t *ctx, job_t *job)
680680
scheduler_manager_pool_t*p;
681681
scheduler_manager_slot_t*item;
682682
intret;
683+
intidx;
684+
schd_executor_share_t*sdata;
685+
PGPROC*worker;
683686

684687
p=job->type==CronJob ?&(ctx->cron) :&(ctx->at);
685688
if(p->free==0)
@@ -691,27 +694,52 @@ int set_job_on_free_slot(scheduler_manager_ctx_t *ctx, job_t *job)
691694
set_cron_job_started(job):set_at_job_started(job);
692695
if(ret)
693696
{
694-
item=worker_alloc(sizeof(scheduler_manager_slot_t));
695-
item->job=worker_alloc(sizeof(job_t));
696-
memcpy(item->job,job,sizeof(job_t));
697+
idx=p->len-p->free;/* next free slot */
697698

698-
item->started=GetCurrentTimestamp();
699-
item->wait_worker_to_die= false;
700-
item->stop_it=job->timelimit ?
699+
if(p->slots[idx]&&p->slots[idx]->is_free)
700+
{
701+
item=p->slots[idx];
702+
item->job=worker_alloc(sizeof(job_t));
703+
memcpy(item->job,job,sizeof(job_t));
704+
item->started=GetCurrentTimestamp();
705+
item->wait_worker_to_die= false;
706+
item->stop_it=job->timelimit ?
701707
timestamp_add_seconds(0,job->timelimit):0;
708+
sdata=dsm_segment_address(item->shared);
702709

703-
if(launch_executor_worker(ctx,item)==0)
704-
{
705-
pfree(item->job);
706-
pfree(item);
707-
return0;
710+
init_executor_shared_data(sdata,ctx,item->job);
711+
worker=BackendPidGetProc(item->pid);
712+
if(worker)
713+
{
714+
item->is_free= false;
715+
SetLatch(&worker->procLatch);
716+
}
717+
else
718+
{
719+
return0;
720+
}
708721
}
722+
else
723+
{
724+
/* need to launch new worker to process job */
725+
item=worker_alloc(sizeof(scheduler_manager_slot_t));
726+
item->job=worker_alloc(sizeof(job_t));
727+
memcpy(item->job,job,sizeof(job_t));
728+
729+
item->started=item->worker_started=GetCurrentTimestamp();
730+
item->wait_worker_to_die= false;
731+
item->stop_it=job->timelimit ?
732+
timestamp_add_seconds(0,job->timelimit):0;
709733

710-
/*rrr = rand() % 30;
711-
elog(LOG, " -- set timeout in %d sec", rrr);
712-
item->stop_it = timestamp_add_seconds(0, rrr); */
713-
714-
p->slots[p->len- (p->free--)]=item;
734+
if(launch_executor_worker(ctx,item)==0)
735+
{
736+
pfree(item->job);
737+
pfree(item);
738+
return0;
739+
}
740+
p->slots[idx]=item;
741+
}
742+
p->free--;
715743
job->cron_id=-1;/* job copied to slot - no need to be destroyed */
716744

717745
return1;
@@ -739,17 +767,7 @@ int launch_executor_worker(scheduler_manager_ctx_t *ctx, scheduler_manager_slot_
739767
item->shared=seg;
740768
shm_data=dsm_segment_address(item->shared);
741769

742-
shm_data->status=SchdExecutorInit;
743-
memcpy(shm_data->database,ctx->database,strlen(ctx->database));
744-
memcpy(shm_data->nodename,ctx->nodename,strlen(ctx->nodename));
745-
memcpy(shm_data->user,item->job->executor,NAMEDATALEN);
746-
shm_data->cron_id=item->job->cron_id;
747-
shm_data->start_at=item->job->start_at;
748-
shm_data->type=item->job->type;
749-
shm_data->message[0]=0;
750-
shm_data->next_time=0;
751-
shm_data->set_invalid= false;
752-
shm_data->set_invalid_reason[0]=0;
770+
init_executor_shared_data(shm_data,ctx,item->job);
753771

754772
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |
755773
BGWORKER_BACKEND_DATABASE_CONNECTION;
@@ -783,6 +801,21 @@ int launch_executor_worker(scheduler_manager_ctx_t *ctx, scheduler_manager_slot_
783801
returnitem->pid;
784802
}
785803

804+
voidinit_executor_shared_data(schd_executor_share_t*data,scheduler_manager_ctx_t*ctx,job_t*job)
805+
{
806+
data->status=SchdExecutorInit;
807+
memcpy(data->database,ctx->database,strlen(ctx->database));
808+
memcpy(data->nodename,ctx->nodename,strlen(ctx->nodename));
809+
memcpy(data->user,job->executor,NAMEDATALEN);
810+
data->cron_id=job->cron_id;
811+
data->start_at=job->start_at;
812+
data->type=job->type;
813+
data->message[0]=0;
814+
data->next_time=0;
815+
data->set_invalid= false;
816+
data->set_invalid_reason[0]=0;
817+
}
818+
786819
intscheduler_start_jobs(scheduler_manager_ctx_t*ctx,task_type_ttype)
787820
{
788821
intinterval=20;
@@ -955,12 +988,14 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
955988
{
956989
toremove[nremove].pos=i;
957990
toremove[nremove].reason=RmWaitWorker;
991+
toremove[nremove].vanish_item= true;
958992
nremove++;
959993
}
960994
elseif(item->stop_it&&item->stop_it<GetCurrentTimestamp())
961995
{
962996
toremove[nremove].pos=i;
963997
toremove[nremove].reason=RmTimeout;
998+
toremove[nremove].vanish_item= true;
964999
nremove++;
9651000
}
9661001
else
@@ -970,12 +1005,14 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
9701005
{
9711006
toremove[nremove].pos=i;
9721007
toremove[nremove].reason=shm_data->status==SchdExecutorDone ?RmDone:RmError;
1008+
toremove[nremove].vanish_item=shm_data->worker_exit;
9731009
nremove++;
9741010
}
9751011
elseif(shm_data->status==SchdExecutorResubmit)
9761012
{
9771013
toremove[nremove].pos=i;
9781014
toremove[nremove].reason=RmDoneResubmit;
1015+
toremove[nremove].vanish_item=shm_data->worker_exit;
9791016
nremove++;
9801017
}
9811018
}
@@ -1100,13 +1137,28 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
11001137
STOP_SPI_SNAP();
11011138

11021139
last=p->len-p->free-1;
1103-
destroy_slot_item(item);
1140+
if(toremove[i].vanish_item)
1141+
{
1142+
destroy_slot_item(item);
1143+
}
1144+
else
1145+
{
1146+
item->is_free= true;
1147+
}
11041148

11051149
if(toremove[i].pos!=last)
11061150
{
11071151
_pdebug("--- move from %d to %d",last,toremove[i].pos);
11081152
p->slots[toremove[i].pos]=p->slots[last];
1109-
p->slots[last]=NULL;
1153+
if(toremove[i].vanish_item)
1154+
{
1155+
p->slots[last]=NULL;
1156+
}
1157+
else
1158+
{
1159+
p->slots[last]=item;
1160+
}
1161+
11101162
for(j=i+1;j<nremove;j++)
11111163
{
11121164
if(toremove[j].pos==last)

‎src/scheduler_manager.h‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include"utils/memutils.h"
1010
#include"bit_array.h"
1111
#include"scheduler_job.h"
12+
#include"scheduler_executor.h"
1213

1314
#defineCEO_MIN_POS0
1415
#defineCEO_HRS_POS1
@@ -35,9 +36,13 @@ typedef enum {
3536
typedefstruct {
3637
intpos;
3738
schd_remove_reason_treason;
39+
boolvanish_item;
3840
}scheduler_rm_item_t;
3941

4042
typedefstruct {
43+
TimestampTzworker_started;
44+
boolis_free;
45+
4146
TimestampTzstarted;
4247
TimestampTzstop_it;
4348

@@ -110,5 +115,6 @@ int set_at_job_started(job_t *job);
110115
intinit_manager_pool(scheduler_manager_pool_t*p,intN);
111116
intrefresh_manager_pool(constchar*database,constchar*name,scheduler_manager_pool_t*p,intN);
112117
voiddestroy_scheduler_manager_pool(scheduler_manager_pool_t*p);
118+
voidinit_executor_shared_data(schd_executor_share_t*data,scheduler_manager_ctx_t*ctx,job_t*job);
113119

114120
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp