Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0bafb66

Browse files
committed
Encoding problem
1 parent5a1aeec commit0bafb66

File tree

2 files changed

+150
-56
lines changed

2 files changed

+150
-56
lines changed

‎exec_plan.c

Lines changed: 149 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include"nodes/params.h"
1212
#include"optimizer/planner.h"
1313
#include"storage/lmgr.h"
14+
#include"tcop/utility.h"
1415
#include"utils/builtins.h"
1516
#include"utils/guc.h"
1617
#include"utils/plancache.h"
@@ -22,25 +23,33 @@ PG_FUNCTION_INFO_V1(pg_execute_plan);
2223

2324
void_PG_init(void);
2425

26+
staticProcessUtility_hook_typenext_ProcessUtility_hook=NULL;
2527
staticplanner_hook_typeprev_planner_hook=NULL;
28+
staticExecutorStart_hook_typeprev_ExecutorStart=NULL;
2629
staticExecutorEnd_hook_typeprev_ExecutorEnd=NULL;
2730

31+
staticvoidHOOK_Utility_injection(PlannedStmt*pstmt,constchar*queryString,
32+
ProcessUtilityContextcontext,ParamListInfoparams,
33+
QueryEnvironment*queryEnv,DestReceiver*dest,
34+
char*completionTag);
2835
staticPlannedStmt*HOOK_Planner_injection(Query*parse,intcursorOptions,
2936
ParamListInfoboundParams);
37+
staticvoidHOOK_ExecStart_injection(QueryDesc*queryDesc,inteflags);
3038
staticvoidHOOK_ExecEnd_injection(QueryDesc*queryDesc);
3139
staticchar*serialize_plan(PlannedStmt*pstmt,ParamListInfoboundParams,
32-
int*size);
40+
constchar*querySourceText);
41+
staticvoidexecute_query(char*planString);
42+
3343
staticPGconn*conn=NULL;
3444

3545
intnode_number1=0;
36-
//#include "utils/guc.h"
46+
3747
/*
3848
* Module load/unload callback
3949
*/
4050
void
4151
_PG_init(void)
4252
{
43-
elog(LOG,"_PG_Init");
4453
DefineCustomIntVariable("pargres.node",
4554
"Node number in instances collaboration",
4655
NULL,
@@ -54,28 +63,90 @@ _PG_init(void)
5463
NULL,
5564
NULL);
5665

66+
/* ProcessUtility hook */
67+
next_ProcessUtility_hook=ProcessUtility_hook;
68+
ProcessUtility_hook=HOOK_Utility_injection;
69+
5770
/* Planner hook */
5871
prev_planner_hook=planner_hook;
5972
planner_hook=HOOK_Planner_injection;
6073

74+
prev_ExecutorStart=ExecutorStart_hook;
75+
ExecutorStart_hook=HOOK_ExecStart_injection;
76+
6177
prev_ExecutorEnd=ExecutorEnd_hook;
6278
ExecutorEnd_hook=HOOK_ExecEnd_injection;
6379
}
6480

6581
staticvoid
66-
HOOK_ExecEnd_injection(QueryDesc*queryDesc)
82+
HOOK_Utility_injection(PlannedStmt*pstmt,
83+
constchar*queryString,
84+
ProcessUtilityContextcontext,
85+
ParamListInfoparams,
86+
QueryEnvironment*queryEnv,
87+
DestReceiver*dest,
88+
char*completionTag)
6789
{
68-
PGresult*result;
90+
Node*parsetree=pstmt->utilityStmt;
91+
92+
if ((OidIsValid(get_extension_oid("execplan", true)))&&
93+
(node_number1==0)&&
94+
(nodeTag(parsetree)==T_CreateStmt))
95+
{
96+
charconninfo[1024];
97+
intstatus;
98+
99+
elog(LOG,"Send UTILITY query: %s",queryString);
100+
101+
/* Connect to slave and send it a query plan */
102+
sprintf(conninfo,"host=localhost port=5433%c",'\0');
103+
conn=PQconnectdb(conninfo);
104+
if (PQstatus(conn)==CONNECTION_BAD)
105+
elog(LOG,"Connection error. conninfo: %s",conninfo);
106+
107+
status=PQsendQuery(conn,queryString);
108+
if (status==0)
109+
elog(ERROR,"Query sending error: %s",PQerrorMessage(conn));
110+
}
111+
112+
if (next_ProcessUtility_hook)
113+
(*next_ProcessUtility_hook) (pstmt,queryString,context,params,
114+
queryEnv,dest,completionTag);
115+
else
116+
standard_ProcessUtility(pstmt,queryString,
117+
context,params,queryEnv,
118+
dest,completionTag);
69119

70-
/* Execute before hook because it destruct memory context of exchange list */
71120
if (conn)
121+
{
122+
PGresult*result;
123+
72124
while ((result=PQgetResult(conn))!=NULL)
73125
Assert(PQresultStatus(result)!=PGRES_FATAL_ERROR);
126+
PQfinish(conn);
127+
conn=NULL;
128+
}
129+
}
74130

75-
if (prev_ExecutorEnd)
76-
prev_ExecutorEnd(queryDesc);
77-
else
78-
standard_ExecutorEnd(queryDesc);
131+
staticvoid
132+
execute_query(char*planString)
133+
{
134+
charconninfo[1024];
135+
char*SQLCommand;
136+
intstatus;
137+
138+
/* Connect to slave and send it a query plan */
139+
sprintf(conninfo,"host=localhost port=5433%c",'\0');
140+
conn=PQconnectdb(conninfo);
141+
if (PQstatus(conn)==CONNECTION_BAD)
142+
elog(LOG,"Connection error. conninfo: %s",conninfo);
143+
144+
SQLCommand= (char*)palloc0(strlen(planString)+100);
145+
sprintf(SQLCommand,"SELECT pg_execute_plan('%s')",planString);
146+
//elog(LOG, "query: %s", SQLCommand);
147+
status=PQsendQuery(conn,SQLCommand);
148+
if (status==0)
149+
elog(ERROR,"Query sending error: %s",PQerrorMessage(conn));
79150
}
80151

81152
staticPlannedStmt*
@@ -91,72 +162,103 @@ HOOK_Planner_injection(Query *parse, int cursorOptions,
91162
else
92163
pstmt=standard_planner(parse,cursorOptions,boundParams);
93164

94-
if (node_number1>0)
165+
if ((node_number1>0)|| (parse->utilityStmt!=NULL))
95166
returnpstmt;
96-
else
97-
printf("SEND Query\n");
98167

99168
/* Extension is not initialized. */
100169
if (OidIsValid(get_extension_oid("execplan", true)))
101170
{
102-
charconninfo[1024];
103-
char*data,
104-
*SQLCommand;
105-
intstatus,
106-
data_size;
107171

108-
/* Connect to slave and send it a query plan */
109-
sprintf(conninfo,"host=localhost port=5433%c",'\0');
110-
conn=PQconnectdb(conninfo);
111-
if (PQstatus(conn)==CONNECTION_BAD)
112-
elog(LOG,"Connection error. conninfo: %s",conninfo);
113-
114-
data=serialize_plan(pstmt,boundParams,&data_size);
115-
SQLCommand= (char*)palloc0(strlen(data)+100);
116-
sprintf(SQLCommand,"SELECT pg_execute_plan('%s')",data);
117-
elog(LOG,"query: %s",SQLCommand);
118-
status=PQsendQuery(conn,SQLCommand);
119-
if (status==0)
120-
elog(ERROR,"Query sending error: %s",PQerrorMessage(conn));
121172
}
122173
returnpstmt;
123174
}
124175

176+
staticvoid
177+
HOOK_ExecStart_injection(QueryDesc*queryDesc,inteflags)
178+
{
179+
Node*parsetree=queryDesc->plannedstmt->utilityStmt;
180+
181+
if ((OidIsValid(get_extension_oid("execplan", true)))&&
182+
(node_number1==0)&&
183+
((parsetree==NULL)|| (nodeTag(parsetree)!=T_CreatedbStmt)))
184+
{
185+
elog(LOG,"Send query: %s",queryDesc->sourceText);
186+
execute_query(serialize_plan(queryDesc->plannedstmt,queryDesc->params,
187+
queryDesc->sourceText));
188+
}
189+
else
190+
{
191+
//elog(LOG, "EXECUTOR Process query without sending. IsParsetree=%hhu node_number1=%d IsExt=%hhu", parsetree != NULL, node_number1, OidIsValid(get_extension_oid("execplan", true)));
192+
}
193+
194+
if (prev_ExecutorStart)
195+
prev_ExecutorStart(queryDesc,eflags);
196+
else
197+
standard_ExecutorStart(queryDesc,eflags);
198+
}
199+
200+
staticvoid
201+
HOOK_ExecEnd_injection(QueryDesc*queryDesc)
202+
{
203+
/* Execute before hook because it destruct memory context of exchange list */
204+
if (conn)
205+
{
206+
PGresult*result;
207+
208+
while ((result=PQgetResult(conn))!=NULL)
209+
Assert(PQresultStatus(result)!=PGRES_FATAL_ERROR);
210+
PQfinish(conn);
211+
conn=NULL;
212+
}
213+
214+
if (prev_ExecutorEnd)
215+
prev_ExecutorEnd(queryDesc);
216+
else
217+
standard_ExecutorEnd(queryDesc);
218+
}
219+
125220
#include"utils/fmgrprotos.h"
126221

127222
staticchar*
128-
serialize_plan(PlannedStmt*pstmt,ParamListInfoboundParams,int*size)
223+
serialize_plan(PlannedStmt*pstmt,ParamListInfoboundParams,
224+
constchar*querySourceText)
129225
{
130226
intsplan_len,
131227
sparams_len,
228+
qtext_len,
132229
econtainer_len;
133230
char*serialized_plan,
134231
*container,
135232
*start_address,
136233
*econtainer;
137234

138-
Assert(size!=NULL);
139-
140235
serialized_plan=nodeToString(pstmt);
141236

142-
/* We usesplan_len+1 bytes for include end-of-string symbol. */
237+
/* We uselen+1 bytes for include end-of-string symbol. */
143238
splan_len=strlen(serialized_plan)+1;
239+
qtext_len=strlen(querySourceText)+1;
144240

145241
sparams_len=EstimateParamListSpace(boundParams);
146242

147-
container= (char*)palloc0(splan_len+sparams_len);
243+
container= (char*)palloc0(splan_len+sparams_len+qtext_len);
148244
//elog(LOG, "Serialize sizes: plan: %d params: %d, numParams: %d", splan_len, sparams_len, boundParams->numParams);
149245
memcpy(container,serialized_plan,splan_len);
150-
start_address=container+splan_len;
246+
start_address=container+splan_len;
151247
SerializeParamList(boundParams,&start_address);
152248

153-
econtainer_len=esc_enc_len(container,splan_len+sparams_len);
154-
econtainer= (char*)palloc0(econtainer_len+1);
249+
Assert(start_address==container+splan_len+sparams_len);
250+
memcpy(start_address,querySourceText,qtext_len);
155251

156-
Assert(econtainer_len==esc_encode(container,splan_len+sparams_len,econtainer));
252+
econtainer_len=pg_base64_enc_len(container,splan_len+sparams_len+qtext_len);
253+
econtainer= (char*)palloc0(econtainer_len+1);
254+
if (econtainer_len!=pg_base64_encode(container,splan_len+sparams_len+
255+
qtext_len,econtainer))
256+
elog(LOG,"econtainer_len: %d %d",econtainer_len,pg_base64_encode(container,splan_len+sparams_len+
257+
qtext_len,econtainer));
258+
Assert(econtainer_len==pg_base64_encode(container,splan_len+sparams_len+
259+
qtext_len,econtainer));
157260
econtainer[econtainer_len]='\0';
158-
*size=econtainer_len+1;
159-
elog(LOG,"Serialize sizes: econtainer: %d",*size);
261+
160262
returnecontainer;
161263
}
162264

@@ -209,30 +311,22 @@ pg_execute_plan(PG_FUNCTION_ARGS)
209311
char*data=TextDatumGetCString(PG_GETARG_DATUM(0));
210312
PlannedStmt*pstmt;
211313
QueryDesc*queryDesc;
212-
charqueryString[5]="NONE";
314+
char*queryString;
213315
ParamListInfoparamLI=NULL;
214316
intdec_tot_len;
215317
char*dcontainer,
216318
*start_addr;
217319

218-
elog(LOG,"datalen=%lu\n",strlen(data));
219320
/* Compute decoded size of bytea data */
220-
dec_tot_len=esc_dec_len(data,strlen(data));
221-
elog(LOG,"dec_tot_len=%d datalen=%lu\n",dec_tot_len,strlen(data));
321+
dec_tot_len=pg_base64_dec_len(data,strlen(data));
222322
dcontainer= (char*)palloc0(dec_tot_len);
223-
Assert(dec_tot_len==esc_decode(data,strlen(data),dcontainer));
323+
Assert(dec_tot_len==pg_base64_decode(data,strlen(data),dcontainer));
224324

225325
pstmt= (PlannedStmt*)stringToNode((char*)dcontainer);
226-
elog(LOG,"Serialize Plan Size=%lu\n",strlen(dcontainer));
227326
start_addr=dcontainer+strlen(dcontainer)+1;
228327
paramLI=RestoreParamList((char**)&start_addr);
229-
elog(LOG,"Decoded params. numParams: %d\n",paramLI->numParams);
230-
//printf("INCOMING: %s\n", data);
231-
//PG_RETURN_BOOL(true);
232-
/* Execute query plan. Based on execParallel.c ParallelQueryMain() */
233-
//
234-
//ptr += strlen((const char *) ptr);
235-
//
328+
queryString=start_addr;
329+
//elog(LOG, "Decoded query: %s\n", start_addr);
236330

237331
queryDesc=CreateQueryDesc(pstmt,queryString,GetActiveSnapshot(),
238332
InvalidSnapshot,None_Receiver,paramLI,NULL,

‎execplan.control

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#Pargres extension
1+
#Execplan extension
22
comment = 'Execute raw query plan'
33
default_version = '0.1'
44
module_pathname = '$libdir/execplan'

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp