1- /*
1+ /*-------------------------------------------------------------------------
2+ *
23 * repeater.c
4+ * Simple demo for remote plan execution patch.
35 *
6+ * Transfer query plan to a remote instance and wait for result.
7+ * Remote instance parameters (host, port) defines by GUCs.
8+ *
9+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
10+ * Portions Copyright (c) 2018-2019, Postgres Professional
11+ *-------------------------------------------------------------------------
412 */
513
614#include "postgres.h"
1119#include "fmgr.h"
1220#include "libpq/libpq.h"
1321#include "libpq-fe.h"
14- #include "nodes/params.h"
1522#include "optimizer/planner.h"
1623#include "tcop/utility.h"
1724#include "utils/guc.h"
18- #include "utils/memutils.h"
19- #include "utils/plancache.h"
2025
2126PG_MODULE_MAGIC ;
2227
@@ -32,12 +37,14 @@ static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
3237char * completionTag );
3338static void HOOK_ExecStart_injection (QueryDesc * queryDesc ,int eflags );
3439static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
35- static int execute_query (PGconn * dest ,QueryDesc * queryDesc ,int eflags );
36-
3740
41+ /* Remote instance parameters. */
3842char * repeater_host_name ;
3943int repeater_port_number ;
4044
45+ static bool ExtensionIsActivated = false;
46+ static PGconn * conn = NULL ;
47+
4148/*
4249 * Module load/unload callback
4350 */
@@ -79,8 +86,6 @@ _PG_init(void)
7986ExecutorEnd_hook = HOOK_ExecEnd_injection ;
8087}
8188
82- static PGconn * conn = NULL ;
83-
8489static PGconn *
8590EstablishConnection (void )
8691{
@@ -90,17 +95,17 @@ EstablishConnection(void)
9095return conn ;
9196
9297/* Connect to slave and send it a query plan */
93- sprintf (conninfo ,"host=localhost port=5433%c" ,'\0' );
98+ sprintf (conninfo ,"host=%s port=%d %c" , repeater_host_name , repeater_port_number ,'\0' );
9499conn = PQconnectdb (conninfo );
95100
96101if (PQstatus (conn )== CONNECTION_BAD )
97102elog (LOG ,"Connection error. conninfo: %s" ,conninfo );
103+ else
104+ elog (LOG ,"Connection established: host=%s, port=%d" ,repeater_host_name ,repeater_port_number );
98105
99106return conn ;
100107}
101108
102- static bool ExtensionIsActivated = false;
103-
104109static bool
105110ExtensionIsActive (void )
106111{
@@ -117,6 +122,10 @@ ExtensionIsActive(void)
117122return ExtensionIsActivated ;
118123}
119124
125+ /*
126+ * We need to send some DML queries for sync database schema to a plan execution
127+ * at a remote instance.
128+ */
120129static void
121130HOOK_Utility_injection (PlannedStmt * pstmt ,
122131const char * queryString ,
@@ -126,27 +135,35 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
126135DestReceiver * dest ,
127136char * completionTag )
128137{
129- Node * parsetree = pstmt -> utilityStmt ;
138+ Node * parsetree = pstmt -> utilityStmt ;
139+ PGresult * result ;
130140
141+ /*
142+ * Very non-trivial decision about transferring utility query to data nodes.
143+ * This exception list used for demonstration and let us to execute some
144+ * simple queries.
145+ */
131146if (ExtensionIsActive ()&&
147+ pstmt -> canSetTag &&
132148(nodeTag (parsetree )!= T_CopyStmt )&&
133149(nodeTag (parsetree )!= T_CreateExtensionStmt )&&
134150(nodeTag (parsetree )!= T_ExplainStmt )&&
151+ (nodeTag (parsetree )!= T_FetchStmt )&&
135152(context != PROCESS_UTILITY_SUBCOMMAND )
136153 )
137154{
138- PGresult * result ;
139-
155+ /*
156+ * Previous query could be completed with error report at this instance.
157+ * In this case, we need to prepare connection to the remote instance.
158+ */
140159while ((result = PQgetResult (EstablishConnection ()))!= NULL );
141160
142161if (PQsendQuery (EstablishConnection (),queryString )== 0 )
143- {
144- elog ( ERROR , "Sending UTILITY query error: %s" , queryString );
145- PQreset ( conn );
146- }
162+ elog ( ERROR , "Connection error: query: %s, status=%d, errmsg=%s" ,
163+ queryString ,
164+ PQstatus ( EstablishConnection ()),
165+ PQerrorMessage ( EstablishConnection ()));
147166}
148- else
149- elog (LOG ,"UTILITY query without sending: %s" ,queryString );
150167
151168if (next_ProcessUtility_hook )
152169(* next_ProcessUtility_hook ) (pstmt ,queryString ,context ,params ,
@@ -156,48 +173,51 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
156173context ,params ,queryEnv ,
157174dest ,completionTag );
158175
176+ /*
177+ * Check end of query execution at the remote instance.
178+ */
159179if (conn )
160- {
161- PGresult * result ;
162-
163180while ((result = PQgetResult (conn ))!= NULL );
164- }
165181}
166- static int IsExecuted = 0 ;
167182
168183static void
169184HOOK_ExecStart_injection (QueryDesc * queryDesc ,int eflags )
170185{
171- Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
186+ Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
187+ PGresult * result ;
188+ PGconn * dest = EstablishConnection ();
172189
173190if (prev_ExecutorStart )
174191prev_ExecutorStart (queryDesc ,eflags );
175192else
176193standard_ExecutorStart (queryDesc ,eflags );
177-
178- IsExecuted ++ ;
179-
180- if (IsExecuted > 1 )
181- return ;
182-
183- if (
184- ExtensionIsActive ()&&
185- (repeater_host_name == 0 )&&
186- ((parsetree == NULL )|| (nodeTag (parsetree )!= T_CreatedbStmt ))&&
187- !(eflags & EXEC_FLAG_EXPLAIN_ONLY )
188- )
194+ elog (LOG ,"QUERY: %s" ,queryDesc -> sourceText );
195+ /*
196+ * This not fully correct sign for prevent passing each subquery to
197+ * the remote instance. Only for demo.
198+ */
199+ if (ExtensionIsActive ()&&
200+ queryDesc -> plannedstmt -> canSetTag &&
201+ ((parsetree == NULL )|| (nodeTag (parsetree )!= T_CreatedbStmt ))&&
202+ !(eflags & EXEC_FLAG_EXPLAIN_ONLY ))
189203{
190- elog (LOG ,"Send query: %s" ,queryDesc -> sourceText );
191- if (execute_query (EstablishConnection (),queryDesc ,eflags )== 0 )
192- PQreset (conn );
204+ /*
205+ * Prepare connection.
206+ */
207+ while ((result = PQgetResult (dest ))!= NULL );
208+ elog (LOG ,"->QUERY: %s" ,queryDesc -> sourceText );
209+ if (PQsendPlan (dest ,serialize_plan (queryDesc ,eflags ))== 0 )
210+ /*
211+ * Report about remote execution error.
212+ */
213+ elog (ERROR ,"Connection errors during PLAN transferring: status=%d, errmsg=%s" ,
214+ PQstatus (dest ),PQerrorMessage (dest ));
193215}
194216}
195217
196218static void
197219HOOK_ExecEnd_injection (QueryDesc * queryDesc )
198220{
199- IsExecuted -- ;
200- /* Execute before hook because it destruct memory context of exchange list */
201221if (conn )
202222{
203223PGresult * result ;
@@ -210,32 +230,3 @@ HOOK_ExecEnd_injection(QueryDesc *queryDesc)
210230else
211231standard_ExecutorEnd (queryDesc );
212232}
213-
214-
215- /*
216- * Serialize plan and send it to the destination instance
217- */
218- static int
219- execute_query (PGconn * dest ,QueryDesc * queryDesc ,int eflags )
220- {
221- PGresult * result ;
222-
223- Assert (dest != NULL );
224-
225- /*
226- * Before send of plan we need to check connection state.
227- * If previous query was failed, we get PGRES_FATAL_ERROR.
228- */
229- while ((result = PQgetResult (dest ))!= NULL );
230-
231- if (PQsendPlan (dest ,serialize_plan (queryDesc ,eflags ))== 0 )
232- {
233- /*
234- * Report about remote execution error and return control to caller.
235- */
236- elog (ERROR ,"PLAN sending error." );
237- return 0 ;
238- }
239-
240- return 1 ;
241- }