1
1
/*-------------------------------------------------------------------------
2
2
*
3
3
* repeater.c
4
- * Simple demo for remote plan execution patch.
4
+ *Simple demo for remote plan execution patch.
5
5
*
6
6
* Transfer query plan to a remote instance and wait for result.
7
7
* Remote instance parameters (host, port) defines by GUCs.
34
34
35
35
PG_MODULE_MAGIC ;
36
36
37
- void _PG_init (void );
37
+ void _PG_init (void );
38
38
39
- static ProcessUtility_hook_type next_ProcessUtility_hook = NULL ;
40
- static ExecutorStart_hook_type prev_ExecutorStart = NULL ;
41
- static ExecutorEnd_hook_type prev_ExecutorEnd = NULL ;
39
+ static ProcessUtility_hook_type next_ProcessUtility_hook = NULL ;
40
+ static ExecutorStart_hook_type prev_ExecutorStart = NULL ;
41
+ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL ;
42
42
43
43
static void HOOK_Utility_injection (PlannedStmt * pstmt ,const char * queryString ,
44
- ProcessUtilityContext context ,ParamListInfo params ,
45
- DestReceiver * dest ,char * completionTag );
44
+ ProcessUtilityContext context ,ParamListInfo params ,
45
+ DestReceiver * dest ,char * completionTag );
46
46
static void HOOK_ExecStart_injection (QueryDesc * queryDesc ,int eflags );
47
47
static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
48
48
49
49
/* Remote instance parameters. */
50
- char * remote_server_fdwname ;
50
+ char * remote_server_fdwname ;
51
51
52
- static bool ExtensionIsActivated = false;
53
- static PGconn * conn = NULL ;
52
+ static bool ExtensionIsActivated = false;
53
+ static PGconn * conn = NULL ;
54
54
55
- static Oid serverid = InvalidOid ;
56
- static UserMapping * user = NULL ;
55
+ static Oid serverid = InvalidOid ;
56
+ static UserMapping * user = NULL ;
57
57
58
58
59
59
/*
63
63
_PG_init (void )
64
64
{
65
65
DefineCustomStringVariable ("repeater.fdwname" ,
66
- "Remote host fdw name" ,
67
- NULL ,
68
- & remote_server_fdwname ,
69
- "remoteserv" ,
70
- PGC_SIGHUP ,
71
- GUC_NOT_IN_SAMPLE ,
72
- NULL ,
73
- NULL ,
74
- NULL );
66
+ "Remote host fdw name" ,
67
+ NULL ,
68
+ & remote_server_fdwname ,
69
+ "remoteserv" ,
70
+ PGC_SIGHUP ,
71
+ GUC_NOT_IN_SAMPLE ,
72
+ NULL ,
73
+ NULL ,
74
+ NULL );
75
75
76
76
/* ProcessUtility hook */
77
77
next_ProcessUtility_hook = ProcessUtility_hook ;
@@ -106,22 +106,22 @@ ExtensionIsActive(void)
106
106
*/
107
107
static void
108
108
HOOK_Utility_injection (PlannedStmt * pstmt ,
109
- const char * queryString ,
110
- ProcessUtilityContext context ,
111
- ParamListInfo params ,
112
- DestReceiver * dest ,
113
- char * completionTag )
109
+ const char * queryString ,
110
+ ProcessUtilityContext context ,
111
+ ParamListInfo params ,
112
+ DestReceiver * dest ,
113
+ char * completionTag )
114
114
{
115
- Node * parsetree = pstmt -> utilityStmt ;
115
+ Node * parsetree = pstmt -> utilityStmt ;
116
116
117
117
if (ExtensionIsActive ()&&
118
118
pstmt -> canSetTag &&
119
119
(context != PROCESS_UTILITY_SUBCOMMAND )
120
- )
120
+ )
121
121
{
122
122
if (!user )
123
123
{
124
- MemoryContext oldCxt = MemoryContextSwitchTo (TopMemoryContext );
124
+ MemoryContext oldCxt = MemoryContextSwitchTo (TopMemoryContext );
125
125
126
126
serverid = get_foreign_server_oid (remote_server_fdwname , true);
127
127
Assert (OidIsValid (serverid ));
@@ -131,32 +131,33 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
131
131
}
132
132
switch (nodeTag (parsetree ))
133
133
{
134
- case T_CopyStmt :
135
- case T_CreateExtensionStmt :
136
- case T_ExplainStmt :
137
- case T_FetchStmt :
138
- case T_VacuumStmt :
139
- break ;
140
- default :
141
- {
142
- PGresult * res ;
143
- if (nodeTag (parsetree )== T_TransactionStmt )
144
- {
145
- TransactionStmt * stmt = (TransactionStmt * )parsetree ;
146
-
147
- if (
148
- //(stmt->kind != TRANS_STMT_ROLLBACK_TO) &&
149
- (stmt -> kind != TRANS_STMT_SAVEPOINT )
150
- )
151
- break ;
152
- }
153
- conn = GetConnection (user , true);
154
- Assert (conn != NULL );
155
-
156
- res = PQexec (conn ,queryString );
157
- PQclear (res );
158
- }
159
- break ;
134
+ case T_CopyStmt :
135
+ case T_CreateExtensionStmt :
136
+ case T_ExplainStmt :
137
+ case T_FetchStmt :
138
+ case T_VacuumStmt :
139
+ break ;
140
+ default :
141
+ {
142
+ PGresult * res ;
143
+
144
+ if (nodeTag (parsetree )== T_TransactionStmt )
145
+ {
146
+ TransactionStmt * stmt = (TransactionStmt * )parsetree ;
147
+
148
+ if (
149
+ /*(stmt->kind != TRANS_STMT_ROLLBACK_TO) && */
150
+ (stmt -> kind != TRANS_STMT_SAVEPOINT )
151
+ )
152
+ break ;
153
+ }
154
+ conn = GetConnection (user , true);
155
+ Assert (conn != NULL );
156
+
157
+ res = PQexec (conn ,queryString );
158
+ PQclear (res );
159
+ }
160
+ break ;
160
161
}
161
162
}
162
163
@@ -165,68 +166,70 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
165
166
dest ,completionTag );
166
167
else
167
168
standard_ProcessUtility (pstmt ,queryString ,
168
- context ,params ,
169
- dest ,completionTag );
169
+ context ,params ,
170
+ dest ,completionTag );
170
171
}
171
172
172
173
static void
173
174
HOOK_ExecStart_injection (QueryDesc * queryDesc ,int eflags )
174
175
{
175
- Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
176
+ Node * parsetree = queryDesc -> plannedstmt -> utilityStmt ;
176
177
177
178
if (prev_ExecutorStart )
178
179
prev_ExecutorStart (queryDesc ,eflags );
179
180
else
180
181
standard_ExecutorStart (queryDesc ,eflags );
181
182
182
183
/*
183
- * This not fully correct sign for prevent passing each subquery to
184
- *the remote instance. Only for demo.
184
+ * This not fully correct sign for prevent passing each subquery to the
185
+ * remote instance. Only for demo.
185
186
*/
186
- if (ExtensionIsActive ()&&
187
- queryDesc -> plannedstmt -> canSetTag &&
188
- !IsParallelWorker ()&&
189
- ((parsetree == NULL )|| (nodeTag (parsetree )!= T_CreatedbStmt ))&&
190
- !(eflags & EXEC_FLAG_EXPLAIN_ONLY ))
191
- {
192
- Oid serverid ;
193
- UserMapping * user ;
194
- char * query ,
195
- * query_container ,
196
- * plan ,
197
- * plan_container ;
198
- int qlen ,qlen1 ,
199
- plen ,plen1 ;
200
- PGresult * res ;
201
-
202
- serverid = get_foreign_server_oid (remote_server_fdwname , true);
203
- Assert (OidIsValid (serverid ));
204
-
205
- user = GetUserMapping (GetUserId (),serverid );
206
- conn = GetConnection (user , true);
207
-
208
- set_portable_output (true);
209
- plan = nodeToString (queryDesc -> plannedstmt );
210
- set_portable_output (false);
211
- plen = b64_enc_len (plan ,strlen (plan )+ 1 );
212
- plan_container = (char * )palloc0 (plen + 1 );
213
- plen1 = b64_encode (plan ,strlen (plan ),plan_container );
214
- Assert (plen > plen1 );
215
-
216
- qlen = b64_enc_len (queryDesc -> sourceText ,strlen (queryDesc -> sourceText )+ 1 );
217
- query_container = (char * )palloc0 (qlen + 1 );
218
- qlen1 = b64_encode (queryDesc -> sourceText ,strlen (queryDesc -> sourceText ),query_container );
219
- Assert (qlen > qlen1 );
220
-
221
- query = palloc0 (qlen + plen + 100 );
222
- sprintf (query ,"SELECT public.pg_exec_plan('%s', '%s');" ,query_container ,plan_container );
223
-
224
- res = PQexec (conn ,query );
225
- PQclear (res );
226
- pfree (query );
227
- pfree (query_container );
228
- pfree (plan_container );
229
- }
187
+ if (ExtensionIsActive ()&&
188
+ queryDesc -> plannedstmt -> canSetTag &&
189
+ !IsParallelWorker ()&&
190
+ ((parsetree == NULL )|| (nodeTag (parsetree )!= T_CreatedbStmt ))&&
191
+ !(eflags & EXEC_FLAG_EXPLAIN_ONLY ))
192
+ {
193
+ Oid serverid ;
194
+ UserMapping * user ;
195
+ char * query ,
196
+ * query_container ,
197
+ * plan ,
198
+ * plan_container ;
199
+ int qlen ,
200
+ qlen1 ,
201
+ plen ,
202
+ plen1 ;
203
+ PGresult * res ;
204
+
205
+ serverid = get_foreign_server_oid (remote_server_fdwname , true);
206
+ Assert (OidIsValid (serverid ));
207
+
208
+ user = GetUserMapping (GetUserId (),serverid );
209
+ conn = GetConnection (user , true);
210
+
211
+ set_portable_output (true);
212
+ plan = nodeToString (queryDesc -> plannedstmt );
213
+ set_portable_output (false);
214
+ plen = b64_enc_len (plan ,strlen (plan )+ 1 );
215
+ plan_container = (char * )palloc0 (plen + 1 );
216
+ plen1 = b64_encode (plan ,strlen (plan ),plan_container );
217
+ Assert (plen > plen1 );
218
+
219
+ qlen = b64_enc_len (queryDesc -> sourceText ,strlen (queryDesc -> sourceText )+ 1 );
220
+ query_container = (char * )palloc0 (qlen + 1 );
221
+ qlen1 = b64_encode (queryDesc -> sourceText ,strlen (queryDesc -> sourceText ),query_container );
222
+ Assert (qlen > qlen1 );
223
+
224
+ query = palloc0 (qlen + plen + 100 );
225
+ sprintf (query ,"SELECT public.pg_exec_plan('%s', '%s');" ,query_container ,plan_container );
226
+
227
+ res = PQexec (conn ,query );
228
+ PQclear (res );
229
+ pfree (query );
230
+ pfree (query_container );
231
+ pfree (plan_container );
232
+ }
230
233
}
231
234
232
235
static void