Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit672905b

Browse files
author
Vladimir Ershov
committed
all params checked && execute with params
1 parentb38692b commit672905b

File tree

7 files changed

+120
-18
lines changed

7 files changed

+120
-18
lines changed

‎pgpro_scheduler--2.0.sql‎

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,10 @@ BEGIN
195195
last_avail :=NULL;
196196
END IF;
197197

198+
IF node ISNULL THEN
199+
node :='master';
200+
END IF;
201+
198202
IF run_asIS NOT NULLAND run_as<>session_user THEN
199203
executor := run_as;
200204
BEGIN
@@ -211,10 +215,10 @@ BEGIN
211215

212216
INSERT INTO at_jobs_submitted
213217
(node, at, do_sql, owner, executor, name, comments, max_run_time,
214-
postpone, last_start_available, depends_on)
218+
postpone, last_start_available, depends_on, params)
215219
VALUES
216220
(node, run_after, query,session_user, executor, name, comments,
217-
max_duration, max_wait_interval, last_avail, depends_on)
221+
max_duration, max_wait_interval, last_avail, depends_on, params)
218222
RETURNING id INTO job_id;
219223

220224
RETURN job_id;

‎src/scheduler_executor.c‎

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,14 @@ void executor_worker_main(Datum arg)
150150
{
151151
START_SPI_SNAP();
152152
}
153-
ret=execute_spi(job->dosql[i],&error);
153+
if(job->type==AtJob&&i==0&&job->sql_params_n>0)
154+
{
155+
ret=execute_spi_params_prepared(job->dosql[i],job->sql_params_n,job->sql_params,&error);
156+
}
157+
else
158+
{
159+
ret=execute_spi(job->dosql[i],&error);
160+
}
154161
if(ret<0)
155162
{
156163
/* success = false; */

‎src/scheduler_job.c‎

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ job_t *get_at_job(int cron_id, char *nodename, char **perror)
6262
}
6363

6464
j=init_scheduler_job(NULL,AtJob);
65+
j->cron_id=cron_id;
6566
j->node=_copy_string(nodename);
6667
j->dosql=get_textarray_from_spi(0,2,&j->dosql_n);
6768
j->executor=get_text_from_spi(0,3);
@@ -163,7 +164,7 @@ job_t *_at_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit)
163164
intret,got,i;
164165
Oidargtypes[2]= {TEXTOID,INT4OID };
165166
Datumvalues[2];
166-
constchar*get_job_sql="select id, at, last_start_available, max_run_time,max_instances, executor from ONLY at_jobs_submitted where at <= 'now' and (last_start_available is NULL OR last_start_available > 'now') AND node = $1 order by at, submit_time limit $2";
167+
constchar*get_job_sql="select id, at, last_start_available, max_run_time, executor from ONLY at_jobs_submitted where at <= 'now' and (last_start_available is NULL OR last_start_available > 'now') AND node = $1 order by at, submit_time limit $2";
167168

168169
*is_error=*n=0;
169170
START_SPI_SNAP();
@@ -184,9 +185,8 @@ job_t *_at_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit)
184185
jobs[i].start_at=get_timestamp_from_spi(i,2,0);
185186
jobs[i].last_start_avail=get_timestamp_from_spi(i,3,0);
186187
jobs[i].timelimit=get_interval_seconds_from_spi(i,4,0);
187-
jobs[i].max_instances=get_int_from_spi(i,5,1);
188188
jobs[i].node=_copy_string(nodename);
189-
jobs[i].executor=get_text_from_spi(i,6);
189+
jobs[i].executor=get_text_from_spi(i,5);
190190
}
191191
}
192192
}
@@ -323,19 +323,23 @@ job_t *set_job_error(job_t *j, const char *fmt, ...)
323323
returnj;
324324
}
325325

326-
intmove_job_to_log(job_t*j,boolstatus)
326+
intmove_job_to_log(job_t*j,boolstatus,boolprocess)
327327
{
328328
if(j->type==CronJob)_cron_move_job_to_log(j,status);
329-
return_at_move_job_to_log(j,status);
329+
return_at_move_job_to_log(j,status,process);
330330
}
331331

332-
int_at_move_job_to_log(job_t*j,boolstatus)
332+
int_at_move_job_to_log(job_t*j,boolstatus,boolprocess)
333333
{
334334
Datumvalues[3];
335335
charnulls[3]= {' ',' ',' ' };
336336
Oidargtypes[4]= {INT4OID,BOOLOID,TEXTOID };
337337
intret;
338-
constchar*sql="WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, $2 as status, $3 as reason FROM moved_rows";
338+
constchar*sql_process="WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, $2 as status, $3 as reason FROM moved_rows";
339+
constchar*sql_submitted="WITH moved_rows AS (DELETE from ONLY at_jobs_submitted a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_done SELECT *, NULL as start_time, $2 as status, $3 as reason FROM moved_rows";
340+
constchar*sql;
341+
342+
sql=process ?sql_process:sql_submitted;
339343

340344
values[0]=Int32GetDatum(j->cron_id);
341345
values[1]=BoolGetDatum(status);

‎src/scheduler_job.h‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ job_t *_cron_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit);
4646
job_t*_at_get_jobs_to_do(char*nodename,int*n,int*is_error,intlimit);
4747
job_t*get_jobs_to_do(char*nodename,task_type_ttype,int*n,int*is_error,intlimit);
4848
job_t*set_job_error(job_t*j,constchar*fmt, ...)pg_attribute_printf(2,3);
49-
intmove_job_to_log(job_t*j,boolstatus);
49+
intmove_job_to_log(job_t*j,boolstatus,boolprocessed);
5050
voiddestroy_job(job_t*j,intselfdestroy);
5151
job_t*get_at_job(intcron_id,char*nodename,char**perror);
5252
job_t*get_cron_job(intcron_id,TimestampTzstart_at,char*nodename,char**perror);
5353
int_cron_move_job_to_log(job_t*j,boolstatus);
54-
int_at_move_job_to_log(job_t*j,boolstatus);
54+
int_at_move_job_to_log(job_t*j,boolstatus,boolprocessed);
5555

5656
#endif
5757

‎src/scheduler_manager.c‎

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -840,12 +840,13 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
840840

841841
for(i=start_i;i<N+start_i;i++)
842842
{
843-
ni=how_many_instances_on_work(ctx,&(jobs[i]));
844-
if(ni >=jobs[i].max_instances)
843+
ni=type==CronJob ?
844+
how_many_instances_on_work(ctx,&(jobs[i])):100000;
845+
if(type==CronJob&&ni >=jobs[i].max_instances)
845846
{
846847
START_SPI_SNAP();
847848
set_job_error(&jobs[i],"max instances limit reached");
848-
move_job_to_log(&jobs[i], false);
849+
move_job_to_log(&jobs[i], false, false);
849850
destroy_job(&jobs[i],0);
850851
STOP_SPI_SNAP();
851852
jobs[i].cron_id=-1;
@@ -867,9 +868,11 @@ int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
867868
set_job_error(&jobs[i],
868869
"Cannot set at job %d to worker",
869870
jobs[i].cron_id);
871+
elog(ERROR,"Cannot set job to free slot type=%d, id=%d",
872+
jobs[i].type,jobs[i].cron_id);
870873
}
871874
START_SPI_SNAP();
872-
move_job_to_log(&jobs[i], false);
875+
move_job_to_log(&jobs[i], false, false);
873876
destroy_job(&jobs[i],0);
874877
jobs[i].cron_id=-1;
875878
STOP_SPI_SNAP();
@@ -1063,7 +1066,7 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
10631066
}
10641067
}
10651068
}
1066-
move_job_to_log(item->job,job_status);
1069+
move_job_to_log(item->job,job_status, true);
10671070
STOP_SPI_SNAP();
10681071

10691072
last=p->len-p->free-1;
@@ -1179,7 +1182,7 @@ int scheduler_vanish_expired_jobs(scheduler_manager_ctx_t *ctx, task_type_t type
11791182
set_job_error(&expired[i],"job start time %s expired",ts);
11801183
}
11811184

1182-
move_ret=move_job_to_log(&expired[i],0);
1185+
move_ret=move_job_to_log(&expired[i],0, false);
11831186
if(move_ret<0)
11841187
{
11851188
elog(LOG,"Scheduler manager %s: cannot move %s job %d@%s%s to log",

‎src/scheduler_spi_utils.c‎

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,4 +351,87 @@ int execute_spi(const char *sql, char **error)
351351
returnexecute_spi_sql_with_args(sql,0,NULL,NULL,NULL,error);
352352
}
353353

354+
intexecute_spi_params_prepared(constchar*sql,intnparams,char**params,char**error)
355+
{
356+
intret=-100;
357+
ErrorData*edata;
358+
MemoryContextold;
359+
interrorSet=0;
360+
charother[100];
361+
SPIPlanPtrplan;
362+
Oid*paramtypes;
363+
Datum*values;
364+
inti;
365+
366+
*error=NULL;
367+
368+
paramtypes=worker_alloc(sizeof(Oid)*nparams);
369+
values=worker_alloc(sizeof(Datum)*nparams);
370+
for(i=0;i<nparams;i++)
371+
{
372+
paramtypes[i]=TEXTOID;
373+
values[i]=CStringGetTextDatum(params[i]);
374+
}
375+
376+
PG_TRY();
377+
{
378+
plan=SPI_prepare(sql,nparams,paramtypes);
379+
if(plan)
380+
{
381+
ret=SPI_execute_plan(plan,values,NULL, false,0);
382+
}
383+
}
384+
PG_CATCH();
385+
{
386+
old=switch_to_worker_context();
387+
388+
edata=CopyErrorData();
389+
if(edata->message)
390+
{
391+
*error=_copy_string(edata->message);
392+
}
393+
elseif(edata->detail)
394+
{
395+
*error=_copy_string(edata->detail);
396+
}
397+
else
398+
{
399+
*error=_copy_string("unknown error");
400+
}
401+
errorSet=1;
402+
FreeErrorData(edata);
403+
MemoryContextSwitchTo(old);
404+
FlushErrorState();
405+
}
406+
PG_END_TRY();
407+
408+
pfree(values);
409+
pfree(paramtypes);
410+
411+
if(!errorSet&&ret<0)
412+
{
413+
if(ret==SPI_ERROR_CONNECT)
414+
{
415+
*error=_copy_string("Connection error");
416+
}
417+
elseif(ret==SPI_ERROR_COPY)
418+
{
419+
*error=_copy_string("COPY error");
420+
}
421+
elseif(ret==SPI_ERROR_OPUNKNOWN)
422+
{
423+
*error=_copy_string("SPI_ERROR_OPUNKNOWN");
424+
}
425+
elseif(ret==SPI_ERROR_UNCONNECTED)
426+
{
427+
*error=_copy_string("Unconnected call");
428+
}
429+
else
430+
{
431+
sprintf(other,"error number: %d",ret);
432+
*error=_copy_string(other);
433+
}
434+
}
354435

436+
returnret;
437+
}

‎src/scheduler_spi_utils.h‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ char *get_text_from_spi(int row_n, int pos);
2929
Oidget_oid_from_spi(introw_n,intpos,Oiddef);
3030
intexecute_spi_sql_with_args(constchar*sql,intn,Oid*argtypes,Datum*values,char*nulls,char**error);
3131
intexecute_spi(constchar*sql,char**error);
32+
intexecute_spi_params_prepared(constchar*sql,intnparams,char**params,char**error);
3233

3334
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp