Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb38692b

Browse files
author
Vladimir Ershov
committed
up && running
1 parent5eae385 commitb38692b

9 files changed

+442
-256
lines changed

‎pgpro_scheduler--2.0.sql

Lines changed: 301 additions & 216 deletions
Large diffs are not rendered by default.

‎src/pgpro_scheduler.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ int get_integer_from_string(char *s, int start, int len)
113113
char*make_date_from_timestamp(TimestampTzts,boolhires)
114114
{
115115
structpg_tmdt;
116-
char*str=worker_alloc(sizeof(char)*19);
116+
char*str=worker_alloc(sizeof(char)*20);
117117
inttz;
118118
fsec_tfsec;
119119
constchar*tzn;

‎src/scheduler_executor.c

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
externvolatilesig_atomic_tgot_sighup;
3636
externvolatilesig_atomic_tgot_sigterm;
3737

38+
staticint64current_job_id=-1;
39+
staticintresubmit_current_job=0;
40+
3841
staticvoidhandle_sigterm(SIGNAL_ARGS);
3942

4043
staticvoid
@@ -85,6 +88,8 @@ void executor_worker_main(Datum arg)
8588
errmsg("executor corrupted dynamic shared memory segment")));
8689
}
8790
status=shared->status=SchdExecutorWork;
91+
shared->message[0]=0;
92+
8893
SetConfigOption("application_name","pgp-s executor",PGC_USERSET,PGC_S_SESSION);
8994
pgstat_report_activity(STATE_RUNNING,"initialize");
9095
init_worker_mem_ctx("ExecutorMemoryContext");
@@ -94,13 +99,16 @@ void executor_worker_main(Datum arg)
9499
job=initializeExecutorJob(shared);
95100
if(!job)
96101
{
97-
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
102+
if(shared->message[0]==0)
103+
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
98104
"Cannot retrive job information");
99105
shared->status=SchdExecutorError;
100106
delete_worker_mem_ctx();
101107
dsm_detach(seg);
102108
proc_exit(0);
103109
}
110+
current_job_id=job->cron_id;
111+
pgstat_report_activity(STATE_RUNNING,"job initialized");
104112

105113
if(set_session_authorization(job->executor,&error)<0)
106114
{
@@ -195,6 +203,7 @@ void executor_worker_main(Datum arg)
195203
sprintf(shared->set_invalid_reason,"unable to execute next time statement");
196204
}
197205
}
206+
current_job_id=-1;
198207
pgstat_report_activity(STATE_RUNNING,"finish job processing");
199208

200209
if(EE.n>0)
@@ -391,11 +400,19 @@ job_t *initializeExecutorJob(schd_executor_share_t *data)
391400
{
392401
job_t*J;
393402
char*error=NULL;
403+
constchar*schema;
404+
constchar*old_path;
405+
406+
old_path=GetConfigOption("search_path", false, true);
407+
schema=GetConfigOption("schedule.schema", false, true);
408+
SetConfigOption("search_path",schema,PGC_USERSET,PGC_S_SESSION);
394409

395410
J=data->type==CronJob ?
396411
get_cron_job(data->cron_id,data->start_at,data->nodename,&error):
397412
get_at_job(data->cron_id,data->nodename,&error);
398413

414+
SetConfigOption("search_path",old_path,PGC_USERSET,PGC_S_SESSION);
415+
399416
if(error)
400417
{
401418
snprintf(data->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
@@ -441,3 +458,25 @@ int push_executor_error(executor_error_t *e, char *fmt, ...)
441458

442459
returne->n;
443460
}
461+
462+
PG_FUNCTION_INFO_V1(get_self_id);
463+
Datum
464+
get_self_id(PG_FUNCTION_ARGS)
465+
{
466+
if(current_job_id==-1)
467+
{
468+
elog(ERROR,"There is no active job in progress");
469+
}
470+
PG_RETURN_INT64(current_job_id);
471+
}
472+
473+
PG_FUNCTION_INFO_V1(resubmit);
474+
Datum
475+
resubmit(PG_FUNCTION_ARGS)
476+
{
477+
if(current_job_id==-1)
478+
{
479+
elog(ERROR,"There is no active job in progress");
480+
}
481+
PG_RETURN_BOOL(true);
482+
}

‎src/scheduler_executor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ void set_pg_var(bool resulti, executor_error_t *ee);
4848
intpush_executor_error(executor_error_t*e,char*fmt, ...)pg_attribute_printf(2,3);
4949
intset_session_authorization(char*username,char**error);
5050

51+
externDatumget_self_id(PG_FUNCTION_ARGS);
52+
externDatumresubmit(PG_FUNCTION_ARGS);
53+
5154

5255
#endif
5356

‎src/scheduler_job.c

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ job_t *init_scheduler_job(job_t *j, unsigned char type)
2525
job_t*get_at_job(intcron_id,char*nodename,char**perror)
2626
{
2727
job_t*j;
28-
constchar*sql="select last_start_available,same_transaction, do_sql, executor, postpone, max_run_time as time_limit,max_instances, onrollback_statement, start_at fromat_process where node = $1 and id = $2";
28+
constchar*sql="select last_start_available,array_append('{}'::text[], do_sql)::text[], executor, postpone, max_run_time as time_limit,at, params, depends_on fromONLY at_jobs_process where node = $1 and id = $2";
2929
Oidargtypes[2]= {TEXTOID,INT4OID};
3030
Datumargs[2];
3131
intret;
@@ -60,15 +60,16 @@ job_t *get_at_job(int cron_id, char *nodename, char **perror)
6060
*perror=_copy_string(buffer);
6161
returnNULL;
6262
}
63-
STOP_SPI_SNAP();
6463

6564
j=init_scheduler_job(NULL,AtJob);
6665
j->node=_copy_string(nodename);
67-
j->same_transaction=get_boolean_from_spi(0,2, false);
68-
j->dosql=get_textarray_from_spi(0,3,&j->dosql_n);
69-
j->executor=get_text_from_spi(0,4);
70-
j->onrollback=get_text_from_spi(0,8);
71-
j->start_at=get_timestamp_from_spi(0,9,0);
66+
j->dosql=get_textarray_from_spi(0,2,&j->dosql_n);
67+
j->executor=get_text_from_spi(0,3);
68+
j->start_at=get_timestamp_from_spi(0,6,0);
69+
j->sql_params=get_textarray_from_spi(0,7,&j->sql_params_n);
70+
j->depends_on=get_int64array_from_spi(0,8,&j->depends_on_n);
71+
72+
STOP_SPI_SNAP();
7273

7374
*perror=NULL;
7475
returnj;
@@ -125,7 +126,6 @@ job_t *get_cron_job(int cron_id, TimestampTz start_at, char *nodename, char **pe
125126
pfree(ts);
126127
returnNULL;
127128
}
128-
STOP_SPI_SNAP();
129129

130130
j=init_scheduler_job(NULL,CronJob);
131131
j->start_at=start_at;
@@ -135,6 +135,7 @@ job_t *get_cron_job(int cron_id, TimestampTz start_at, char *nodename, char **pe
135135
j->executor=get_text_from_spi(0,4);
136136
j->onrollback=get_text_from_spi(0,8);
137137
j->next_time_statement=get_text_from_spi(0,9);
138+
STOP_SPI_SNAP();
138139

139140
*perror=NULL;
140141
returnj;
@@ -167,7 +168,7 @@ job_t *_at_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit)
167168
*is_error=*n=0;
168169
START_SPI_SNAP();
169170
values[0]=CStringGetTextDatum(nodename);
170-
values[1]=Int32GetDatum(limit);
171+
values[1]=Int32GetDatum(limit+1);
171172
ret=SPI_execute_with_args(get_job_sql,2,argtypes,values,NULL, true,0);
172173
if(ret==SPI_OK_SELECT)
173174
{
@@ -208,7 +209,7 @@ job_t *_cron_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit)
208209
*is_error=*n=0;
209210
START_SPI_SNAP();
210211
values[0]=CStringGetTextDatum(nodename);
211-
values[1]=Int32GetDatum(limit);
212+
values[1]=Int32GetDatum(limit+1);
212213
ret=SPI_execute_with_args(get_job_sql,2,argtypes,values,NULL, true,0);
213214
if(ret==SPI_OK_SELECT)
214215
{
@@ -409,6 +410,15 @@ void destroy_job(job_t *j, int selfdestroy)
409410
}
410411
pfree(j->dosql);
411412
}
413+
if(j->sql_params_n&&j->sql_params)
414+
{
415+
for(i=0;i<j->sql_params_n;i++)
416+
{
417+
if(j->sql_params[i])pfree(j->sql_params[i]);
418+
}
419+
pfree(j->sql_params);
420+
}
421+
if(j->depends_on_n&&j->depends_on)pfree(j->depends_on);
412422

413423
if(selfdestroy)pfree(j);
414424
}

‎src/scheduler_job.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ typedef struct {
2424
boolsame_transaction;
2525
intdosql_n;
2626
char**dosql;
27+
intsql_params_n;
28+
char**sql_params;
29+
intdepends_on_n;
30+
int64*depends_on;
2731
TimestampTzpostpone;
2832
char*executor;
2933
char*owner;

‎src/scheduler_manager.c

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -810,12 +810,11 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
810810
interval=2;
811811
}
812812

813-
814-
if(*check_time>GetCurrentTimestamp())return-1;
813+
if(*check_time>GetCurrentTimestamp())return0;
815814
if(p->free==0)
816815
{
817-
*check_time=timestamp_add_seconds(0,1);
818-
return-2;
816+
if(type==CronJob)*check_time=timestamp_add_seconds(0,1);
817+
return1;
819818
}
820819

821820
jobs=get_jobs_to_do(ctx->nodename,type,&njobs,&is_error,p->free);
@@ -825,7 +824,7 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
825824
{
826825
*check_time=timestamp_add_seconds(0,interval);
827826
elog(LOG,"Error while retrieving jobs");
828-
return-3;
827+
return0;
829828
}
830829
if(nwaiting==0)
831830
{
@@ -897,7 +896,7 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
897896

898897
if(nwaiting>0)
899898
{
900-
interval=1;
899+
interval=type==CronJob ?1:0;
901900
}
902901
else
903902
{
@@ -917,7 +916,7 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
917916
}
918917

919918
*check_time=timestamp_add_seconds(0,interval);
920-
return1;
919+
returnnwaiting;
921920
}
922921

923922
voiddestroy_slot_item(scheduler_manager_slot_t*item)
@@ -1396,6 +1395,8 @@ void manager_worker_main(Datum arg)
13961395
schd_manager_share_t*shared;
13971396
dsm_segment*seg;
13981397
scheduler_manager_ctx_t*ctx;
1398+
intwait=0;
1399+
intterminate_main_loop=0;
13991400

14001401
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler");
14011402
seg=dsm_attach(DatumGetInt32(arg));
@@ -1469,35 +1470,31 @@ void manager_worker_main(Datum arg)
14691470
}
14701471
if(!got_sighup&& !got_sigterm)
14711472
{
1472-
if(rc&WL_LATCH_SET)
1473+
terminate_main_loop=0;
1474+
while(1)
14731475
{
1474-
_pdebug("got latch from some bgworker");
1475-
if(check_parent_stop_signal(ctx))break;
1476-
scheduler_start_jobs(ctx,AtJob);
1477-
scheduler_check_slots(ctx,&(ctx->cron));
1478-
scheduler_check_slots(ctx,&(ctx->at));
1479-
set_slots_stat_report(ctx);
1480-
_pdebug("quit got latch");
1481-
}
1482-
elseif(rc&WL_TIMEOUT)
1483-
{
1484-
scheduler_make_atcron_record(ctx);
1485-
/* if there are any expired jobs to get rid of */
1486-
scheduler_vanish_expired_jobs(ctx,AtJob);
1487-
scheduler_vanish_expired_jobs(ctx,CronJob);
1488-
/* start jobs */
1489-
scheduler_start_jobs(ctx,AtJob);
1490-
scheduler_start_jobs(ctx,CronJob);
1491-
/* check slots, first "at" 'cause them faster */
1476+
wait=0;
1477+
if(check_parent_stop_signal(ctx))
1478+
{
1479+
terminate_main_loop=1;
1480+
break;
1481+
}
1482+
wait+=scheduler_start_jobs(ctx,AtJob);
1483+
wait+=scheduler_start_jobs(ctx,CronJob);
14921484
scheduler_check_slots(ctx,&(ctx->at));
14931485
scheduler_check_slots(ctx,&(ctx->cron));
1494-
/* set statistics of working slots */
14951486
set_slots_stat_report(ctx);
1487+
if(wait==0)break;
14961488
}
1489+
if(terminate_main_loop)break;
1490+
scheduler_make_atcron_record(ctx);
1491+
/* if there are any expired jobs to get rid of */
1492+
scheduler_vanish_expired_jobs(ctx,AtJob);
1493+
scheduler_vanish_expired_jobs(ctx,CronJob);
14971494
}
14981495
}
14991496
rc=WaitLatch(MyLatch,
1500-
WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,1000L);
1497+
WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,500L);
15011498
ResetLatch(MyLatch);
15021499
}
15031500
scheduler_manager_stop(ctx);

‎src/scheduler_spi_utils.c

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,53 @@ bool get_boolean_from_spi(int row_n, int pos, bool def)
8080
returnDatumGetBool(datum);
8181
}
8282

83+
int64*get_int64array_from_spi(introw_n,intpos,int*N)
84+
{
85+
Datumdatum;
86+
boolis_null;
87+
ArrayType*input;
88+
Datum*datums;
89+
booli_typbyval;
90+
chari_typalign;
91+
int16i_typlen;
92+
intlen,i,arr_len;
93+
bool*nulls;
94+
int64*result;
95+
96+
*N=0;
97+
98+
datum=SPI_getbinval(SPI_tuptable->vals[row_n],SPI_tuptable->tupdesc,
99+
pos,&is_null);
100+
if(is_null)returnNULL;
101+
102+
input=DatumGetArrayTypeP(datum);
103+
if(ARR_ELEMTYPE(input)!=INT8OID)
104+
{
105+
returnNULL;
106+
}
107+
get_typlenbyvalalign(INT8OID,&i_typlen,&i_typbyval,&i_typalign);
108+
deconstruct_array(input,INT8OID,i_typlen,i_typbyval,i_typalign,&datums,&nulls,&len);
109+
110+
if(len==0)returnNULL;
111+
arr_len=len;
112+
113+
for(i=0;i<len;i++)
114+
{
115+
if(nulls[i])arr_len--;
116+
}
117+
result=worker_alloc(sizeof(int64)*arr_len);
118+
for(i=0;i<len;i++)
119+
{
120+
if(!nulls[i])
121+
{
122+
result[*N]=Int64GetDatum(datums[i]);
123+
(*N)++;
124+
}
125+
}
126+
127+
returnresult;
128+
}
129+
83130
char**get_textarray_from_spi(introw_n,intpos,int*N)
84131
{
85132
Datumdatum;

‎src/scheduler_spi_utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Datum select_onedatumvalue_sql(const char *sql, bool *is_null);
2323
intselect_count_with_args(constchar*sql,intn,Oid*argtypes,Datum*values,char*nulls);
2424
longintget_interval_seconds_from_spi(introw_n,intpos,longdef);
2525
char**get_textarray_from_spi(introw_n,intpos,int*N);
26+
int64*get_int64array_from_spi(introw_n,intpos,int*N);
2627
boolget_boolean_from_spi(introw_n,intpos,booldef);
2728
char*get_text_from_spi(introw_n,intpos);
2829
Oidget_oid_from_spi(introw_n,intpos,Oiddef);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp