2
2
#include "postgres.h"
3
3
4
4
#include "catalog/namespace.h"
5
- #include "commands/extension.h"
6
- #include "executor/execdesc.h"
7
- #include "executor/executor.h"
8
- #include "fmgr.h"
9
- #include "libpq/libpq.h"
10
- #include "libpq-fe.h"
11
- #include "nodes/params.h"
12
- #include "planwalker.h"
13
- #include "optimizer/planner.h"
5
+ #include "common/base64.h"
6
+ #include "nodes/nodeFuncs.h"
14
7
#include "storage/lmgr.h"
15
- #include "tcop/utility.h"
16
8
#include "utils/builtins.h"
17
- #include "utils/guc.h"
18
9
#include "utils/lsyscache.h"
19
- #include "utils/plancache.h"
20
10
#include "utils/snapmgr.h"
21
11
12
+ #include "exec_plan.h"
13
+ #include "planwalker.h"
14
+
22
15
PG_MODULE_MAGIC ;
23
16
24
17
PG_FUNCTION_INFO_V1 (pg_execute_plan );
25
18
26
- void _PG_init (void );
27
-
28
- static ProcessUtility_hook_type next_ProcessUtility_hook = NULL ;
29
- static planner_hook_type prev_planner_hook = NULL ;
30
- static ExecutorStart_hook_type prev_ExecutorStart = NULL ;
31
- static ExecutorEnd_hook_type prev_ExecutorEnd = NULL ;
32
-
33
- static void HOOK_Utility_injection (PlannedStmt * pstmt ,const char * queryString ,
34
- ProcessUtilityContext context ,ParamListInfo params ,
35
- QueryEnvironment * queryEnv ,DestReceiver * dest ,
36
- char * completionTag );
37
- static PlannedStmt * HOOK_Planner_injection (Query * parse ,int cursorOptions ,
38
- ParamListInfo boundParams );
39
- static void HOOK_ExecStart_injection (QueryDesc * queryDesc ,int eflags );
40
- static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
41
19
static char * serialize_plan (QueryDesc * queryDesc ,int eflags );
42
- static void execute_query (char * planString );
43
20
static bool store_irel_name (Plan * plan ,char * buffer );
44
21
45
- static PGconn * conn = NULL ;
46
-
47
- int node_number1 = 0 ;
48
-
49
- /*
50
- * Module load/unload callback
51
- */
52
- void
53
- _PG_init (void )
54
- {
55
- DefineCustomIntVariable ("pargres.node" ,
56
- "Node number in instances collaboration" ,
57
- NULL ,
58
- & node_number1 ,
59
- 0 ,
60
- 0 ,
61
- 1023 ,
62
- PGC_SIGHUP ,
63
- GUC_NOT_IN_SAMPLE ,
64
- NULL ,
65
- NULL ,
66
- NULL );
67
-
68
- /* ProcessUtility hook */
69
- next_ProcessUtility_hook = ProcessUtility_hook ;
70
- ProcessUtility_hook = HOOK_Utility_injection ;
71
-
72
- /* Planner hook */
73
- prev_planner_hook = planner_hook ;
74
- planner_hook = HOOK_Planner_injection ;
75
-
76
- prev_ExecutorStart = ExecutorStart_hook ;
77
- ExecutorStart_hook = HOOK_ExecStart_injection ;
78
-
79
- prev_ExecutorEnd = ExecutorEnd_hook ;
80
- ExecutorEnd_hook = HOOK_ExecEnd_injection ;
81
- }
82
-
83
- static void
84
- HOOK_Utility_injection (PlannedStmt * pstmt ,
85
- const char * queryString ,
86
- ProcessUtilityContext context ,
87
- ParamListInfo params ,
88
- QueryEnvironment * queryEnv ,
89
- DestReceiver * dest ,
90
- char * completionTag )
91
- {
92
- Node * parsetree = pstmt -> utilityStmt ;
93
-
94
- if ((OidIsValid (get_extension_oid ("execplan" , true)))&&
95
- (node_number1 == 0 )&&
96
- (nodeTag (parsetree )!= T_CopyStmt )&&
97
- (nodeTag (parsetree )!= T_CreateExtensionStmt )&&
98
- (context != PROCESS_UTILITY_SUBCOMMAND ))
99
- {
100
- char conninfo [1024 ];
101
- int status ;
102
-
103
- //elog(LOG, "Send UTILITY query %d: %s", nodeTag(parsetree), queryString);
104
-
105
- /* Connect to slave and send it a query plan */
106
- sprintf (conninfo ,"host=localhost port=5433%c" ,'\0' );
107
- conn = PQconnectdb (conninfo );
108
- if (PQstatus (conn )== CONNECTION_BAD )
109
- elog (LOG ,"Connection error. conninfo: %s" ,conninfo );
110
-
111
- status = PQsendQuery (conn ,queryString );
112
- if (status == 0 )
113
- elog (ERROR ,"Query sending error: %s" ,PQerrorMessage (conn ));
114
- }
115
- else if (node_number1 == 0 )
116
- elog (LOG ,"UTILITY query without sending: %s" ,queryString );
117
-
118
- if (next_ProcessUtility_hook )
119
- (* next_ProcessUtility_hook ) (pstmt ,queryString ,context ,params ,
120
- queryEnv ,dest ,completionTag );
121
- else
122
- standard_ProcessUtility (pstmt ,queryString ,
123
- context ,params ,queryEnv ,
124
- dest ,completionTag );
125
-
126
- if (conn )
127
- {
128
- PGresult * result ;
129
-
130
- while ((result = PQgetResult (conn ))!= NULL )
131
- Assert (PQresultStatus (result )!= PGRES_FATAL_ERROR );
132
- PQfinish (conn );
133
- conn = NULL ;
134
- }
135
- }
136
-
137
22
/*
138
23
* INPUT: a base64-encoded serialized plan
139
24
*/
140
- static void
141
- execute_query (char * planString )
25
+ void
26
+ execute_query (PGconn * dest , QueryDesc * queryDesc , int eflags )
142
27
{
143
- char conninfo [1024 ];
144
28
char * SQLCommand ;
145
29
int status ;
30
+ char * serializedPlan ;
31
+ PGresult * result ;
146
32
147
- /* Connect to slave and send it a query plan */
148
- sprintf (conninfo ,"host=localhost port=5433%c" ,'\0' );
149
- conn = PQconnectdb (conninfo );
150
- if (PQstatus (conn )== CONNECTION_BAD )
151
- elog (LOG ,"Connection error. conninfo: %s" ,conninfo );
152
-
153
- SQLCommand = (char * )palloc0 (strlen (planString )+ 100 );
154
- sprintf (SQLCommand ,"SELECT pg_execute_plan('%s');" ,planString );
155
- //elog(LOG, "query: %s", SQLCommand);
156
- status = PQsendQuery (conn ,SQLCommand );
157
- if (status == 0 )
158
- elog (ERROR ,"Query sending error: %s" ,PQerrorMessage (conn ));
159
- }
33
+ Assert (dest != NULL );
160
34
161
- static PlannedStmt *
162
- HOOK_Planner_injection (Query * parse ,int cursorOptions ,
163
- ParamListInfo boundParams )
164
- {
165
- PlannedStmt * pstmt ;
166
-
167
- conn = NULL ;
168
-
169
- if (prev_planner_hook )
170
- pstmt = prev_planner_hook (parse ,cursorOptions ,boundParams );
171
- else
172
- pstmt = standard_planner (parse ,cursorOptions ,boundParams );
173
-
174
- if ((node_number1 > 0 )|| (parse -> utilityStmt != NULL ))
175
- return pstmt ;
176
-
177
- /* Extension is not initialized. */
178
- if (OidIsValid (get_extension_oid ("execplan" , true)))
179
- {
180
-
181
- }
182
- return pstmt ;
183
- }
184
-
185
- static void
186
- HOOK_ExecStart_injection (QueryDesc * queryDesc ,int eflags )
187
- {
188
- Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
189
-
190
- if (prev_ExecutorStart )
191
- prev_ExecutorStart (queryDesc ,eflags );
192
- else
193
- standard_ExecutorStart (queryDesc ,eflags );
194
-
195
- if ((OidIsValid (get_extension_oid ("execplan" , true)))&&
196
- (node_number1 == 0 )&&
197
- ((parsetree == NULL )|| (nodeTag (parsetree )!= T_CreatedbStmt )))
198
- {
199
- //elog(LOG, "Send query: %s", queryDesc->sourceText);
200
- execute_query (serialize_plan (queryDesc ,eflags ));
201
- }
202
- }
35
+ /*
36
+ * Before send of plan we need to check connection state.
37
+ * If previous query was failed, we get PGRES_FATAL_ERROR.
38
+ */
39
+ while ((result = PQgetResult (dest ))!= NULL );
203
40
204
- static void
205
- HOOK_ExecEnd_injection (QueryDesc * queryDesc )
206
- {
207
- /* Execute before hook because it destruct memory context of exchange list */
208
- if (conn )
209
- {
210
- PGresult * result ;
41
+ serializedPlan = serialize_plan (queryDesc ,eflags );
42
+ /* Connect to slave and send it a query plan */
43
+ SQLCommand = (char * )palloc0 (strlen (serializedPlan )+ 100 );
44
+ sprintf (SQLCommand ,"SELECT pg_execute_plan('%s');" ,serializedPlan );
211
45
212
- while ((result = PQgetResult (conn ))!= NULL )
213
- Assert (PQresultStatus (result )!= PGRES_FATAL_ERROR );
214
- PQfinish (conn );
215
- conn = NULL ;
216
- }
46
+ status = PQsendQuery (dest ,SQLCommand );
217
47
218
- if (prev_ExecutorEnd )
219
- prev_ExecutorEnd (queryDesc );
220
- else
221
- standard_ExecutorEnd (queryDesc );
48
+ if (status == 0 )
49
+ elog (ERROR ,"Query sending error: %s" ,PQerrorMessage (dest ));
222
50
}
223
51
224
- #include "common/base64.h"
225
- #include "nodes/nodeFuncs.h"
226
-
227
52
static bool
228
53
compute_irels_buffer_len (Plan * plan ,int * length )
229
54
{
@@ -258,8 +83,6 @@ compute_irels_buffer_len(Plan *plan, int *length)
258
83
return plan_tree_walker (plan ,compute_irels_buffer_len ,length );
259
84
}
260
85
261
- //#include "nodes/pg_list.h"
262
-
263
86
static char *
264
87
serialize_plan (QueryDesc * queryDesc ,int eflags )
265
88
{
@@ -275,7 +98,7 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
275
98
* econtainer ,
276
99
* start_address ;
277
100
ListCell * lc ;
278
-
101
+ elog ( LOG , "Send QUERY: %s" , queryDesc -> sourceText );
279
102
serialized_plan = nodeToString (queryDesc -> plannedstmt );
280
103
281
104
/*
@@ -302,11 +125,7 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
302
125
* to save the relation names in serialized plan.
303
126
*/
304
127
compute_irels_buffer_len (queryDesc -> plannedstmt -> planTree ,& inames_len );
305
- //plan_tree_walker(queryDesc->plannedstmt->planTree,
306
- // compute_irels_buffer_len,
307
- // &inames_len);
308
- //planstate_tree_walker((PlanState *) (queryDesc->planstate), compute_irels_buffer_len, &inames_len);
309
- //elog(LOG, "inames_len=%d", inames_len);
128
+
310
129
/* We use len+1 bytes for include end-of-string symbol. */
311
130
splan_len = strlen (serialized_plan )+ 1 ;
312
131
qtext_len = strlen (queryDesc -> sourceText )+ 1 ;
@@ -353,9 +172,6 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
353
172
}
354
173
}
355
174
store_irel_name ((Plan * ) (queryDesc -> plannedstmt -> planTree ),start_address );
356
- //plan_tree_walker((Plan *) (queryDesc->plannedstmt->planTree),
357
- // store_irel_name,
358
- // start_address);
359
175
360
176
start_address += inames_len ;
361
177
Assert ((start_address - container )== tot_len );
@@ -525,7 +341,7 @@ pg_execute_plan(PG_FUNCTION_ARGS)
525
341
526
342
/* Restore query source text string */
527
343
queryString = start_addr ;
528
- // elog(LOG, "queryString : %s", queryString);
344
+ elog (LOG ,"Recv QUERY : %s" ,queryString );
529
345
/* Restore instrument and flags */
530
346
start_addr += strlen (queryString )+ 1 ;
531
347
instrument_options = (int * )start_addr ;
@@ -544,7 +360,6 @@ pg_execute_plan(PG_FUNCTION_ARGS)
544
360
{
545
361
rte -> relid = RelnameGetRelid (start_addr );
546
362
Assert (rte -> relid != InvalidOid );
547
- //elog(LOG, "Relation from decoded plan. relid=%d relname=%s", rte->relid, start_addr);
548
363
start_addr += strlen (start_addr )+ 1 ;
549
364
}
550
365
}
@@ -565,7 +380,7 @@ pg_execute_plan(PG_FUNCTION_ARGS)
565
380
ExecutorFinish (queryDesc );
566
381
ExecutorEnd (queryDesc );
567
382
FreeQueryDesc (queryDesc );
568
-
383
+ //elog(LOG, "End of QUERY: %s", queryString);
569
384
pfree (decdata );
570
385
PG_RETURN_BOOL (true);
571
386
}