Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd2b77ad

Browse files
author
Vladimir Ershov
committed
avoid with query
1 parentf7e77f4 commitd2b77ad

8 files changed

+255
-31
lines changed

‎pgpro_scheduler--2.0.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ SET search_path TO schedule;
55

66
CREATETYPEjob_status_tAS ENUM ('working','done','error');
77
CREATETYPEjob_at_status_tAS ENUM ('submitted','processing','done');
8+
CREATESEQUENCEschedule.at_jobs_submitted_id_seq;
89

910
CREATETABLEat_jobs_submitted(
10-
idSERIALPRIMARY KEY,
11+
idintPRIMARY KEY,
1112
nodetext,
1213
nametext,
1314
commentstext,
@@ -37,6 +38,8 @@ ALTER TABLE at_jobs_done ADD status boolean;
3738
ALTERTABLE at_jobs_done ADD reasontext;
3839
ALTERTABLE at_jobs_done ADD done_timetimestamp with time zone default now();
3940

41+
ALTERTABLE at_jobs_submitted ALTER idSET default nextval('schedule.at_jobs_submitted_id_seq');
42+
4043

4144
CREATETABLEcron(
4245
idSERIALPRIMARY KEY,

‎src/pgpro_scheduler.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ char_array_t *readBasesToCheck(void)
276276
SPI_finish();
277277
PopActiveSnapshot();
278278
CommitTransactionCommand();
279+
elog(ERROR,"cannot select from pg_database");
279280
}
280281
destroyCharArray(names);
281282
processed=SPI_processed;

‎src/sched_manager_poll.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ char *supervisor_state(schd_managers_poll_t *poll)
4343

4444
if(!poll->enabled)
4545
{
46-
status=palloc(sizeof(char)*9);
46+
status=worker_alloc(sizeof(char)*9);
4747
memcpy(status,"disabled",8);
4848
status[8]=0;
4949
returnstatus;
@@ -52,13 +52,13 @@ char *supervisor_state(schd_managers_poll_t *poll)
5252
len=dbnames ?strlen(dbnames):0;
5353
if(len==0)
5454
{
55-
status=palloc(sizeof(char)*26);
55+
status=worker_alloc(sizeof(char)*26);
5656
memcpy(status,"waiting databases to set",25);
5757
status[25]=0;
5858
}
5959
else
6060
{
61-
status=palloc(sizeof(char)* (len+10));
61+
status=worker_alloc(sizeof(char)* (len+10));
6262
memcpy(status,"work on: ",9);
6363
memcpy(status+9,dbnames,len);
6464
status[len+9]=0;
@@ -82,7 +82,7 @@ char *poll_dbnames(schd_managers_poll_t *poll)
8282
if(i< (poll->n-1))
8383
appendStringInfo(&string,", ");
8484
}
85-
out=palloc(sizeof(char)* (string.len+1));
85+
out=worker_alloc(sizeof(char)* (string.len+1));
8686
memcpy(out,string.data,string.len);
8787
out[string.len]=0;
8888
pfree(string.data);
@@ -310,8 +310,8 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
310310
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler");
311311
seg=dsm_create(segsize,0);
312312

313-
man=palloc(sizeof(schd_manager_t));
314-
man->dbname=palloc(sizeof(char*)* (strlen(name)+1));
313+
man=worker_alloc(sizeof(schd_manager_t));
314+
man->dbname=worker_alloc(sizeof(char*)* (strlen(name)+1));
315315
man->connected= false;
316316
memcpy(man->dbname,name,strlen(name)+1);
317317
man->shared=seg;
@@ -331,7 +331,7 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
331331
pos=poll->n++;
332332
poll->workers=poll->workers ?
333333
repalloc(poll->workers,sizeof(schd_manager_t*)*poll->n):
334-
palloc(sizeof(schd_manager_t*));
334+
worker_alloc(sizeof(schd_manager_t*));
335335
poll->workers[pos]=man;
336336
if(sort)_sortPollManagers(poll);
337337

‎src/scheduler_executor.c

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -696,23 +696,24 @@ void at_executor_worker_main(Datum arg)
696696
intprocess_one_job(schd_executor_share_state_t*shared,schd_executor_status_t*status)
697697
{
698698
char*error=NULL;
699+
char*set_error=NULL;
699700
job_t*job;
700-
intret;
701+
intret,set_ret;
701702
charbuff[512];
702703

703704
*status=shared->status=SchdExecutorWork;
704705

705706
pgstat_report_activity(STATE_RUNNING,"initialize at job");
706707
START_SPI_SNAP();
707708

708-
job=get_next_at_job_with_lock(shared->nodename,&error);
709-
709+
/*job = get_next_at_job_with_lock(shared->nodename, &error); */
710+
job=get_at_job_for_process(shared->nodename,&error);
710711
if(!job)
711712
{
712713
if(error)
713714
{
714715
shared->status=SchdExecutorIdling;
715-
elog(LOG,"AT EXECUTOR: ERROR: %s",error);
716+
elog(LOG,"AT EXECUTOR ERROR: get job: %s",error);
716717
pfree(error);
717718
ABORT_SPI_SNAP();
718719
return-1;
@@ -722,30 +723,35 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
722723
return0;
723724
}
724725
current_job_id=job->cron_id;
725-
if(!move_at_job_process(job->cron_id))
726+
/*if(!move_at_job_process(job->cron_id))
726727
{
727728
elog(LOG, "AT EXECUTOR: error move to process");
728729
ABORT_SPI_SNAP();
729730
return -1;
730-
}
731+
} */
731732
STOP_SPI_SNAP();/* Commit changes */
732-
733-
START_SPI_SNAP();
734733
pgstat_report_activity(STATE_RUNNING,"job initialized");
734+
START_SPI_SNAP();
735735

736736
ResetAllOptions();
737737
if(set_session_authorization_by_name(job->executor,&error)==InvalidOid)
738738
{
739739
if(error)
740740
{
741-
set_at_job_done(job,error,0);
741+
set_ret=set_at_job_done(job,error,0,&set_error);
742742
pfree(error);
743743
}
744744
else
745745
{
746-
set_at_job_done(job,"Unknown set session auth error",0);
746+
set_ret=set_at_job_done(job,"Unknown set session auth error",0,&set_error);
747747
}
748748
shared->status=SchdExecutorIdling;
749+
if(set_ret<0)
750+
{
751+
elog(LOG,"AT-EXECUTOR ERROR: move after auth: %s",set_error);
752+
ABORT_SPI_SNAP();
753+
return-1;
754+
}
749755
STOP_SPI_SNAP();
750756
return1;
751757
}
@@ -778,26 +784,40 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
778784
{
779785
if(error)
780786
{
781-
set_at_job_done(job,error,resubmit_current_job);
787+
set_ret=set_at_job_done(job,error,resubmit_current_job,&set_error);
782788
pfree(error);
783789
}
784790
else
785791
{
786792
sprintf(buff,"error in command: code: %d",ret);
787-
set_at_job_done(job,buff,resubmit_current_job);
793+
set_ret=set_at_job_done(job,buff,resubmit_current_job,&set_error);
788794
}
789795
}
790796
else
791797
{
792-
set_at_job_done(job,NULL,resubmit_current_job);
798+
set_ret=set_at_job_done(job,NULL,resubmit_current_job,&set_error);
793799
}
794-
STOP_SPI_SNAP();
795800

796801
resubmit_current_job=0;
797802
current_job_id=-1;
798803
pgstat_report_activity(STATE_RUNNING,"finish job processing");
804+
if(set_ret>0)
805+
{
806+
STOP_SPI_SNAP();
807+
return1;
808+
}
809+
if(set_error)
810+
{
811+
elog(LOG,"AT_EXECUTOR ERROR: set log: %s",set_error);
812+
pfree(set_error);
813+
}
814+
else
815+
{
816+
elog(LOG,"AT_EXECUTOR ERROR: set log: unknown error");
817+
}
818+
ABORT_SPI_SNAP();
799819

800-
return1;
820+
return-1;
801821
}
802822

803823
Oidset_session_authorization_by_name(char*rolename,char**error)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp