Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1340976

Browse files
author
Vladimir Ershov
committed
fix error messages
set session authorizationfix transaction handling
1 parentfad2e22 commit1340976

File tree

6 files changed

+120
-41
lines changed

6 files changed

+120
-41
lines changed

‎src/scheduler_executor.c‎

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ void executor_worker_main(Datum arg)
6161
boolsuccess= true;
6262
executor_error_tEE;
6363
intret;
64-
char*error;
64+
char*error=NULL;
6565
booluse_pg_vars= true;
66+
schd_executor_status_tstatus;
6667
/* int rc = 0; */
6768

6869
EE.n=0;
@@ -82,7 +83,7 @@ void executor_worker_main(Datum arg)
8283
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8384
errmsg("executor corrupted dynamic shared memory segment")));
8485
}
85-
shared->status=SchdExecutorWork;
86+
status=shared->status=SchdExecutorWork;
8687
SetConfigOption("application_name","pgp-s executor",PGC_USERSET,PGC_S_SESSION);
8788
pgstat_report_activity(STATE_RUNNING,"initialize");
8889
init_worker_mem_ctx("ExecutorMemoryContext");
@@ -92,9 +93,28 @@ void executor_worker_main(Datum arg)
9293
job=initializeExecutorJob(shared);
9394
if(!job)
9495
{
95-
shared->status=SchdExecutorError;
9696
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
9797
"Cannot retrive job information");
98+
shared->status=SchdExecutorError;
99+
delete_worker_mem_ctx();
100+
dsm_detach(seg);
101+
proc_exit(0);
102+
}
103+
104+
if(set_session_authorization(job->executor,&error)<0)
105+
{
106+
if(error)
107+
{
108+
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
109+
"Cannot set session auth: %s",error);
110+
pfree(error);
111+
}
112+
else
113+
{
114+
snprintf(shared->message,PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
115+
"Cannot set session auth: unknown error");
116+
}
117+
shared->status=SchdExecutorError;
98118
delete_worker_mem_ctx();
99119
dsm_detach(seg);
100120
proc_exit(0);
@@ -116,13 +136,15 @@ void executor_worker_main(Datum arg)
116136
{
117137
pgstat_report_activity(STATE_RUNNING,job->dosql[i]);
118138
CHECK_FOR_INTERRUPTS();
119-
if(!job->same_transaction)START_SPI_SNAP();
120-
139+
if(!job->same_transaction)
140+
{
141+
START_SPI_SNAP();
142+
}
121143
ret=execute_spi(job->dosql[i],&error);
122144
if(ret<0)
123145
{
124146
success= false;
125-
shared->status=SchdExecutorError;
147+
status=SchdExecutorError;
126148
if(error)
127149
{
128150
push_executor_error(&EE,"error on %d: %s",i+1,error);
@@ -139,13 +161,19 @@ void executor_worker_main(Datum arg)
139161
}
140162
else
141163
{
142-
if(!job->same_transaction)STOP_SPI_SNAP();
164+
if(!job->same_transaction)
165+
{
166+
STOP_SPI_SNAP();
167+
}
143168
}
144169
}
145-
if(shared->status!=SchdExecutorError)
170+
if(status!=SchdExecutorError)
146171
{
147-
if(job->same_transaction)STOP_SPI_SNAP();
148-
shared->status=SchdExecutorDone;
172+
if(job->same_transaction)
173+
{
174+
STOP_SPI_SNAP();
175+
}
176+
status=SchdExecutorDone;
149177
}
150178
if(job->next_time_statement)
151179
{
@@ -161,12 +189,46 @@ void executor_worker_main(Datum arg)
161189
{
162190
set_shared_message(shared,&EE);
163191
}
192+
shared->status=status;
164193

165194
delete_worker_mem_ctx();
166195
dsm_detach(seg);
167196
proc_exit(0);
168197
}
169198

199+
intset_session_authorization(char*username,char**error)
200+
{
201+
Oidtypes[1]= {TEXTOID };
202+
Oiduseroid;
203+
Datumvalues[1];
204+
boolis_superuser;
205+
intret;
206+
char*sql="select usesysid, usesuper from pg_catalog.pg_user where usename = $1";
207+
charbuff[1024];
208+
209+
values[0]=CStringGetTextDatum(username);
210+
START_SPI_SNAP();
211+
ret=execute_spi_sql_with_args(sql,1,types,values,NULL,error);
212+
213+
if(ret<0)returnret;
214+
if(SPI_processed==0)
215+
{
216+
STOP_SPI_SNAP();
217+
sprintf(buff,"Cannot find user with name: %s",username);
218+
*error=_copy_string(buff);
219+
220+
return-200;
221+
}
222+
useroid=get_oid_from_spi(0,1,0);
223+
is_superuser=get_boolean_from_spi(0,2, false);
224+
225+
STOP_SPI_SNAP();
226+
227+
SetSessionAuthorization(useroid,is_superuser);
228+
229+
return1;
230+
}
231+
170232
voidset_shared_message(schd_executor_share_t*shared,executor_error_t*ee)
171233
{
172234
intleft=PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX;

‎src/scheduler_executor.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ typedef enum {
1717
typedefstruct {
1818
chardatabase[PGPRO_SCHEDULER_DBNAME_MAX];
1919
charnodename[PGPRO_SCHEDULER_NODENAME_MAX];
20+
charuser[NAMEDATALEN];
2021

2122
intcron_id;
2223
TimestampTzstart_at;
@@ -41,6 +42,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee);
4142
intexecutor_onrollback(job_t*job,executor_error_t*ee);
4243
voidset_pg_var(boolresulti,executor_error_t*ee);
4344
intpush_executor_error(executor_error_t*e,char*fmt, ...) __attribute__ ((format (gnu_printf,2,3)));
45+
intset_session_authorization(char*username,char**error);
4446

4547

4648
#endif

‎src/scheduler_job.c‎

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include"lib/stringinfo.h"
88
#include"scheduler_spi_utils.h"
99
#include"utils/timestamp.h"
10+
#include"utils/builtins.h"
1011
#include"memutils.h"
1112

1213
job_t*init_scheduler_job(job_t*j)
@@ -22,12 +23,13 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
2223
{
2324
job_t*jobs=NULL;
2425
intret,got,i;
25-
Oidargtypes[1]= {CSTRINGOID };
26+
Oidargtypes[1]= {TEXTOID };
2627
Datumvalues[1];
27-
constchar*get_job_sql="select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instancesfrom schedule.at at, schedule.cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1::text";
28+
constchar*get_job_sql="select at.start_at, at.last_start_available, at.cron, max_run_time, cron.max_instances, cron.executorfrom schedule.at at, schedule.cron cron where start_at <= 'now' and not at.active and (last_start_available is NULL OR last_start_available > 'now') and at.cron = cron.id AND cron.node = $1 order by at.start_at";
2829

2930
*is_error=*n=0;
30-
values[0]=CStringGetDatum(nodename);
31+
START_SPI_SNAP();
32+
values[0]=CStringGetTextDatum(nodename);
3133
ret=SPI_execute_with_args(get_job_sql,1,argtypes,values,NULL, true,0);
3234
if(ret==SPI_OK_SELECT)
3335
{
@@ -45,13 +47,15 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
4547
jobs[i].timelimit=get_interval_seconds_from_spi(i,4,0);
4648
jobs[i].max_instances=get_int_from_spi(i,5,1);
4749
jobs[i].node=_copy_string(nodename);
50+
jobs[i].executor=get_text_from_spi(i,6);
4851
}
4952
}
5053
}
5154
else
5255
{
5356
*is_error=1;
5457
}
58+
STOP_SPI_SNAP();
5559
returnjobs;
5660
}
5761

@@ -107,8 +111,8 @@ job_t *set_job_error(job_t *j, const char *fmt, ...)
107111
intmove_job_to_log(job_t*j,boolstatus)
108112
{
109113
Datumvalues[4];
110-
charnulls[4]= {0,0,0,0 };
111-
Oidargtypes[4]= {BOOLOID,CSTRINGOID,INT4OID,TIMESTAMPTZOID };
114+
charnulls[4]= {' ',' ',' ',' ' };
115+
Oidargtypes[4]= {BOOLOID,TEXTOID,INT4OID,TIMESTAMPTZOID };
112116
intret;
113117
constchar*del_sql="delete from schedule.at where start_at = $1 and cron = $2";
114118
constchar*sql="insert into schedule.log (start_at, last_start_available, retry, cron, node, started, status, finished, message) SELECT start_at, last_start_available, retry, cron, node, started, $1 as status, 'now'::timestamp as finished, $2 as message from schedule.at where cron = $3 and start_at = $4";
@@ -118,11 +122,11 @@ int move_job_to_log(job_t *j, bool status)
118122
values[0]=BoolGetDatum(status);
119123
if(j->error)
120124
{
121-
values[1]=CStringGetDatum(j->error);
125+
values[1]=CStringGetTextDatum(j->error);
122126
}
123127
else
124128
{
125-
nulls[1]=1;
129+
nulls[1]='n';
126130
}
127131
values[2]=Int32GetDatum(j->cron_id);
128132
values[3]=TimestampTzGetDatum(j->start_at);
@@ -134,7 +138,7 @@ int move_job_to_log(job_t *j, bool status)
134138
argtypes[1]=INT4OID;
135139
values[0]=TimestampTzGetDatum(j->start_at);
136140
values[1]=Int32GetDatum(j->cron_id);
137-
ret=SPI_execute_with_args(del_sql,2,argtypes,values,nulls, false,0);
141+
ret=SPI_execute_with_args(del_sql,2,argtypes,values,NULL, false,0);
138142
if(ret==SPI_OK_DELETE)
139143
{
140144
return1;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp