Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit267ed06

Browse files
author
Vladimir Ershov
committed
fix sleepy
1 parent4f3de81 commit267ed06

File tree

3 files changed

+11
-40
lines changed

3 files changed

+11
-40
lines changed

‎src/scheduler_executor.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -671,12 +671,12 @@ void at_executor_worker_main(Datum arg)
671671

672672
if(lets_sleep)
673673
{
674-
elog(LOG,"sleeping");
675674
pgstat_report_activity(STATE_IDLE,"waiting for a job");
676675
rc=WaitLatch(MyLatch,
677676
WL_LATCH_SET |WL_POSTMASTER_DEATH |WL_TIMEOUT,1000L);
678677
ResetLatch(MyLatch);
679678
if(rc&&rc&WL_POSTMASTER_DEATH)break;
679+
lets_sleep= false;
680680
}
681681
}
682682

@@ -711,15 +711,17 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
711711

712712
if(!job)
713713
{
714-
STOP_SPI_SNAP();
715714
if(error)
716715
{
717716
shared->status=SchdExecutorIdling;
718717
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
719718
"Cannot get job: %s",error);
719+
elog(LOG,"AT EXECUTOR: ERROR: %s",error);
720720
pfree(error);
721+
ABORT_SPI_SNAP();
721722
return-1;
722723
}
724+
STOP_SPI_SNAP();
723725
shared->status=SchdExecutorIdling;
724726
return0;
725727
}
@@ -784,7 +786,6 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
784786
sprintf(buff,"error in command: code: %d",ret);
785787
set_at_job_done(job,buff,resubmit_current_job);
786788
}
787-
788789
}
789790
else
790791
{

‎src/scheduler_manager.c

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -681,33 +681,19 @@ int set_job_on_free_slot(scheduler_manager_ctx_t *ctx, job_t *job)
681681
schd_executor_share_t*sdata;
682682
PGPROC*worker;
683683
boolstarted= false;
684-
structtimevaltv;
685-
doublebegin,elapsed;
686684

687685
p=job->type==CronJob ?&(ctx->cron) :&(ctx->at);
688686
if(p->free==0)
689687
{
690688
return-1;
691689
}
692690

693-
SetCurrentStatementStartTimestamp();
694-
695-
gettimeofday(&tv,NULL);
696-
begin= ((double)tv.tv_sec)*1000+ ((double)tv.tv_usec)/1000;
691+
START_SPI_SNAP();
697692

698-
StartTransactionCommand();
699-
PushActiveSnapshot(GetTransactionSnapshot());
700-
SPI_connect();
701693
ret=job->type==CronJob ?
702694
set_cron_job_started(job):set_at_job_started(job);
703-
SPI_finish();
704-
PopActiveSnapshot();
705-
706-
gettimeofday(&tv,NULL);
707-
elapsed= ((double)tv.tv_sec)*1000+ ((double)tv.tv_usec)/1000-begin;
708-
elog(LOG,"move job %f",elapsed);
709-
CommitTransactionCommand();
710695

696+
START_SPI_SNAP();
711697

712698
if(ret)
713699
{
@@ -878,8 +864,6 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
878864
char*ts;
879865
scheduler_manager_pool_t*p;
880866
TimestampTz*check_time;
881-
structtimevaltv;
882-
doublebegin,elapsed;
883867

884868
if(type==CronJob)
885869
{
@@ -900,16 +884,9 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
900884
return1;
901885
}
902886

903-
gettimeofday(&tv,NULL);
904-
begin= ((double)tv.tv_sec)*1000+ ((double)tv.tv_usec)/1000;
905887

906888
jobs=get_jobs_to_do(ctx->nodename,type,&njobs,&is_error,p->free);
907889

908-
gettimeofday(&tv,NULL);
909-
elapsed= ((double)tv.tv_sec)*1000+ ((double)tv.tv_usec)/1000-begin;
910-
elog(LOG,"get todo %d = %f",type,elapsed);
911-
912-
913890
nwaiting=njobs;
914891
if(is_error)
915892
{
@@ -929,19 +906,16 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
929906
N=p->free;
930907
if(N>nwaiting)N=nwaiting;
931908

932-
/* START_SPI_SNAP(); */
933909

934910
for(i=start_i;i<N+start_i;i++)
935911
{
936912
ni=type==CronJob ?
937913
how_many_instances_on_work(ctx,&(jobs[i])):100000;
938914
if(type==CronJob&&ni >=jobs[i].max_instances)
939915
{
940-
/* START_SPI_SNAP(); */
941916
set_job_error(&jobs[i],"max instances limit reached");
942917
move_job_to_log(&jobs[i], false, false);
943918
destroy_job(&jobs[i],0);
944-
/* STOP_SPI_SNAP(); */
945919
jobs[i].cron_id=-1;
946920
}
947921
else
@@ -964,21 +938,13 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
964938
elog(ERROR,"Cannot set job to free slot type=%d, id=%d",
965939
jobs[i].type,jobs[i].cron_id);
966940
}
967-
/*START_SPI_SNAP(); */
968941
move_job_to_log(&jobs[i], false, false);
969942
destroy_job(&jobs[i],0);
970943
jobs[i].cron_id=-1;
971-
/* STOP_SPI_SNAP(); */
972944
}
973-
gettimeofday(&tv,NULL);
974-
elapsed= ((double)tv.tv_sec)*1000+ ((double)tv.tv_usec)/1000-begin;
975-
elog(LOG,"get todo %d set one job = %f",type,elapsed);
976945
}
977946
}
978947

979-
/* STOP_SPI_SNAP(); */
980-
981-
982948
if(N<nwaiting)
983949
{
984950
start_i+=N;
@@ -1597,7 +1563,7 @@ int start_at_worker(scheduler_manager_ctx_t *ctx, int pos)
15971563
worker.bgw_main_arg=UInt32GetDatum(dsm_segment_handle(seg));
15981564
sprintf(worker.bgw_library_name,"pgpro_scheduler");
15991565
sprintf(worker.bgw_function_name,"at_executor_worker_main");
1600-
snprintf(worker.bgw_name,BGW_MAXLEN,"scheduler executor %s",shm_data->database);
1566+
snprintf(worker.bgw_name,BGW_MAXLEN,"schedulerat-executor %s",shm_data->database);
16011567
worker.bgw_notify_pid=MyProcPid;
16021568

16031569
if(!RegisterDynamicBackgroundWorker(&worker,&(item->handler)))

‎src/scheduler_spi_utils.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ Datum select_onedatumvalue_sql(const char *sql, bool *is_null)
246246
intret;
247247
Datumdatum=0;
248248

249+
SetCurrentStatementStartTimestamp();
249250
ret=SPI_execute(sql, true,0);
250251
if(ret==SPI_OK_SELECT)
251252
{
@@ -266,6 +267,7 @@ int select_count_with_args(const char *sql, int n, Oid *argtypes, Datum *values,
266267
Datumdatum;
267268
boolis_null;
268269

270+
SetCurrentStatementStartTimestamp();
269271
ret=SPI_execute_with_args(sql,n,argtypes,values,nulls, true,0);
270272
if(ret==SPI_OK_SELECT)
271273
{
@@ -303,6 +305,7 @@ int execute_spi_sql_with_args(const char *sql, int n, Oid *argtypes, Datum *valu
303305

304306
PG_TRY();
305307
{
308+
SetCurrentStatementStartTimestamp();
306309
ret=SPI_execute_with_args(sql,n,argtypes,values,nulls, false,0);
307310
}
308311
PG_CATCH();
@@ -389,6 +392,7 @@ int execute_spi_params_prepared(const char *sql, int nparams, char **params, cha
389392
plan=SPI_prepare(sql,nparams,paramtypes);
390393
if(plan)
391394
{
395+
SetCurrentStatementStartTimestamp();
392396
ret=SPI_execute_plan(plan,values,NULL, false,0);
393397
}
394398
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp