Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf234df2

Browse files
committed
Add repeater
1 parentdc96188 commitf234df2

File tree

5 files changed

+229
-212
lines changed

5 files changed

+229
-212
lines changed

‎Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ EXTENSION = execplan
55
EXTVERSION = 0.1
66
PGFILEDESC = "ExecPlan"
77
MODULES = execplan
8-
OBJS = exec_plan.o planwalker.o$(WIN32RES)
8+
OBJS = exec_plan.o planwalker.orepeater.o$(WIN32RES)
99

1010
PG_CPPFLAGS = -I$(libpq_srcdir)
1111
SHLIB_LINK_INTERNAL =$(libpq)

‎exec_plan.c

Lines changed: 26 additions & 211 deletions
Original file line numberDiff line numberDiff line change
@@ -2,228 +2,53 @@
22
#include"postgres.h"
33

44
#include"catalog/namespace.h"
5-
#include"commands/extension.h"
6-
#include"executor/execdesc.h"
7-
#include"executor/executor.h"
8-
#include"fmgr.h"
9-
#include"libpq/libpq.h"
10-
#include"libpq-fe.h"
11-
#include"nodes/params.h"
12-
#include"planwalker.h"
13-
#include"optimizer/planner.h"
5+
#include"common/base64.h"
6+
#include"nodes/nodeFuncs.h"
147
#include"storage/lmgr.h"
15-
#include"tcop/utility.h"
168
#include"utils/builtins.h"
17-
#include"utils/guc.h"
189
#include"utils/lsyscache.h"
19-
#include"utils/plancache.h"
2010
#include"utils/snapmgr.h"
2111

12+
#include"exec_plan.h"
13+
#include"planwalker.h"
14+
2215
PG_MODULE_MAGIC;
2316

2417
PG_FUNCTION_INFO_V1(pg_execute_plan);
2518

26-
void_PG_init(void);
27-
28-
staticProcessUtility_hook_typenext_ProcessUtility_hook=NULL;
29-
staticplanner_hook_typeprev_planner_hook=NULL;
30-
staticExecutorStart_hook_typeprev_ExecutorStart=NULL;
31-
staticExecutorEnd_hook_typeprev_ExecutorEnd=NULL;
32-
33-
staticvoidHOOK_Utility_injection(PlannedStmt*pstmt,constchar*queryString,
34-
ProcessUtilityContextcontext,ParamListInfoparams,
35-
QueryEnvironment*queryEnv,DestReceiver*dest,
36-
char*completionTag);
37-
staticPlannedStmt*HOOK_Planner_injection(Query*parse,intcursorOptions,
38-
ParamListInfoboundParams);
39-
staticvoidHOOK_ExecStart_injection(QueryDesc*queryDesc,inteflags);
40-
staticvoidHOOK_ExecEnd_injection(QueryDesc*queryDesc);
4119
staticchar*serialize_plan(QueryDesc*queryDesc,inteflags);
42-
staticvoidexecute_query(char*planString);
4320
staticboolstore_irel_name(Plan*plan,char*buffer);
4421

45-
staticPGconn*conn=NULL;
46-
47-
intnode_number1=0;
48-
49-
/*
50-
* Module load/unload callback
51-
*/
52-
void
53-
_PG_init(void)
54-
{
55-
DefineCustomIntVariable("pargres.node",
56-
"Node number in instances collaboration",
57-
NULL,
58-
&node_number1,
59-
0,
60-
0,
61-
1023,
62-
PGC_SIGHUP,
63-
GUC_NOT_IN_SAMPLE,
64-
NULL,
65-
NULL,
66-
NULL);
67-
68-
/* ProcessUtility hook */
69-
next_ProcessUtility_hook=ProcessUtility_hook;
70-
ProcessUtility_hook=HOOK_Utility_injection;
71-
72-
/* Planner hook */
73-
prev_planner_hook=planner_hook;
74-
planner_hook=HOOK_Planner_injection;
75-
76-
prev_ExecutorStart=ExecutorStart_hook;
77-
ExecutorStart_hook=HOOK_ExecStart_injection;
78-
79-
prev_ExecutorEnd=ExecutorEnd_hook;
80-
ExecutorEnd_hook=HOOK_ExecEnd_injection;
81-
}
82-
83-
staticvoid
84-
HOOK_Utility_injection(PlannedStmt*pstmt,
85-
constchar*queryString,
86-
ProcessUtilityContextcontext,
87-
ParamListInfoparams,
88-
QueryEnvironment*queryEnv,
89-
DestReceiver*dest,
90-
char*completionTag)
91-
{
92-
Node*parsetree=pstmt->utilityStmt;
93-
94-
if ((OidIsValid(get_extension_oid("execplan", true)))&&
95-
(node_number1==0)&&
96-
(nodeTag(parsetree)!=T_CopyStmt)&&
97-
(nodeTag(parsetree)!=T_CreateExtensionStmt)&&
98-
(context!=PROCESS_UTILITY_SUBCOMMAND))
99-
{
100-
charconninfo[1024];
101-
intstatus;
102-
103-
//elog(LOG, "Send UTILITY query %d: %s", nodeTag(parsetree), queryString);
104-
105-
/* Connect to slave and send it a query plan */
106-
sprintf(conninfo,"host=localhost port=5433%c",'\0');
107-
conn=PQconnectdb(conninfo);
108-
if (PQstatus(conn)==CONNECTION_BAD)
109-
elog(LOG,"Connection error. conninfo: %s",conninfo);
110-
111-
status=PQsendQuery(conn,queryString);
112-
if (status==0)
113-
elog(ERROR,"Query sending error: %s",PQerrorMessage(conn));
114-
}
115-
elseif (node_number1==0)
116-
elog(LOG,"UTILITY query without sending: %s",queryString);
117-
118-
if (next_ProcessUtility_hook)
119-
(*next_ProcessUtility_hook) (pstmt,queryString,context,params,
120-
queryEnv,dest,completionTag);
121-
else
122-
standard_ProcessUtility(pstmt,queryString,
123-
context,params,queryEnv,
124-
dest,completionTag);
125-
126-
if (conn)
127-
{
128-
PGresult*result;
129-
130-
while ((result=PQgetResult(conn))!=NULL)
131-
Assert(PQresultStatus(result)!=PGRES_FATAL_ERROR);
132-
PQfinish(conn);
133-
conn=NULL;
134-
}
135-
}
136-
13722
/*
13823
* INPUT: a base64-encoded serialized plan
13924
*/
140-
staticvoid
141-
execute_query(char*planString)
25+
void
26+
execute_query(PGconn*dest,QueryDesc*queryDesc,inteflags)
14227
{
143-
charconninfo[1024];
14428
char*SQLCommand;
14529
intstatus;
30+
char*serializedPlan;
31+
PGresult*result;
14632

147-
/* Connect to slave and send it a query plan */
148-
sprintf(conninfo,"host=localhost port=5433%c",'\0');
149-
conn=PQconnectdb(conninfo);
150-
if (PQstatus(conn)==CONNECTION_BAD)
151-
elog(LOG,"Connection error. conninfo: %s",conninfo);
152-
153-
SQLCommand= (char*)palloc0(strlen(planString)+100);
154-
sprintf(SQLCommand,"SELECT pg_execute_plan('%s');",planString);
155-
//elog(LOG, "query: %s", SQLCommand);
156-
status=PQsendQuery(conn,SQLCommand);
157-
if (status==0)
158-
elog(ERROR,"Query sending error: %s",PQerrorMessage(conn));
159-
}
33+
Assert(dest!=NULL);
16034

161-
staticPlannedStmt*
162-
HOOK_Planner_injection(Query*parse,intcursorOptions,
163-
ParamListInfoboundParams)
164-
{
165-
PlannedStmt*pstmt;
166-
167-
conn=NULL;
168-
169-
if (prev_planner_hook)
170-
pstmt=prev_planner_hook(parse,cursorOptions,boundParams);
171-
else
172-
pstmt=standard_planner(parse,cursorOptions,boundParams);
173-
174-
if ((node_number1>0)|| (parse->utilityStmt!=NULL))
175-
returnpstmt;
176-
177-
/* Extension is not initialized. */
178-
if (OidIsValid(get_extension_oid("execplan", true)))
179-
{
180-
181-
}
182-
returnpstmt;
183-
}
184-
185-
staticvoid
186-
HOOK_ExecStart_injection(QueryDesc*queryDesc,inteflags)
187-
{
188-
Node*parsetree=queryDesc->plannedstmt->utilityStmt;
189-
190-
if (prev_ExecutorStart)
191-
prev_ExecutorStart(queryDesc,eflags);
192-
else
193-
standard_ExecutorStart(queryDesc,eflags);
194-
195-
if ((OidIsValid(get_extension_oid("execplan", true)))&&
196-
(node_number1==0)&&
197-
((parsetree==NULL)|| (nodeTag(parsetree)!=T_CreatedbStmt)))
198-
{
199-
//elog(LOG, "Send query: %s", queryDesc->sourceText);
200-
execute_query(serialize_plan(queryDesc,eflags));
201-
}
202-
}
35+
/*
36+
* Before send of plan we need to check connection state.
37+
* If previous query was failed, we get PGRES_FATAL_ERROR.
38+
*/
39+
while ((result=PQgetResult(dest))!=NULL);
20340

204-
staticvoid
205-
HOOK_ExecEnd_injection(QueryDesc*queryDesc)
206-
{
207-
/* Execute before hook because it destruct memory context of exchange list */
208-
if (conn)
209-
{
210-
PGresult*result;
41+
serializedPlan=serialize_plan(queryDesc,eflags);
42+
/* Connect to slave and send it a query plan */
43+
SQLCommand= (char*)palloc0(strlen(serializedPlan)+100);
44+
sprintf(SQLCommand,"SELECT pg_execute_plan('%s');",serializedPlan);
21145

212-
while ((result=PQgetResult(conn))!=NULL)
213-
Assert(PQresultStatus(result)!=PGRES_FATAL_ERROR);
214-
PQfinish(conn);
215-
conn=NULL;
216-
}
46+
status=PQsendQuery(dest,SQLCommand);
21747

218-
if (prev_ExecutorEnd)
219-
prev_ExecutorEnd(queryDesc);
220-
else
221-
standard_ExecutorEnd(queryDesc);
48+
if (status==0)
49+
elog(ERROR,"Query sending error: %s",PQerrorMessage(dest));
22250
}
22351

224-
#include"common/base64.h"
225-
#include"nodes/nodeFuncs.h"
226-
22752
staticbool
22853
compute_irels_buffer_len(Plan*plan,int*length)
22954
{
@@ -258,8 +83,6 @@ compute_irels_buffer_len(Plan *plan, int *length)
25883
returnplan_tree_walker(plan,compute_irels_buffer_len,length);
25984
}
26085

261-
//#include "nodes/pg_list.h"
262-
26386
staticchar*
26487
serialize_plan(QueryDesc*queryDesc,inteflags)
26588
{
@@ -275,7 +98,7 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
27598
*econtainer,
27699
*start_address;
277100
ListCell*lc;
278-
101+
elog(LOG,"Send QUERY: %s",queryDesc->sourceText);
279102
serialized_plan=nodeToString(queryDesc->plannedstmt);
280103

281104
/*
@@ -302,11 +125,7 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
302125
* to save the relation names in serialized plan.
303126
*/
304127
compute_irels_buffer_len(queryDesc->plannedstmt->planTree,&inames_len);
305-
//plan_tree_walker(queryDesc->plannedstmt->planTree,
306-
// compute_irels_buffer_len,
307-
// &inames_len);
308-
//planstate_tree_walker((PlanState *) (queryDesc->planstate), compute_irels_buffer_len, &inames_len);
309-
//elog(LOG, "inames_len=%d", inames_len);
128+
310129
/* We use len+1 bytes for include end-of-string symbol. */
311130
splan_len=strlen(serialized_plan)+1;
312131
qtext_len=strlen(queryDesc->sourceText)+1;
@@ -353,9 +172,6 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
353172
}
354173
}
355174
store_irel_name((Plan*) (queryDesc->plannedstmt->planTree),start_address);
356-
//plan_tree_walker((Plan *) (queryDesc->plannedstmt->planTree),
357-
// store_irel_name,
358-
// start_address);
359175

360176
start_address+=inames_len;
361177
Assert((start_address-container)==tot_len);
@@ -525,7 +341,7 @@ pg_execute_plan(PG_FUNCTION_ARGS)
525341

526342
/* Restore query source text string */
527343
queryString=start_addr;
528-
//elog(LOG, "queryString: %s", queryString);
344+
elog(LOG,"Recv QUERY: %s",queryString);
529345
/* Restore instrument and flags */
530346
start_addr+=strlen(queryString)+1;
531347
instrument_options= (int*)start_addr;
@@ -544,7 +360,6 @@ pg_execute_plan(PG_FUNCTION_ARGS)
544360
{
545361
rte->relid=RelnameGetRelid(start_addr);
546362
Assert(rte->relid!=InvalidOid);
547-
//elog(LOG, "Relation from decoded plan. relid=%d relname=%s", rte->relid, start_addr);
548363
start_addr+=strlen(start_addr)+1;
549364
}
550365
}
@@ -565,7 +380,7 @@ pg_execute_plan(PG_FUNCTION_ARGS)
565380
ExecutorFinish(queryDesc);
566381
ExecutorEnd(queryDesc);
567382
FreeQueryDesc(queryDesc);
568-
383+
//elog(LOG, "End of QUERY: %s", queryString);
569384
pfree(decdata);
570385
PG_RETURN_BOOL(true);
571386
}

‎exec_plan.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* exec_plan.h
3+
*
4+
*/
5+
6+
#ifndefCONTRIB_EXECPLAN_EXEC_PLAN_H_
7+
#defineCONTRIB_EXECPLAN_EXEC_PLAN_H_
8+
9+
#include"postgres.h"
10+
11+
#include"executor/executor.h"
12+
#include"libpq-fe.h"
13+
14+
voidexecute_query(PGconn*dest,QueryDesc*queryDesc,inteflags);
15+
16+
#endif/* CONTRIB_EXECPLAN_EXEC_PLAN_H_ */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp