Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5a1aeec

Browse files
committed
Add params to serialization
1 parenta2f1033 commit5a1aeec

File tree

2 files changed

+145
-29
lines changed

2 files changed

+145
-29
lines changed

‎Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ PGXS := $(shell $(PG_CONFIG) --pgxs)
1818
include$(PGXS)
1919
else
2020
SHLIB_PREREQS = submake-libpq
21-
subdir = contrib/pargres
21+
subdir = contrib/execplan
2222
top_builddir = ../..
2323
include$(top_builddir)/src/Makefile.global
2424
include$(top_srcdir)/contrib/contrib-global.mk

‎exec_plan.c

Lines changed: 144 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
11

22
#include"postgres.h"
33

4+
//#include "access/xact.h"
45
#include"commands/extension.h"
56
#include"executor/execdesc.h"
7+
#include"executor/executor.h"
68
#include"fmgr.h"
9+
#include"libpq/libpq.h"
10+
#include"libpq-fe.h"
11+
#include"nodes/params.h"
712
#include"optimizer/planner.h"
813
#include"storage/lmgr.h"
914
#include"utils/builtins.h"
15+
#include"utils/guc.h"
16+
#include"utils/plancache.h"
17+
#include"utils/snapmgr.h"
1018

1119
PG_MODULE_MAGIC;
1220

@@ -15,51 +23,142 @@ PG_FUNCTION_INFO_V1(pg_execute_plan);
1523
void_PG_init(void);
1624

1725
staticplanner_hook_typeprev_planner_hook=NULL;
26+
staticExecutorEnd_hook_typeprev_ExecutorEnd=NULL;
1827

1928
staticPlannedStmt*HOOK_Planner_injection(Query*parse,intcursorOptions,
2029
ParamListInfoboundParams);
30+
staticvoidHOOK_ExecEnd_injection(QueryDesc*queryDesc);
31+
staticchar*serialize_plan(PlannedStmt*pstmt,ParamListInfoboundParams,
32+
int*size);
33+
staticPGconn*conn=NULL;
2134

35+
intnode_number1=0;
36+
//#include "utils/guc.h"
2237
/*
2338
* Module load/unload callback
2439
*/
2540
void
2641
_PG_init(void)
2742
{
43+
elog(LOG,"_PG_Init");
44+
DefineCustomIntVariable("pargres.node",
45+
"Node number in instances collaboration",
46+
NULL,
47+
&node_number1,
48+
0,
49+
0,
50+
1023,
51+
PGC_SIGHUP,
52+
GUC_NOT_IN_SAMPLE,
53+
NULL,
54+
NULL,
55+
NULL);
56+
2857
/* Planner hook */
2958
prev_planner_hook=planner_hook;
3059
planner_hook=HOOK_Planner_injection;
60+
61+
prev_ExecutorEnd=ExecutorEnd_hook;
62+
ExecutorEnd_hook=HOOK_ExecEnd_injection;
3163
}
3264

33-
staticPlannedStmt*HOOK_Planner_injection(Query*parse,intcursorOptions,
65+
staticvoid
66+
HOOK_ExecEnd_injection(QueryDesc*queryDesc)
67+
{
68+
PGresult*result;
69+
70+
/* Execute before hook because it destruct memory context of exchange list */
71+
if (conn)
72+
while ((result=PQgetResult(conn))!=NULL)
73+
Assert(PQresultStatus(result)!=PGRES_FATAL_ERROR);
74+
75+
if (prev_ExecutorEnd)
76+
prev_ExecutorEnd(queryDesc);
77+
else
78+
standard_ExecutorEnd(queryDesc);
79+
}
80+
81+
staticPlannedStmt*
82+
HOOK_Planner_injection(Query*parse,intcursorOptions,
3483
ParamListInfoboundParams)
3584
{
36-
PlannedStmt*stmt;
37-
char*serialized_plan;
85+
PlannedStmt*pstmt;
86+
87+
conn=NULL;
3888

3989
if (prev_planner_hook)
40-
stmt=prev_planner_hook(parse,cursorOptions,boundParams);
90+
pstmt=prev_planner_hook(parse,cursorOptions,boundParams);
91+
else
92+
pstmt=standard_planner(parse,cursorOptions,boundParams);
93+
94+
if (node_number1>0)
95+
returnpstmt;
4196
else
42-
stmt=standard_planner(parse,cursorOptions,boundParams);
97+
printf("SEND Query\n");
4398

4499
/* Extension is not initialized. */
45100
if (OidIsValid(get_extension_oid("execplan", true)))
46101
{
47-
FILE*f=fopen("/home/andrey/plans.txt","at");
48-
if (stmt->paramExecTypes==NIL)
49-
{
50-
elog(LOG,"commandType: %d\n",stmt->commandType);
51-
}
52-
//Assert(stmt->paramExecTypes != NIL);
53-
serialized_plan=nodeToString(stmt);
54-
//fprintf(f, "\n%s\n", serialized_plan);
55-
fclose(f);
102+
charconninfo[1024];
103+
char*data,
104+
*SQLCommand;
105+
intstatus,
106+
data_size;
107+
108+
/* Connect to slave and send it a query plan */
109+
sprintf(conninfo,"host=localhost port=5433%c",'\0');
110+
conn=PQconnectdb(conninfo);
111+
if (PQstatus(conn)==CONNECTION_BAD)
112+
elog(LOG,"Connection error. conninfo: %s",conninfo);
113+
114+
data=serialize_plan(pstmt,boundParams,&data_size);
115+
SQLCommand= (char*)palloc0(strlen(data)+100);
116+
sprintf(SQLCommand,"SELECT pg_execute_plan('%s')",data);
117+
elog(LOG,"query: %s",SQLCommand);
118+
status=PQsendQuery(conn,SQLCommand);
119+
if (status==0)
120+
elog(ERROR,"Query sending error: %s",PQerrorMessage(conn));
56121
}
57-
returnstmt;
122+
returnpstmt;
58123
}
59124

60-
#include"executor/executor.h"
61-
#include"utils/plancache.h"
62-
#include"utils/snapmgr.h"
125+
#include"utils/fmgrprotos.h"
126+
127+
staticchar*
128+
serialize_plan(PlannedStmt*pstmt,ParamListInfoboundParams,int*size)
129+
{
130+
intsplan_len,
131+
sparams_len,
132+
econtainer_len;
133+
char*serialized_plan,
134+
*container,
135+
*start_address,
136+
*econtainer;
137+
138+
Assert(size!=NULL);
139+
140+
serialized_plan=nodeToString(pstmt);
141+
142+
/* We use splan_len+1 bytes for include end-of-string symbol. */
143+
splan_len=strlen(serialized_plan)+1;
144+
145+
sparams_len=EstimateParamListSpace(boundParams);
146+
147+
container= (char*)palloc0(splan_len+sparams_len);
148+
//elog(LOG, "Serialize sizes: plan: %d params: %d, numParams: %d", splan_len, sparams_len, boundParams->numParams);
149+
memcpy(container,serialized_plan,splan_len);
150+
start_address=container+splan_len;
151+
SerializeParamList(boundParams,&start_address);
152+
153+
econtainer_len=esc_enc_len(container,splan_len+sparams_len);
154+
econtainer= (char*)palloc0(econtainer_len+1);
155+
156+
Assert(econtainer_len==esc_encode(container,splan_len+sparams_len,econtainer));
157+
econtainer[econtainer_len]='\0';
158+
*size=econtainer_len+1;
159+
elog(LOG,"Serialize sizes: econtainer: %d",*size);
160+
returnecontainer;
161+
}
63162

64163
staticvoid
65164
ScanQueryForLocks(PlannedStmt*pstmt,boolacquire)
@@ -107,17 +206,34 @@ AcquirePlannerLocks(PlannedStmt *pstmt, bool acquire)
107206
Datum
108207
pg_execute_plan(PG_FUNCTION_ARGS)
109208
{
110-
char*data=TextDatumGetCString(PG_GETARG_DATUM(0));
111-
PlannedStmt*pstmt;
112-
QueryDesc*queryDesc;
113-
charqueryString[5]="NONE";
114-
ParamListInfoparamLI=NULL;
115-
116-
elog(INFO,"MESSAGE: %s",data);
117-
209+
char*data=TextDatumGetCString(PG_GETARG_DATUM(0));
210+
PlannedStmt*pstmt;
211+
QueryDesc*queryDesc;
212+
charqueryString[5]="NONE";
213+
ParamListInfoparamLI=NULL;
214+
intdec_tot_len;
215+
char*dcontainer,
216+
*start_addr;
217+
218+
elog(LOG,"datalen=%lu\n",strlen(data));
219+
/* Compute decoded size of bytea data */
220+
dec_tot_len=esc_dec_len(data,strlen(data));
221+
elog(LOG,"dec_tot_len=%d datalen=%lu\n",dec_tot_len,strlen(data));
222+
dcontainer= (char*)palloc0(dec_tot_len);
223+
Assert(dec_tot_len==esc_decode(data,strlen(data),dcontainer));
224+
225+
pstmt= (PlannedStmt*)stringToNode((char*)dcontainer);
226+
elog(LOG,"Serialize Plan Size=%lu\n",strlen(dcontainer));
227+
start_addr=dcontainer+strlen(dcontainer)+1;
228+
paramLI=RestoreParamList((char**)&start_addr);
229+
elog(LOG,"Decoded params. numParams: %d\n",paramLI->numParams);
230+
//printf("INCOMING: %s\n", data);
231+
//PG_RETURN_BOOL(true);
118232
/* Execute query plan. Based on execParallel.c ParallelQueryMain() */
119-
pstmt= (PlannedStmt*)stringToNode(data);
120-
//pstmt->paramExecTypes = NIL;
233+
//
234+
//ptr += strlen((const char *) ptr);
235+
//
236+
121237
queryDesc=CreateQueryDesc(pstmt,queryString,GetActiveSnapshot(),
122238
InvalidSnapshot,None_Receiver,paramLI,NULL,
123239
0);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp