22#include "postgres.h"
33
44#include "catalog/namespace.h"
5- #include "commands/extension.h"
6- #include "executor/execdesc.h"
7- #include "executor/executor.h"
8- #include "fmgr.h"
9- #include "libpq/libpq.h"
10- #include "libpq-fe.h"
11- #include "nodes/params.h"
12- #include "planwalker.h"
13- #include "optimizer/planner.h"
5+ #include "common/base64.h"
6+ #include "nodes/nodeFuncs.h"
147#include "storage/lmgr.h"
15- #include "tcop/utility.h"
168#include "utils/builtins.h"
17- #include "utils/guc.h"
189#include "utils/lsyscache.h"
19- #include "utils/plancache.h"
2010#include "utils/snapmgr.h"
2111
12+ #include "exec_plan.h"
13+ #include "planwalker.h"
14+
2215PG_MODULE_MAGIC ;
2316
2417PG_FUNCTION_INFO_V1 (pg_execute_plan );
2518
26- void _PG_init (void );
27-
28- static ProcessUtility_hook_type next_ProcessUtility_hook = NULL ;
29- static planner_hook_type prev_planner_hook = NULL ;
30- static ExecutorStart_hook_type prev_ExecutorStart = NULL ;
31- static ExecutorEnd_hook_type prev_ExecutorEnd = NULL ;
32-
33- static void HOOK_Utility_injection (PlannedStmt * pstmt ,const char * queryString ,
34- ProcessUtilityContext context ,ParamListInfo params ,
35- QueryEnvironment * queryEnv ,DestReceiver * dest ,
36- char * completionTag );
37- static PlannedStmt * HOOK_Planner_injection (Query * parse ,int cursorOptions ,
38- ParamListInfo boundParams );
39- static void HOOK_ExecStart_injection (QueryDesc * queryDesc ,int eflags );
40- static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
4119static char * serialize_plan (QueryDesc * queryDesc ,int eflags );
42- static void execute_query (char * planString );
4320static bool store_irel_name (Plan * plan ,char * buffer );
4421
45- static PGconn * conn = NULL ;
46-
47- int node_number1 = 0 ;
48-
49- /*
50- * Module load/unload callback
51- */
52- void
53- _PG_init (void )
54- {
55- DefineCustomIntVariable ("pargres.node" ,
56- "Node number in instances collaboration" ,
57- NULL ,
58- & node_number1 ,
59- 0 ,
60- 0 ,
61- 1023 ,
62- PGC_SIGHUP ,
63- GUC_NOT_IN_SAMPLE ,
64- NULL ,
65- NULL ,
66- NULL );
67-
68- /* ProcessUtility hook */
69- next_ProcessUtility_hook = ProcessUtility_hook ;
70- ProcessUtility_hook = HOOK_Utility_injection ;
71-
72- /* Planner hook */
73- prev_planner_hook = planner_hook ;
74- planner_hook = HOOK_Planner_injection ;
75-
76- prev_ExecutorStart = ExecutorStart_hook ;
77- ExecutorStart_hook = HOOK_ExecStart_injection ;
78-
79- prev_ExecutorEnd = ExecutorEnd_hook ;
80- ExecutorEnd_hook = HOOK_ExecEnd_injection ;
81- }
82-
83- static void
84- HOOK_Utility_injection (PlannedStmt * pstmt ,
85- const char * queryString ,
86- ProcessUtilityContext context ,
87- ParamListInfo params ,
88- QueryEnvironment * queryEnv ,
89- DestReceiver * dest ,
90- char * completionTag )
91- {
92- Node * parsetree = pstmt -> utilityStmt ;
93-
94- if ((OidIsValid (get_extension_oid ("execplan" , true)))&&
95- (node_number1 == 0 )&&
96- (nodeTag (parsetree )!= T_CopyStmt )&&
97- (nodeTag (parsetree )!= T_CreateExtensionStmt )&&
98- (context != PROCESS_UTILITY_SUBCOMMAND ))
99- {
100- char conninfo [1024 ];
101- int status ;
102-
103- //elog(LOG, "Send UTILITY query %d: %s", nodeTag(parsetree), queryString);
104-
105- /* Connect to slave and send it a query plan */
106- sprintf (conninfo ,"host=localhost port=5433%c" ,'\0' );
107- conn = PQconnectdb (conninfo );
108- if (PQstatus (conn )== CONNECTION_BAD )
109- elog (LOG ,"Connection error. conninfo: %s" ,conninfo );
110-
111- status = PQsendQuery (conn ,queryString );
112- if (status == 0 )
113- elog (ERROR ,"Query sending error: %s" ,PQerrorMessage (conn ));
114- }
115- else if (node_number1 == 0 )
116- elog (LOG ,"UTILITY query without sending: %s" ,queryString );
117-
118- if (next_ProcessUtility_hook )
119- (* next_ProcessUtility_hook ) (pstmt ,queryString ,context ,params ,
120- queryEnv ,dest ,completionTag );
121- else
122- standard_ProcessUtility (pstmt ,queryString ,
123- context ,params ,queryEnv ,
124- dest ,completionTag );
125-
126- if (conn )
127- {
128- PGresult * result ;
129-
130- while ((result = PQgetResult (conn ))!= NULL )
131- Assert (PQresultStatus (result )!= PGRES_FATAL_ERROR );
132- PQfinish (conn );
133- conn = NULL ;
134- }
135- }
136-
13722/*
13823 * INPUT: a base64-encoded serialized plan
13924 */
140- static void
141- execute_query (char * planString )
25+ void
26+ execute_query (PGconn * dest , QueryDesc * queryDesc , int eflags )
14227{
143- char conninfo [1024 ];
14428char * SQLCommand ;
14529int status ;
30+ char * serializedPlan ;
31+ PGresult * result ;
14632
147- /* Connect to slave and send it a query plan */
148- sprintf (conninfo ,"host=localhost port=5433%c" ,'\0' );
149- conn = PQconnectdb (conninfo );
150- if (PQstatus (conn )== CONNECTION_BAD )
151- elog (LOG ,"Connection error. conninfo: %s" ,conninfo );
152-
153- SQLCommand = (char * )palloc0 (strlen (planString )+ 100 );
154- sprintf (SQLCommand ,"SELECT pg_execute_plan('%s');" ,planString );
155- //elog(LOG, "query: %s", SQLCommand);
156- status = PQsendQuery (conn ,SQLCommand );
157- if (status == 0 )
158- elog (ERROR ,"Query sending error: %s" ,PQerrorMessage (conn ));
159- }
33+ Assert (dest != NULL );
16034
161- static PlannedStmt *
162- HOOK_Planner_injection (Query * parse ,int cursorOptions ,
163- ParamListInfo boundParams )
164- {
165- PlannedStmt * pstmt ;
166-
167- conn = NULL ;
168-
169- if (prev_planner_hook )
170- pstmt = prev_planner_hook (parse ,cursorOptions ,boundParams );
171- else
172- pstmt = standard_planner (parse ,cursorOptions ,boundParams );
173-
174- if ((node_number1 > 0 )|| (parse -> utilityStmt != NULL ))
175- return pstmt ;
176-
177- /* Extension is not initialized. */
178- if (OidIsValid (get_extension_oid ("execplan" , true)))
179- {
180-
181- }
182- return pstmt ;
183- }
184-
185- static void
186- HOOK_ExecStart_injection (QueryDesc * queryDesc ,int eflags )
187- {
188- Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
189-
190- if (prev_ExecutorStart )
191- prev_ExecutorStart (queryDesc ,eflags );
192- else
193- standard_ExecutorStart (queryDesc ,eflags );
194-
195- if ((OidIsValid (get_extension_oid ("execplan" , true)))&&
196- (node_number1 == 0 )&&
197- ((parsetree == NULL )|| (nodeTag (parsetree )!= T_CreatedbStmt )))
198- {
199- //elog(LOG, "Send query: %s", queryDesc->sourceText);
200- execute_query (serialize_plan (queryDesc ,eflags ));
201- }
202- }
35+ /*
36+ * Before send of plan we need to check connection state.
37+ * If previous query was failed, we get PGRES_FATAL_ERROR.
38+ */
39+ while ((result = PQgetResult (dest ))!= NULL );
20340
204- static void
205- HOOK_ExecEnd_injection (QueryDesc * queryDesc )
206- {
207- /* Execute before hook because it destruct memory context of exchange list */
208- if (conn )
209- {
210- PGresult * result ;
41+ serializedPlan = serialize_plan (queryDesc ,eflags );
42+ /* Connect to slave and send it a query plan */
43+ SQLCommand = (char * )palloc0 (strlen (serializedPlan )+ 100 );
44+ sprintf (SQLCommand ,"SELECT pg_execute_plan('%s');" ,serializedPlan );
21145
212- while ((result = PQgetResult (conn ))!= NULL )
213- Assert (PQresultStatus (result )!= PGRES_FATAL_ERROR );
214- PQfinish (conn );
215- conn = NULL ;
216- }
46+ status = PQsendQuery (dest ,SQLCommand );
21747
218- if (prev_ExecutorEnd )
219- prev_ExecutorEnd (queryDesc );
220- else
221- standard_ExecutorEnd (queryDesc );
48+ if (status == 0 )
49+ elog (ERROR ,"Query sending error: %s" ,PQerrorMessage (dest ));
22250}
22351
224- #include "common/base64.h"
225- #include "nodes/nodeFuncs.h"
226-
22752static bool
22853compute_irels_buffer_len (Plan * plan ,int * length )
22954{
@@ -258,8 +83,6 @@ compute_irels_buffer_len(Plan *plan, int *length)
25883return plan_tree_walker (plan ,compute_irels_buffer_len ,length );
25984}
26085
261- //#include "nodes/pg_list.h"
262-
26386static char *
26487serialize_plan (QueryDesc * queryDesc ,int eflags )
26588{
@@ -275,7 +98,7 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
27598* econtainer ,
27699* start_address ;
277100ListCell * lc ;
278-
101+ elog ( LOG , "Send QUERY: %s" , queryDesc -> sourceText );
279102serialized_plan = nodeToString (queryDesc -> plannedstmt );
280103
281104/*
@@ -302,11 +125,7 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
302125 * to save the relation names in serialized plan.
303126 */
304127compute_irels_buffer_len (queryDesc -> plannedstmt -> planTree ,& inames_len );
305- //plan_tree_walker(queryDesc->plannedstmt->planTree,
306- // compute_irels_buffer_len,
307- // &inames_len);
308- //planstate_tree_walker((PlanState *) (queryDesc->planstate), compute_irels_buffer_len, &inames_len);
309- //elog(LOG, "inames_len=%d", inames_len);
128+
310129/* We use len+1 bytes for include end-of-string symbol. */
311130splan_len = strlen (serialized_plan )+ 1 ;
312131qtext_len = strlen (queryDesc -> sourceText )+ 1 ;
@@ -353,9 +172,6 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
353172}
354173}
355174store_irel_name ((Plan * ) (queryDesc -> plannedstmt -> planTree ),start_address );
356- //plan_tree_walker((Plan *) (queryDesc->plannedstmt->planTree),
357- // store_irel_name,
358- // start_address);
359175
360176start_address += inames_len ;
361177Assert ((start_address - container )== tot_len );
@@ -525,7 +341,7 @@ pg_execute_plan(PG_FUNCTION_ARGS)
525341
526342/* Restore query source text string */
527343queryString = start_addr ;
528- // elog(LOG, "queryString : %s", queryString);
344+ elog (LOG ,"Recv QUERY : %s" ,queryString );
529345/* Restore instrument and flags */
530346start_addr += strlen (queryString )+ 1 ;
531347instrument_options = (int * )start_addr ;
@@ -544,7 +360,6 @@ pg_execute_plan(PG_FUNCTION_ARGS)
544360{
545361rte -> relid = RelnameGetRelid (start_addr );
546362Assert (rte -> relid != InvalidOid );
547- //elog(LOG, "Relation from decoded plan. relid=%d relname=%s", rte->relid, start_addr);
548363start_addr += strlen (start_addr )+ 1 ;
549364}
550365}
@@ -565,7 +380,7 @@ pg_execute_plan(PG_FUNCTION_ARGS)
565380ExecutorFinish (queryDesc );
566381ExecutorEnd (queryDesc );
567382FreeQueryDesc (queryDesc );
568-
383+ //elog(LOG, "End of QUERY: %s", queryString);
569384pfree (decdata );
570385PG_RETURN_BOOL (true);
571386}