Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4c27c2d

Browse files
author
Vladimir Ershov
committed
Merge commit 'ba22bba356045511f495d21691d19e44a9bc7d0e' into PGPROEE9_6_scheduler
Conflicts:contrib/pgpro_scheduler/src/pgpro_scheduler.ccontrib/pgpro_scheduler/src/scheduler_job.ccontrib/pgpro_scheduler/src/scheduler_manager.ccontrib/pgpro_scheduler/src/scheduler_spi_utils.c
2 parents30dc23f +ba22bba commit4c27c2d

11 files changed

+548
-341
lines changed

‎contrib/pgpro_scheduler/src/memutils.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@
44

55
MemoryContextSchedulerWorkerContext=NULL;
66

7+
MemoryContextinit_mem_ctx(constchar*name)
8+
{
9+
returnAllocSetContextCreate(TopMemoryContext,name,
10+
ALLOCSET_DEFAULT_MINSIZE,
11+
ALLOCSET_DEFAULT_INITSIZE,
12+
ALLOCSET_DEFAULT_MAXSIZE);
13+
}
14+
715
MemoryContextinit_worker_mem_ctx(constchar*name)
816
{
917
AssertState(SchedulerWorkerContext==NULL);
@@ -31,4 +39,5 @@ void delete_worker_mem_ctx(void)
3139
{
3240
MemoryContextSwitchTo(TopMemoryContext);
3341
MemoryContextDelete(SchedulerWorkerContext);
42+
SchedulerWorkerContext=NULL;
3443
}

‎contrib/pgpro_scheduler/src/memutils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
externMemoryContextSchedulerWorkerContext;
88

99
MemoryContextinit_worker_mem_ctx(constchar*name);
10+
MemoryContextinit_mem_ctx(constchar*name);
1011
MemoryContextswitch_to_worker_context(void);
1112
void*worker_alloc(Sizesize);
1213
voiddelete_worker_mem_ctx(void);

‎contrib/pgpro_scheduler/src/pgpro_scheduler.c

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ char *make_date_from_timestamp(TimestampTz ts, bool hires)
119119
fsec_tfsec;
120120
constchar*tzn;
121121

122-
timestamp2tm(ts,&tz,&dt,&fsec,&tzn,NULL );
122+
timestamp2tm(ts,&tz,&dt,&fsec,&tzn,NULL );
123123
sprintf(str,"%04d-%02d-%02d %02d:%02d:%02d",dt.tm_year ,dt.tm_mon,
124124
dt.tm_mday,dt.tm_hour,dt.tm_min,dt.tm_sec);
125125
if(!hires)str[16]=0;
@@ -171,14 +171,14 @@ char *set_schema(const char *name, bool get_old)
171171
boolfree_name= false;
172172

173173
if(get_old)
174-
current=_copy_string((char*)GetConfigOption("search_path", true, false));
174+
current=_mcopy_string(NULL,(char*)GetConfigOption("search_path", true, false));
175175
if(name)
176176
{
177177
schema_name= (char*)name;
178178
}
179179
else
180180
{
181-
schema_name=_copy_string((char*)GetConfigOption("schedule.schema", true, false));
181+
schema_name=_mcopy_string(NULL,(char*)GetConfigOption("schedule.schema", true, false));
182182
free_name= true;
183183
}
184184
SetConfigOption("search_path",schema_name,PGC_USERSET,PGC_S_SESSION);
@@ -264,7 +264,7 @@ char_array_t *readBasesToCheck(void)
264264
{
265265
appendStringInfo(&sql,"'%s'",names->data[i]);
266266
if(i+1!=names->n)appendStringInfo(&sql,",");
267-
}
267+
}
268268
destroyCharArray(names);
269269
appendStringInfo(&sql,")");
270270

@@ -304,7 +304,7 @@ void parent_scheduler_main(Datum arg)
304304
CurrentResourceOwner=ResourceOwnerCreate(NULL,"pgpro_scheduler");
305305

306306
init_worker_mem_ctx("Parent scheduler context");
307-
elog(LOG,"Start PostgresPro scheduler.");
307+
elog(LOG,"Start PostgresPro scheduler.");
308308

309309
SetConfigOption("application_name","pgp-s supervisor",PGC_USERSET,PGC_S_SESSION);
310310
pgstat_report_activity(STATE_RUNNING,"Initialize");
@@ -358,7 +358,7 @@ void parent_scheduler_main(Datum arg)
358358
destroyCharArray(names);
359359
}
360360
}
361-
else
361+
else
362362
{
363363
for(i=0;i<pool->n;i++)
364364
{
@@ -417,7 +417,7 @@ pg_scheduler_startup(void)
417417
memcpy(worker.bgw_library_name,"pgpro_scheduler",16);
418418
memcpy(worker.bgw_name,"pgpro scheduler",16);
419419

420-
RegisterBackgroundWorker(&worker);
420+
RegisterBackgroundWorker(&worker);
421421
}
422422

423423
void_PG_init(void)
@@ -468,7 +468,7 @@ void _PG_init(void)
468468
"schedule.transaction_state",
469469
"State of scheduler executor transaction",
470470
"If not under scheduler executor process the variable has no mean and has a value = 'undefined', possible values: progress, success, failure",
471-
&scheduler_transaction_state ,
471+
&scheduler_transaction_state ,
472472
"undefined",
473473
PGC_INTERNAL,
474474
0,
@@ -483,7 +483,7 @@ void _PG_init(void)
483483
&scheduler_max_workers,
484484
2,
485485
1,
486-
100,
486+
1000,
487487
PGC_SUSET,
488488
0,
489489
NULL,
@@ -497,7 +497,7 @@ void _PG_init(void)
497497
&scheduler_max_parallel_workers,
498498
2,
499499
1,
500-
100,
500+
1000,
501501
PGC_SUSET,
502502
0,
503503
NULL,
@@ -542,14 +542,14 @@ cron_string_to_json_text(PG_FUNCTION_ARGS)
542542
text*text_p;
543543
intlen;
544544
char*error=NULL;
545-
545+
546546
if(PG_ARGISNULL(0))
547-
{
547+
{
548548
PG_RETURN_NULL();
549549
}
550550
source=PG_GETARG_CSTRING(0);
551551
jsonText=parse_crontab_to_json_text(source);
552-
552+
553553
if(jsonText)
554554
{
555555
len=strlen(jsonText);

‎contrib/pgpro_scheduler/src/scheduler_executor.c

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,13 @@ void executor_worker_main(Datum arg)
139139
}
140140
elseif(result<0)
141141
{
142+
if(result==-100)
143+
{
144+
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
145+
"Cannot allocate memory");
146+
shared->worker_exit= true;
147+
shared->status=SchdExecutorError;
148+
}
142149
delete_worker_mem_ctx();
143150
dsm_detach(seg);
144151
proc_exit(0);
@@ -159,9 +166,11 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
159166
{
160167
executor_error_tEE;
161168
char*error=NULL;
162-
inti;
169+
inti,ret;
163170
job_t*job;
164171
spi_response_t*r;
172+
MemoryContextold,mem;
173+
charbuffer[1024];
165174

166175
EE.n=0;
167176
EE.errors=NULL;
@@ -174,6 +183,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
174183
return0;
175184
}
176185

186+
mem=init_mem_ctx("executor");
187+
old=MemoryContextSwitchTo(mem);
188+
177189
*status=shared->status=SchdExecutorWork;
178190
shared->message[0]=0;
179191

@@ -187,6 +199,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
187199
shared->worker_exit= true;
188200
*status=shared->status=SchdExecutorError;
189201

202+
MemoryContextSwitchTo(old);
203+
MemoryContextDelete(mem);
204+
190205
return-1;
191206
}
192207
current_job_id=job->cron_id;
@@ -207,6 +222,8 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
207222
}
208223
*status=shared->worker_exit= true;
209224
shared->status=SchdExecutorError;
225+
MemoryContextSwitchTo(old);
226+
MemoryContextDelete(mem);
210227
return-2;
211228
}
212229

@@ -230,30 +247,35 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
230247
}
231248
if(job->type==AtJob&&i==0&&job->sql_params_n>0)
232249
{
233-
r=execute_spi_params_prepared(job->dosql[i],job->sql_params_n,job->sql_params);
250+
r=execute_spi_params_prepared(mem,job->dosql[i],job->sql_params_n,job->sql_params);
234251
}
235252
else
236253
{
237-
r=execute_spi(job->dosql[i]);
254+
r=execute_spi(mem,job->dosql[i]);
238255
}
256+
snprintf(buffer,1024,"finalize: %s",job->dosql[i]);
257+
if(!r)return-100;/* cannot allocate memory */
258+
pgstat_report_activity(STATE_RUNNING,buffer);
239259
if(r->retval<0)
240260
{
241261
/* success = false; */
242262
*status=SchdExecutorError;
243263
if(r->error)
244264
{
245-
push_executor_error(&EE,"error in command #%d: %s",
265+
ret=push_executor_error(&EE,"error in command #%d: %s",
246266
i+1,r->error);
247267
}
248268
else
249269
{
250-
push_executor_error(&EE,"error in command #%d: code: %d",
270+
ret=push_executor_error(&EE,"error in command #%d: code: %d",
251271
i+1,r->retval);
252272
}
273+
if(ret<0)return-100;/* cannot alloc memory */
253274
destroy_spi_data(r);
254275
ABORT_SPI_SNAP();
255276
SetConfigOption("schedule.transaction_state","failure",PGC_INTERNAL,PGC_S_SESSION);
256-
executor_onrollback(job,&EE);
277+
if(executor_onrollback(mem,job,&EE)==-14000)
278+
return-100;/* cannot alloc memory */
257279

258280
break;
259281
}
@@ -321,6 +343,8 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
321343

322344
SetSessionAuthorization(BOOTSTRAP_SUPERUSERID, true);
323345
ResetAllOptions();
346+
MemoryContextSwitchTo(old);
347+
MemoryContextDelete(mem);
324348

325349
return1;
326350
}
@@ -336,23 +360,24 @@ int set_session_authorization(char *username, char **error)
336360
intrv;
337361
char*sql="select oid, rolsuper from pg_catalog.pg_roles where rolname = $1";
338362
charbuff[1024];
363+
MemoryContextmem=CurrentMemoryContext;
339364

340365
values[0]=CStringGetTextDatum(username);
341366
START_SPI_SNAP();
342-
r=execute_spi_sql_with_args(sql,1,types,values,NULL);
367+
r=execute_spi_sql_with_args(mem,sql,1,types,values,NULL);
343368

344369
if(r->retval<0)
345370
{
346371
rv=r->retval;
347-
*error=_copy_string(r->error);
372+
*error=_mcopy_string(mem,r->error);
348373
destroy_spi_data(r);
349374
returnrv;
350375
}
351376
if(r->n_rows==0)
352377
{
353378
STOP_SPI_SNAP();
354379
sprintf(buff,"Cannot find user with name: %s",username);
355-
*error=_copy_string(buff);
380+
*error=_mcopy_string(mem,buff);
356381
destroy_spi_data(r);
357382

358383
return-200;
@@ -415,7 +440,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
415440

416441
START_SPI_SNAP();
417442
pgstat_report_activity(STATE_RUNNING,"culc next time execution time");
418-
r=execute_spi(sql);
443+
r=execute_spi(CurrentMemoryContext,sql);
419444
if(r->retval<0)
420445
{
421446
if(r->error)
@@ -460,7 +485,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
460485
returnts;
461486
}
462487

463-
intexecutor_onrollback(job_t*job,executor_error_t*ee)
488+
intexecutor_onrollback(MemoryContextmem,job_t*job,executor_error_t*ee)
464489
{
465490
intrv;
466491
spi_response_t*r;
@@ -469,16 +494,18 @@ int executor_onrollback(job_t *job, executor_error_t *ee)
469494
pgstat_report_activity(STATE_RUNNING,"execure onrollback");
470495

471496
START_SPI_SNAP();
472-
r=execute_spi(job->onrollback);
497+
r=execute_spi(mem,job->onrollback);
473498
if(r->retval<0)
474499
{
475500
if(r->error)
476501
{
477-
push_executor_error(ee,"onrollback error: %s",r->error);
502+
if(push_executor_error(ee,"onrollback error: %s",r->error)<0)
503+
return-14000;
478504
}
479505
else
480506
{
481-
push_executor_error(ee,"onrollback error: unknown: %d",r->retval);
507+
if(push_executor_error(ee,"onrollback error: unknown: %d",r->retval)<0)
508+
return-14000;
482509
}
483510
ABORT_SPI_SNAP();
484511
}
@@ -502,7 +529,7 @@ void set_pg_var(bool result, executor_error_t *ee)
502529

503530
vals[0]=PointerGetDatum(cstring_to_text(result ?"success":"failure"));
504531

505-
r=execute_spi_sql_with_args(sql,1,argtypes,vals,NULL);
532+
r=execute_spi_sql_with_args(NULL,sql,1,argtypes,vals,NULL);
506533
if(r->retval<0)
507534
{
508535
if(r->error)
@@ -571,6 +598,10 @@ int push_executor_error(executor_error_t *e, char *fmt, ...)
571598
{
572599
e->errors=repalloc(e->errors,sizeof(char*)* (e->n+1));
573600
}
601+
if(e->errors==NULL)
602+
{
603+
return-1;
604+
}
574605
e->errors[e->n]=worker_alloc(sizeof(char)*(len+1));
575606
memcpy(e->errors[e->n],buf,len+1);
576607

@@ -712,14 +743,16 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
712743
intset_ret;
713744
charbuff[512];
714745
spi_response_t*r;
746+
MemoryContextold;
747+
MemoryContextmem=init_mem_ctx("at job processor");
748+
old=MemoryContextSwitchTo(mem);
715749

716750
*status=shared->status=SchdExecutorWork;
717751

718752
pgstat_report_activity(STATE_RUNNING,"initialize at job");
719753
START_SPI_SNAP();
720754

721-
/* job = get_next_at_job_with_lock(shared->nodename, &error); */
722-
job=get_at_job_for_process(shared->nodename,&error);
755+
job=get_at_job_for_process(mem,shared->nodename,&error);
723756
if(!job)
724757
{
725758
if(error)
@@ -765,7 +798,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
765798
return-1;
766799
}
767800
STOP_SPI_SNAP();
768-
elog(LOG,"JOB MOVED TO DONE");
801+
MemoryContextSwitchTo(old);
802+
MemoryContextDelete(mem);
769803
return1;
770804
}
771805

@@ -780,11 +814,11 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
780814

781815
if(job->sql_params_n>0)
782816
{
783-
r=execute_spi_params_prepared(job->dosql[0],job->sql_params_n,job->sql_params);
817+
r=execute_spi_params_prepared(mem,job->dosql[0],job->sql_params_n,job->sql_params);
784818
}
785819
else
786820
{
787-
r=execute_spi(job->dosql[0]);
821+
r=execute_spi(mem,job->dosql[0]);
788822
}
789823
if(job->timelimit)
790824
{
@@ -819,6 +853,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
819853
if(set_ret>0)
820854
{
821855
STOP_SPI_SNAP();
856+
MemoryContextSwitchTo(old);
857+
MemoryContextDelete(mem);
822858
return1;
823859
}
824860
if(set_error)
@@ -831,6 +867,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
831867
elog(LOG,"AT_EXECUTOR ERROR: set log: unknown error");
832868
}
833869
ABORT_SPI_SNAP();
870+
MemoryContextSwitchTo(old);
871+
MemoryContextDelete(mem);
834872

835873
return-1;
836874
}
@@ -846,7 +884,7 @@ Oid set_session_authorization_by_name(char *rolename, char **error)
846884
if(!HeapTupleIsValid(roleTup))
847885
{
848886
snprintf(buffer,512,"There is no user name: %s",rolename);
849-
*error=_copy_string(buffer);
887+
*error=_mcopy_string(NULL,buffer);
850888
returnInvalidOid;
851889
}
852890
rform= (Form_pg_authid)GETSTRUCT(roleTup);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp