Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd2d5401

Browse files
committed
raw
1 parent8c21867 commitd2d5401

File tree

1 file changed

+63
-72
lines changed

1 file changed

+63
-72
lines changed

‎repeater.c

Lines changed: 63 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1-
/*
1+
/*-------------------------------------------------------------------------
2+
*
23
* repeater.c
4+
* Simple demo for remote plan execution patch.
35
*
6+
* Transfer query plan to a remote instance and wait for result.
7+
* Remote instance parameters (host, port) defines by GUCs.
8+
*
9+
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
10+
* Portions Copyright (c) 2018-2019, Postgres Professional
11+
*-------------------------------------------------------------------------
412
*/
513

614
#include"postgres.h"
@@ -11,12 +19,9 @@
1119
#include"fmgr.h"
1220
#include"libpq/libpq.h"
1321
#include"libpq-fe.h"
14-
#include"nodes/params.h"
1522
#include"optimizer/planner.h"
1623
#include"tcop/utility.h"
1724
#include"utils/guc.h"
18-
#include"utils/memutils.h"
19-
#include"utils/plancache.h"
2025

2126
PG_MODULE_MAGIC;
2227

@@ -32,12 +37,14 @@ static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
3237
char*completionTag);
3338
staticvoidHOOK_ExecStart_injection(QueryDesc*queryDesc,inteflags);
3439
staticvoidHOOK_ExecEnd_injection(QueryDesc*queryDesc);
35-
staticintexecute_query(PGconn*dest,QueryDesc*queryDesc,inteflags);
36-
3740

41+
/* Remote instance parameters. */
3842
char*repeater_host_name;
3943
intrepeater_port_number;
4044

45+
staticboolExtensionIsActivated= false;
46+
staticPGconn*conn=NULL;
47+
4148
/*
4249
* Module load/unload callback
4350
*/
@@ -79,8 +86,6 @@ _PG_init(void)
7986
ExecutorEnd_hook=HOOK_ExecEnd_injection;
8087
}
8188

82-
staticPGconn*conn=NULL;
83-
8489
staticPGconn*
8590
EstablishConnection(void)
8691
{
@@ -90,17 +95,17 @@ EstablishConnection(void)
9095
returnconn;
9196

9297
/* Connect to slave and send it a query plan */
93-
sprintf(conninfo,"host=localhost port=5433%c",'\0');
98+
sprintf(conninfo,"host=%s port=%d %c",repeater_host_name,repeater_port_number,'\0');
9499
conn=PQconnectdb(conninfo);
95100

96101
if (PQstatus(conn)==CONNECTION_BAD)
97102
elog(LOG,"Connection error. conninfo: %s",conninfo);
103+
else
104+
elog(LOG,"Connection established: host=%s, port=%d",repeater_host_name,repeater_port_number);
98105

99106
returnconn;
100107
}
101108

102-
staticboolExtensionIsActivated= false;
103-
104109
staticbool
105110
ExtensionIsActive(void)
106111
{
@@ -117,6 +122,10 @@ ExtensionIsActive(void)
117122
returnExtensionIsActivated;
118123
}
119124

125+
/*
126+
* We need to send some DML queries for sync database schema to a plan execution
127+
* at a remote instance.
128+
*/
120129
staticvoid
121130
HOOK_Utility_injection(PlannedStmt*pstmt,
122131
constchar*queryString,
@@ -126,27 +135,35 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
126135
DestReceiver*dest,
127136
char*completionTag)
128137
{
129-
Node*parsetree=pstmt->utilityStmt;
138+
Node*parsetree=pstmt->utilityStmt;
139+
PGresult*result;
130140

141+
/*
142+
* Very non-trivial decision about transferring utility query to data nodes.
143+
* This exception list used for demonstration and let us to execute some
144+
* simple queries.
145+
*/
131146
if (ExtensionIsActive()&&
147+
pstmt->canSetTag&&
132148
(nodeTag(parsetree)!=T_CopyStmt)&&
133149
(nodeTag(parsetree)!=T_CreateExtensionStmt)&&
134150
(nodeTag(parsetree)!=T_ExplainStmt)&&
151+
(nodeTag(parsetree)!=T_FetchStmt)&&
135152
(context!=PROCESS_UTILITY_SUBCOMMAND)
136153
)
137154
{
138-
PGresult*result;
139-
155+
/*
156+
* Previous query could be completed with error report at this instance.
157+
* In this case, we need to prepare connection to the remote instance.
158+
*/
140159
while ((result=PQgetResult(EstablishConnection()))!=NULL);
141160

142161
if (PQsendQuery(EstablishConnection(),queryString)==0)
143-
{
144-
elog(ERROR,"Sending UTILITY query error: %s",queryString);
145-
PQreset(conn);
146-
}
162+
elog(ERROR,"Connection error: query: %s, status=%d, errmsg=%s",
163+
queryString,
164+
PQstatus(EstablishConnection()),
165+
PQerrorMessage(EstablishConnection()));
147166
}
148-
else
149-
elog(LOG,"UTILITY query without sending: %s",queryString);
150167

151168
if (next_ProcessUtility_hook)
152169
(*next_ProcessUtility_hook) (pstmt,queryString,context,params,
@@ -156,48 +173,51 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
156173
context,params,queryEnv,
157174
dest,completionTag);
158175

176+
/*
177+
* Check end of query execution at the remote instance.
178+
*/
159179
if (conn)
160-
{
161-
PGresult*result;
162-
163180
while ((result=PQgetResult(conn))!=NULL);
164-
}
165181
}
166-
staticintIsExecuted=0;
167182

168183
staticvoid
169184
HOOK_ExecStart_injection(QueryDesc*queryDesc,inteflags)
170185
{
171-
Node*parsetree=queryDesc->plannedstmt->utilityStmt;
186+
Node*parsetree=queryDesc->plannedstmt->utilityStmt;
187+
PGresult*result;
188+
PGconn*dest=EstablishConnection();
172189

173190
if (prev_ExecutorStart)
174191
prev_ExecutorStart(queryDesc,eflags);
175192
else
176193
standard_ExecutorStart(queryDesc,eflags);
177-
178-
IsExecuted++;
179-
180-
if (IsExecuted>1)
181-
return;
182-
183-
if (
184-
ExtensionIsActive()&&
185-
(repeater_host_name==0)&&
186-
((parsetree==NULL)|| (nodeTag(parsetree)!=T_CreatedbStmt))&&
187-
!(eflags&EXEC_FLAG_EXPLAIN_ONLY)
188-
)
194+
elog(LOG,"QUERY: %s",queryDesc->sourceText);
195+
/*
196+
* This not fully correct sign for prevent passing each subquery to
197+
* the remote instance. Only for demo.
198+
*/
199+
if (ExtensionIsActive()&&
200+
queryDesc->plannedstmt->canSetTag&&
201+
((parsetree==NULL)|| (nodeTag(parsetree)!=T_CreatedbStmt))&&
202+
!(eflags&EXEC_FLAG_EXPLAIN_ONLY))
189203
{
190-
elog(LOG,"Send query: %s",queryDesc->sourceText);
191-
if (execute_query(EstablishConnection(),queryDesc,eflags)==0)
192-
PQreset(conn);
204+
/*
205+
* Prepare connection.
206+
*/
207+
while ((result=PQgetResult(dest))!=NULL);
208+
elog(LOG,"->QUERY: %s",queryDesc->sourceText);
209+
if (PQsendPlan(dest,serialize_plan(queryDesc,eflags))==0)
210+
/*
211+
* Report about remote execution error.
212+
*/
213+
elog(ERROR,"Connection errors during PLAN transferring: status=%d, errmsg=%s",
214+
PQstatus(dest),PQerrorMessage(dest));
193215
}
194216
}
195217

196218
staticvoid
197219
HOOK_ExecEnd_injection(QueryDesc*queryDesc)
198220
{
199-
IsExecuted--;
200-
/* Execute before hook because it destruct memory context of exchange list */
201221
if (conn)
202222
{
203223
PGresult*result;
@@ -210,32 +230,3 @@ HOOK_ExecEnd_injection(QueryDesc *queryDesc)
210230
else
211231
standard_ExecutorEnd(queryDesc);
212232
}
213-
214-
215-
/*
216-
* Serialize plan and send it to the destination instance
217-
*/
218-
staticint
219-
execute_query(PGconn*dest,QueryDesc*queryDesc,inteflags)
220-
{
221-
PGresult*result;
222-
223-
Assert(dest!=NULL);
224-
225-
/*
226-
* Before send of plan we need to check connection state.
227-
* If previous query was failed, we get PGRES_FATAL_ERROR.
228-
*/
229-
while ((result=PQgetResult(dest))!=NULL);
230-
231-
if (PQsendPlan(dest,serialize_plan(queryDesc,eflags))==0)
232-
{
233-
/*
234-
* Report about remote execution error and return control to caller.
235-
*/
236-
elog(ERROR,"PLAN sending error.");
237-
return0;
238-
}
239-
240-
return1;
241-
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp