Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf7e77f4

Browse files
author
Vladimir Ershov
committed
fix ms windows timeout
1 parent17e9358 commitf7e77f4

File tree

7 files changed

+74
-40
lines changed

7 files changed

+74
-40
lines changed

‎Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ else
2020
include $(top_builddir)/src/Makefile.global
2121
include $(top_srcdir)/contrib/contrib-global.mk
2222
endif
23+
24+
#check: temp-install
25+
#$(prove_check)

‎pgpro_scheduler--2.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1338,7 +1338,7 @@ CREATE VIEW all_job_status AS
13381338
attempt, resubmit_limit, postponeas max_wait_interval,
13391339
max_run_timeas max_duration, submit_time,
13401340
start_time, statusas is_success, reasonas error, done_time,
1341-
'processing'::job_at_status_t status
1341+
'done'::job_at_status_t status
13421342
FROMschedule.at_jobs_done
13431343
UNION
13441344
SELECT

‎src/scheduler_executor.c

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,16 @@ void executor_worker_main(Datum arg)
101101

102102
SetConfigOption("application_name","pgp-s executor",PGC_USERSET,PGC_S_SESSION);
103103
pgstat_report_activity(STATE_RUNNING,"initialize");
104-
init_worker_mem_ctx("ExecutorMemoryContext");
105104
BackgroundWorkerInitializeConnection(shared->database,NULL);
106-
worker_jobs_limit=read_worker_job_limit();
107105

108106
pqsignal(SIGTERM,handle_sigterm);
109107
pqsignal(SIGHUP,worker_spi_sighup);
110108
BackgroundWorkerUnblockSignals();
111109

110+
init_worker_mem_ctx("ExecutorMemoryContext");
111+
switch_to_worker_context();
112+
worker_jobs_limit=read_worker_job_limit();
113+
112114
while(1)
113115
{
114116
/* we need it if idle worker recieve SIGHUP an realize that it done
@@ -619,7 +621,7 @@ void at_executor_worker_main(Datum arg)
619621
boollets_sleep= false;
620622
/* PGPROC *parent; */
621623

622-
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler_executor");
624+
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler_at_executor");
623625
seg=dsm_attach(DatumGetInt32(arg));
624626
if(seg==NULL)
625627
ereport(ERROR,
@@ -638,13 +640,15 @@ void at_executor_worker_main(Datum arg)
638640

639641
SetConfigOption("application_name","pgp-s at executor",PGC_USERSET,PGC_S_SESSION);
640642
pgstat_report_activity(STATE_RUNNING,"initialize");
641-
init_worker_mem_ctx("ExecutorMemoryContext");
642643
BackgroundWorkerInitializeConnection(shared->database,NULL);
643644

644645
pqsignal(SIGTERM,handle_sigterm);
645646
pqsignal(SIGHUP,worker_spi_sighup);
646647
BackgroundWorkerUnblockSignals();
647648

649+
init_worker_mem_ctx("ExecutorMemoryContext");
650+
switch_to_worker_context();
651+
648652
while(1)
649653
{
650654
if(shared->stop_worker)break;
@@ -698,7 +702,7 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
698702

699703
*status=shared->status=SchdExecutorWork;
700704

701-
pgstat_report_activity(STATE_RUNNING,"initialize job");
705+
pgstat_report_activity(STATE_RUNNING,"initializeatjob");
702706
START_SPI_SNAP();
703707

704708
job=get_next_at_job_with_lock(shared->nodename,&error);
@@ -718,6 +722,15 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
718722
return0;
719723
}
720724
current_job_id=job->cron_id;
725+
if(!move_at_job_process(job->cron_id))
726+
{
727+
elog(LOG,"AT EXECUTOR: error move to process");
728+
ABORT_SPI_SNAP();
729+
return-1;
730+
}
731+
STOP_SPI_SNAP();/* Commit changes */
732+
733+
START_SPI_SNAP();
721734
pgstat_report_activity(STATE_RUNNING,"job initialized");
722735

723736
ResetAllOptions();
@@ -743,13 +756,7 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
743756

744757
if(job->timelimit)
745758
{
746-
#ifdefHAVE_LONG_INT_64
747-
sprintf(buff,"%ld",job->timelimit*1000);
748-
#else
749-
sprintf(buff,"%lld",job->timelimit*1000);
750-
#endif
751-
SetConfigOption("statement_timeout",buff,PGC_SUSET,PGC_S_OVERRIDE);
752-
enable_timeout_after(STATEMENT_TIMEOUT,StatementTimeout);
759+
enable_timeout_after(STATEMENT_TIMEOUT,job->timelimit*1000);
753760
}
754761

755762
if(job->sql_params_n>0)

‎src/scheduler_job.c

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,16 +168,15 @@ job_t *get_next_at_job_with_lock(char *nodename, char **error)
168168
Datumvalues[1];
169169
char*oldpath;
170170

171-
constchar*get_job_sql="select id, at, array_append('{}'::text[], do_sql)::text[], params, executor, attempt, resubmit_limit, max_run_time from ONLY at_jobs_submitted s where ((not exists ( select * from ONLY at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from ONLY at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED";
172-
oldpath=set_schema(NULL, true);
171+
constchar*get_job_sql="select id, at, array_append('{}'::text[], do_sql)::text[], params, executor, attempt, resubmit_limit, max_run_time from ONLY at_jobs_submitted s where ((not exists ( select * from ONLY at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from ONLY at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit 1 FOR UPDATE SKIP LOCKED";
172+
oldpath=set_schema(NULL, true);
173173
*error=NULL;
174174
values[0]=CStringGetTextDatum(nodename);
175175

176-
ret=execute_spi_sql_with_args(get_job_sql,1,
176+
ret=execute_spi_sql_with_args(get_job_sql,1,
177177
argtypes,values,NULL,error);
178-
179178
set_schema(oldpath, false);
180-
pfree(oldpath);
179+
pfree(oldpath);
181180
if(ret==SPI_OK_SELECT)
182181
{
183182
got=SPI_processed;
@@ -401,6 +400,22 @@ int _at_move_job_to_log(job_t *j, bool status, bool process)
401400
returnret>0 ?1:ret;
402401
}
403402

403+
intmove_at_job_process(intjob_id)
404+
{
405+
constchar*sql="WITH moved_rows AS (DELETE from ONLY at_jobs_submitted a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_process SELECT * FROM moved_rows";
406+
Datumvalues[1];
407+
Oidargtypes[1]= {INT4OID };
408+
intret;
409+
char*oldpath;
410+
411+
values[0]=Int32GetDatum(job_id);
412+
oldpath=set_schema(NULL, true);
413+
ret=SPI_execute_with_args(sql,1,argtypes,values,NULL, false,0);
414+
set_schema(oldpath, false);
415+
pfree(oldpath);
416+
returnret>0 ?1:ret;
417+
}
418+
404419
intset_at_job_done(job_t*job,char*error,int64resubmit)
405420
{
406421
char*this_error=NULL;
@@ -412,8 +427,9 @@ int set_at_job_done(job_t *job, char *error, int64 resubmit)
412427
constchar*sql;
413428
intn=3;
414429

415-
constchar*sql_submitted="WITH moved_rows AS (DELETE from ONLY at_jobs_submitted a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, NULL as start_time, $2 as status, $3 as reason FROM moved_rows";
416-
constchar*resubmit_sql="update ONLY at_jobs_submitted SET attempt = attempt + 1, at = $2 WHERE id = $1";
430+
constchar*sql_submitted="WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, $2 as status, $3 as reason FROM moved_rows";
431+
/* const char *resubmit_sql = "update ONLY at_jobs_submitted SET attempt = attempt + 1, at = $2 WHERE id = $1"; */
432+
constchar*resubmit_sql="WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_submitted SELECT id, node, name, comments, $2, do_sql, params, depends_on, executor, owner, last_start_available, attempt +1 , resubmit_limit, postpone, max_run_time, canceled, submit_time FROM moved_rows";
417433

418434
values[0]=Int32GetDatum(job->cron_id);
419435

‎src/scheduler_job.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ int _at_move_job_to_log(job_t *j, bool status, bool processed);
5858
intresubmit_at_job(job_t*j,TimestampTznext);
5959
job_t*get_next_at_job_with_lock(char*nodename,char**error);
6060
intset_at_job_done(job_t*job,char*error,int64resubmit);
61+
intmove_at_job_process(intjob_id);
6162

6263
#endif
6364

‎src/scheduler_manager.c

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1601,20 +1601,18 @@ int start_at_worker(scheduler_manager_ctx_t *ctx, int pos)
16011601
MemoryContextold;
16021602
scheduler_manager_slot_t*item;
16031603

1604-
item=worker_alloc(sizeof(scheduler_manager_slot_t));
1605-
item->job=NULL;
1606-
item->started=item->worker_started=GetCurrentTimestamp();
1607-
item->wait_worker_to_die= false;
1608-
item->stop_it=0;
1604+
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler");
16091605

16101606
pgstat_report_activity(STATE_RUNNING,"register scheduler at executor");
1611-
16121607
segsize= (Size)sizeof(schd_executor_share_state_t);
1613-
1614-
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler");
1615-
old=MemoryContextSwitchTo(SchedulerWorkerContext);
16161608
seg=dsm_create(segsize,0);
16171609

1610+
old=MemoryContextSwitchTo(SchedulerWorkerContext);
1611+
item=worker_alloc(sizeof(scheduler_manager_slot_t));
1612+
item->job=NULL;
1613+
item->started=item->worker_started=GetCurrentTimestamp();
1614+
item->wait_worker_to_die= false;
1615+
item->stop_it=0;
16181616
item->shared=seg;
16191617
shm_data=dsm_segment_address(item->shared);
16201618

@@ -1640,9 +1638,9 @@ int start_at_worker(scheduler_manager_ctx_t *ctx, int pos)
16401638
elog(LOG,"Cannot register AT executor worker for db: %s",
16411639
shm_data->database);
16421640
dsm_detach(item->shared);
1643-
MemoryContextSwitchTo(old);
16441641
pfree(item);
16451642
ctx->at.slots[pos]=NULL;
1643+
MemoryContextSwitchTo(old);
16461644
return0;
16471645
}
16481646
status=WaitForBackgroundWorkerStartup(item->handler,&(item->pid));
@@ -1651,13 +1649,13 @@ int start_at_worker(scheduler_manager_ctx_t *ctx, int pos)
16511649
elog(LOG,"Cannot start AT executor worker for db: %s, status: %d",
16521650
shm_data->database,status);
16531651
dsm_detach(item->shared);
1654-
MemoryContextSwitchTo(old);
16551652
pfree(item);
16561653
ctx->at.slots[pos]=NULL;
1654+
MemoryContextSwitchTo(old);
16571655
return0;
16581656
}
1659-
MemoryContextSwitchTo(old);
16601657
ctx->at.slots[pos]=item;
1658+
MemoryContextSwitchTo(old);
16611659

16621660
returnitem->pid;
16631661
}

‎src/scheduler_spi_utils.c

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -296,23 +296,27 @@ int execute_spi_sql_with_args(const char *sql, int n, Oid *argtypes, Datum *valu
296296
{
297297
intret=-100;
298298
ErrorData*edata;
299-
MemoryContextold;
300299
interrorSet=0;
301300
charother[100];
301+
ResourceOwneroldowner=CurrentResourceOwner;
302302

303303
*error=NULL;
304304

305305
SetCurrentStatementStartTimestamp();
306306
BeginInternalSubTransaction(NULL);
307+
switch_to_worker_context();
308+
307309
PG_TRY();
308310
{
309311
ret=SPI_execute_with_args(sql,n,argtypes,values,nulls, false,0);
310312
ReleaseCurrentSubTransaction();
311-
SPI_restore_connection();
313+
switch_to_worker_context();
314+
CurrentResourceOwner=oldowner;
315+
SPI_restore_connection();
312316
}
313317
PG_CATCH();
314318
{
315-
old=switch_to_worker_context();
319+
switch_to_worker_context();
316320

317321
edata=CopyErrorData();
318322
if(edata->message)
@@ -329,9 +333,10 @@ int execute_spi_sql_with_args(const char *sql, int n, Oid *argtypes, Datum *valu
329333
}
330334
errorSet=1;
331335
RollbackAndReleaseCurrentSubTransaction();
332-
SPI_restore_connection();
336+
switch_to_worker_context();
337+
CurrentResourceOwner=oldowner;
338+
SPI_restore_connection();
333339
FreeErrorData(edata);
334-
MemoryContextSwitchTo(old);
335340
FlushErrorState();
336341
}
337342
PG_END_TRY();
@@ -373,13 +378,13 @@ int execute_spi_params_prepared(const char *sql, int nparams, char **params, cha
373378
{
374379
intret=-100;
375380
ErrorData*edata;
376-
MemoryContextold;
377381
interrorSet=0;
378382
charother[100];
379383
SPIPlanPtrplan;
380384
Oid*paramtypes;
381385
Datum*values;
382386
inti;
387+
ResourceOwneroldowner=CurrentResourceOwner;
383388

384389
*error=NULL;
385390

@@ -393,6 +398,7 @@ int execute_spi_params_prepared(const char *sql, int nparams, char **params, cha
393398

394399
SetCurrentStatementStartTimestamp();
395400
BeginInternalSubTransaction(NULL);
401+
switch_to_worker_context();
396402

397403
PG_TRY();
398404
{
@@ -403,11 +409,13 @@ int execute_spi_params_prepared(const char *sql, int nparams, char **params, cha
403409
ret=SPI_execute_plan(plan,values,NULL, false,0);
404410
}
405411
ReleaseCurrentSubTransaction();
412+
switch_to_worker_context();
413+
CurrentResourceOwner=oldowner;
406414
SPI_restore_connection();
407415
}
408416
PG_CATCH();
409417
{
410-
old=switch_to_worker_context();
418+
switch_to_worker_context();
411419

412420
edata=CopyErrorData();
413421
if(edata->message)
@@ -424,9 +432,10 @@ int execute_spi_params_prepared(const char *sql, int nparams, char **params, cha
424432
}
425433
errorSet=1;
426434
FreeErrorData(edata);
427-
MemoryContextSwitchTo(old);
428435
FlushErrorState();
429436
RollbackAndReleaseCurrentSubTransaction();
437+
CurrentResourceOwner=oldowner;
438+
switch_to_worker_context();
430439
SPI_restore_connection();
431440
}
432441
PG_END_TRY();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp