1
- /*
1
+ /*-------------------------------------------------------------------------
2
+ *
2
3
* repeater.c
4
+ * Simple demo for remote plan execution patch.
3
5
*
6
+ * Transfer query plan to a remote instance and wait for result.
7
+ * Remote instance parameters (host, port) defines by GUCs.
8
+ *
9
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
10
+ * Portions Copyright (c) 2018-2019, Postgres Professional
11
+ *-------------------------------------------------------------------------
4
12
*/
5
13
6
14
#include "postgres.h"
11
19
#include "fmgr.h"
12
20
#include "libpq/libpq.h"
13
21
#include "libpq-fe.h"
14
- #include "nodes/params.h"
15
22
#include "optimizer/planner.h"
16
23
#include "tcop/utility.h"
17
24
#include "utils/guc.h"
18
- #include "utils/memutils.h"
19
- #include "utils/plancache.h"
20
25
21
26
PG_MODULE_MAGIC ;
22
27
@@ -32,12 +37,14 @@ static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
32
37
char * completionTag );
33
38
static void HOOK_ExecStart_injection (QueryDesc * queryDesc ,int eflags );
34
39
static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
35
- static int execute_query (PGconn * dest ,QueryDesc * queryDesc ,int eflags );
36
-
37
40
41
+ /* Remote instance parameters. */
38
42
char * repeater_host_name ;
39
43
int repeater_port_number ;
40
44
45
+ static bool ExtensionIsActivated = false;
46
+ static PGconn * conn = NULL ;
47
+
41
48
/*
42
49
* Module load/unload callback
43
50
*/
@@ -79,8 +86,6 @@ _PG_init(void)
79
86
ExecutorEnd_hook = HOOK_ExecEnd_injection ;
80
87
}
81
88
82
- static PGconn * conn = NULL ;
83
-
84
89
static PGconn *
85
90
EstablishConnection (void )
86
91
{
@@ -90,17 +95,17 @@ EstablishConnection(void)
90
95
return conn ;
91
96
92
97
/* Connect to slave and send it a query plan */
93
- sprintf (conninfo ,"host=localhost port=5433%c" ,'\0' );
98
+ sprintf (conninfo ,"host=%s port=%d %c" , repeater_host_name , repeater_port_number ,'\0' );
94
99
conn = PQconnectdb (conninfo );
95
100
96
101
if (PQstatus (conn )== CONNECTION_BAD )
97
102
elog (LOG ,"Connection error. conninfo: %s" ,conninfo );
103
+ else
104
+ elog (LOG ,"Connection established: host=%s, port=%d" ,repeater_host_name ,repeater_port_number );
98
105
99
106
return conn ;
100
107
}
101
108
102
- static bool ExtensionIsActivated = false;
103
-
104
109
static bool
105
110
ExtensionIsActive (void )
106
111
{
@@ -117,6 +122,10 @@ ExtensionIsActive(void)
117
122
return ExtensionIsActivated ;
118
123
}
119
124
125
+ /*
126
+ * We need to send some DML queries for sync database schema to a plan execution
127
+ * at a remote instance.
128
+ */
120
129
static void
121
130
HOOK_Utility_injection (PlannedStmt * pstmt ,
122
131
const char * queryString ,
@@ -126,27 +135,35 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
126
135
DestReceiver * dest ,
127
136
char * completionTag )
128
137
{
129
- Node * parsetree = pstmt -> utilityStmt ;
138
+ Node * parsetree = pstmt -> utilityStmt ;
139
+ PGresult * result ;
130
140
141
+ /*
142
+ * Very non-trivial decision about transferring utility query to data nodes.
143
+ * This exception list used for demonstration and let us to execute some
144
+ * simple queries.
145
+ */
131
146
if (ExtensionIsActive ()&&
147
+ pstmt -> canSetTag &&
132
148
(nodeTag (parsetree )!= T_CopyStmt )&&
133
149
(nodeTag (parsetree )!= T_CreateExtensionStmt )&&
134
150
(nodeTag (parsetree )!= T_ExplainStmt )&&
151
+ (nodeTag (parsetree )!= T_FetchStmt )&&
135
152
(context != PROCESS_UTILITY_SUBCOMMAND )
136
153
)
137
154
{
138
- PGresult * result ;
139
-
155
+ /*
156
+ * Previous query could be completed with error report at this instance.
157
+ * In this case, we need to prepare connection to the remote instance.
158
+ */
140
159
while ((result = PQgetResult (EstablishConnection ()))!= NULL );
141
160
142
161
if (PQsendQuery (EstablishConnection (),queryString )== 0 )
143
- {
144
- elog ( ERROR , "Sending UTILITY query error: %s" , queryString );
145
- PQreset ( conn );
146
- }
162
+ elog ( ERROR , "Connection error: query: %s, status=%d, errmsg=%s" ,
163
+ queryString ,
164
+ PQstatus ( EstablishConnection ()),
165
+ PQerrorMessage ( EstablishConnection ()));
147
166
}
148
- else
149
- elog (LOG ,"UTILITY query without sending: %s" ,queryString );
150
167
151
168
if (next_ProcessUtility_hook )
152
169
(* next_ProcessUtility_hook ) (pstmt ,queryString ,context ,params ,
@@ -156,48 +173,51 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
156
173
context ,params ,queryEnv ,
157
174
dest ,completionTag );
158
175
176
+ /*
177
+ * Check end of query execution at the remote instance.
178
+ */
159
179
if (conn )
160
- {
161
- PGresult * result ;
162
-
163
180
while ((result = PQgetResult (conn ))!= NULL );
164
- }
165
181
}
166
- static int IsExecuted = 0 ;
167
182
168
183
static void
169
184
HOOK_ExecStart_injection (QueryDesc * queryDesc ,int eflags )
170
185
{
171
- Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
186
+ Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
187
+ PGresult * result ;
188
+ PGconn * dest = EstablishConnection ();
172
189
173
190
if (prev_ExecutorStart )
174
191
prev_ExecutorStart (queryDesc ,eflags );
175
192
else
176
193
standard_ExecutorStart (queryDesc ,eflags );
177
-
178
- IsExecuted ++ ;
179
-
180
- if (IsExecuted > 1 )
181
- return ;
182
-
183
- if (
184
- ExtensionIsActive ()&&
185
- (repeater_host_name == 0 )&&
186
- ((parsetree == NULL )|| (nodeTag (parsetree )!= T_CreatedbStmt ))&&
187
- !(eflags & EXEC_FLAG_EXPLAIN_ONLY )
188
- )
194
+ elog (LOG ,"QUERY: %s" ,queryDesc -> sourceText );
195
+ /*
196
+ * This not fully correct sign for prevent passing each subquery to
197
+ * the remote instance. Only for demo.
198
+ */
199
+ if (ExtensionIsActive ()&&
200
+ queryDesc -> plannedstmt -> canSetTag &&
201
+ ((parsetree == NULL )|| (nodeTag (parsetree )!= T_CreatedbStmt ))&&
202
+ !(eflags & EXEC_FLAG_EXPLAIN_ONLY ))
189
203
{
190
- elog (LOG ,"Send query: %s" ,queryDesc -> sourceText );
191
- if (execute_query (EstablishConnection (),queryDesc ,eflags )== 0 )
192
- PQreset (conn );
204
+ /*
205
+ * Prepare connection.
206
+ */
207
+ while ((result = PQgetResult (dest ))!= NULL );
208
+ elog (LOG ,"->QUERY: %s" ,queryDesc -> sourceText );
209
+ if (PQsendPlan (dest ,serialize_plan (queryDesc ,eflags ))== 0 )
210
+ /*
211
+ * Report about remote execution error.
212
+ */
213
+ elog (ERROR ,"Connection errors during PLAN transferring: status=%d, errmsg=%s" ,
214
+ PQstatus (dest ),PQerrorMessage (dest ));
193
215
}
194
216
}
195
217
196
218
static void
197
219
HOOK_ExecEnd_injection (QueryDesc * queryDesc )
198
220
{
199
- IsExecuted -- ;
200
- /* Execute before hook because it destruct memory context of exchange list */
201
221
if (conn )
202
222
{
203
223
PGresult * result ;
@@ -210,32 +230,3 @@ HOOK_ExecEnd_injection(QueryDesc *queryDesc)
210
230
else
211
231
standard_ExecutorEnd (queryDesc );
212
232
}
213
-
214
-
215
- /*
216
- * Serialize plan and send it to the destination instance
217
- */
218
- static int
219
- execute_query (PGconn * dest ,QueryDesc * queryDesc ,int eflags )
220
- {
221
- PGresult * result ;
222
-
223
- Assert (dest != NULL );
224
-
225
- /*
226
- * Before send of plan we need to check connection state.
227
- * If previous query was failed, we get PGRES_FATAL_ERROR.
228
- */
229
- while ((result = PQgetResult (dest ))!= NULL );
230
-
231
- if (PQsendPlan (dest ,serialize_plan (queryDesc ,eflags ))== 0 )
232
- {
233
- /*
234
- * Report about remote execution error and return control to caller.
235
- */
236
- elog (ERROR ,"PLAN sending error." );
237
- return 0 ;
238
- }
239
-
240
- return 1 ;
241
- }