Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0c867b9

Browse files
committed
Use Postgres-XL approach with postgres_fdw extension
1 parentd2d5401 commit0c867b9

File tree

7 files changed

+234
-118
lines changed

7 files changed

+234
-118
lines changed

‎Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ PGFILEDESC = "Repeater"
77
MODULES = repeater
88
OBJS = repeater.o$(WIN32RES)
99

10-
PG_CPPFLAGS = -I$(libpq_srcdir)
10+
fdw_srcdir =$(top_srcdir)/contrib/postgres_fdw/
11+
12+
PG_CPPFLAGS = -I$(libpq_srcdir) -I$(fdw_srcdir) -L$(fdw_srcdir)
1113
SHLIB_LINK_INTERNAL =$(libpq)
1214

1315
DATA_built =$(EXTENSION)--$(EXTVERSION).sql
@@ -17,10 +19,12 @@ PG_CONFIG = pg_config
1719
PGXS :=$(shell$(PG_CONFIG) --pgxs)
1820
include$(PGXS)
1921
else
22+
EXTRA_INSTALL = contrib/postgres_fdw
2023
SHLIB_PREREQS = submake-libpq
2124
subdir = contrib/repeater
2225
top_builddir = ../..
2326
include$(top_builddir)/src/Makefile.global
27+
#include $(top_builddir)/contrib/postgres_fdw/Makefile
2428
include$(top_srcdir)/contrib/contrib-global.mk
2529
endif
2630

‎repeater.c

Lines changed: 143 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ static void HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags);
3939
staticvoidHOOK_ExecEnd_injection(QueryDesc*queryDesc);
4040

4141
/* Remote instance parameters. */
42-
char*repeater_host_name;
43-
intrepeater_port_number;
42+
char*remote_server_fdwname;
4443

4544
staticboolExtensionIsActivated= false;
4645
staticPGconn*conn=NULL;
@@ -51,24 +50,11 @@ static PGconn*conn = NULL;
5150
void
5251
_PG_init(void)
5352
{
54-
DefineCustomStringVariable("repeater.host",
55-
"Remote hostname for plan execution",
53+
DefineCustomStringVariable("repeater.fdwname",
54+
"Remote hostfdw name",
5655
NULL,
57-
&repeater_host_name,
58-
"localhost",
59-
PGC_SIGHUP,
60-
GUC_NOT_IN_SAMPLE,
61-
NULL,
62-
NULL,
63-
NULL);
64-
65-
DefineCustomIntVariable("repeater.port",
66-
"Port number of remote instance",
67-
NULL,
68-
&repeater_port_number,
69-
5432,
70-
1,
71-
65565,
56+
&remote_server_fdwname,
57+
"remoteserv",
7258
PGC_SIGHUP,
7359
GUC_NOT_IN_SAMPLE,
7460
NULL,
@@ -86,26 +72,6 @@ _PG_init(void)
8672
ExecutorEnd_hook=HOOK_ExecEnd_injection;
8773
}
8874

89-
staticPGconn*
90-
EstablishConnection(void)
91-
{
92-
charconninfo[1024];
93-
94-
if (conn!=NULL)
95-
returnconn;
96-
97-
/* Connect to slave and send it a query plan */
98-
sprintf(conninfo,"host=%s port=%d %c",repeater_host_name,repeater_port_number,'\0');
99-
conn=PQconnectdb(conninfo);
100-
101-
if (PQstatus(conn)==CONNECTION_BAD)
102-
elog(LOG,"Connection error. conninfo: %s",conninfo);
103-
else
104-
elog(LOG,"Connection established: host=%s, port=%d",repeater_host_name,repeater_port_number);
105-
106-
returnconn;
107-
}
108-
10975
staticbool
11076
ExtensionIsActive(void)
11177
{
@@ -122,6 +88,82 @@ ExtensionIsActive(void)
12288
returnExtensionIsActivated;
12389
}
12490

91+
#include"miscadmin.h"
92+
#include"pgstat.h"
93+
#include"storage/latch.h"
94+
95+
#include"foreign/foreign.h"
96+
#include"postgres_fdw.h"
97+
98+
staticOidserverid=InvalidOid;
99+
staticUserMapping*user=NULL;
100+
101+
staticbool
102+
pgfdw_cancel_query(PGconn*conn)
103+
{
104+
PGcancel*cancel;
105+
charerrbuf[256];
106+
PGresult*result=NULL;
107+
108+
if ((cancel=PQgetCancel(conn)))
109+
{
110+
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
111+
{
112+
printf("LAV: Cancel - ERROR\n");
113+
ereport(WARNING,
114+
(errcode(ERRCODE_CONNECTION_FAILURE),
115+
errmsg("could not send cancel request: %s",
116+
errbuf)));
117+
PQfreeCancel(cancel);
118+
return false;
119+
}
120+
else
121+
printf("LAV: Cancel - OK\n");
122+
123+
PQfreeCancel(cancel);
124+
}
125+
else
126+
printf("---ERROR---");
127+
128+
PQconsumeInput(conn);
129+
PQclear(result);
130+
131+
return true;
132+
}
133+
134+
staticvoid
135+
cancelQueryIfNeeded(PGconn*conn,constchar*query)
136+
{
137+
Assert(conn!=NULL);
138+
Assert(query!=NULL);
139+
140+
if (PQtransactionStatus(conn)!=PQTRANS_IDLE)
141+
{
142+
PGresult*res;
143+
144+
printf("CONN status BEFORE EXEC: %d, txs: %d errmsg: %s\n",
145+
PQstatus(conn),
146+
PQtransactionStatus(conn),
147+
PQerrorMessage(conn));
148+
149+
res=PQgetResult(conn);
150+
//printf("status AFTER result request=%d, txs: %d errmsg: %s, resstatus: %s\n",
151+
//PQstatus(conn),
152+
//PQtransactionStatus(conn),
153+
//PQerrorMessage(conn),
154+
//PQresStatus(PQresultStatus(res)));
155+
if (PQresultStatus(res)==PGRES_FATAL_ERROR)
156+
//{
157+
Assert(pgfdw_cancel_query(conn));
158+
//printf("TRY to CANCEL query. status=%d, txs: %d errmsg: %s, resstatus: %s\n", PQstatus(conn), PQtransactionStatus(conn), PQerrorMessage(conn),
159+
//PQresStatus(PQresultStatus(res)));
160+
//}
161+
else
162+
pgfdw_get_result(conn,query);
163+
}
164+
165+
}
166+
125167
/*
126168
* We need to send some DML queries for sync database schema to a plan execution
127169
* at a remote instance.
@@ -136,33 +178,50 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
136178
char*completionTag)
137179
{
138180
Node*parsetree=pstmt->utilityStmt;
139-
PGresult*result;
140181

141-
/*
142-
* Very non-trivial decision about transferring utility query to data nodes.
143-
* This exception list used for demonstration and let us to execute some
144-
* simple queries.
145-
*/
146182
if (ExtensionIsActive()&&
147183
pstmt->canSetTag&&
148-
(nodeTag(parsetree)!=T_CopyStmt)&&
149-
(nodeTag(parsetree)!=T_CreateExtensionStmt)&&
150-
(nodeTag(parsetree)!=T_ExplainStmt)&&
151-
(nodeTag(parsetree)!=T_FetchStmt)&&
152184
(context!=PROCESS_UTILITY_SUBCOMMAND)
153185
)
154186
{
155-
/*
156-
* Previous query could be completed with error report at this instance.
157-
* In this case, we need to prepare connection to the remote instance.
158-
*/
159-
while ((result=PQgetResult(EstablishConnection()))!=NULL);
160-
161-
if (PQsendQuery(EstablishConnection(),queryString)==0)
162-
elog(ERROR,"Connection error: query: %s, status=%d, errmsg=%s",
163-
queryString,
164-
PQstatus(EstablishConnection()),
165-
PQerrorMessage(EstablishConnection()));
187+
if (!user)
188+
{
189+
MemoryContextoldCxt=MemoryContextSwitchTo(TopMemoryContext);
190+
191+
serverid=get_foreign_server_oid(remote_server_fdwname, true);
192+
Assert(OidIsValid(serverid));
193+
194+
user=GetUserMapping(GetUserId(),serverid);
195+
MemoryContextSwitchTo(oldCxt);
196+
}
197+
switch (nodeTag(parsetree))
198+
{
199+
caseT_CopyStmt:
200+
caseT_CreateExtensionStmt:
201+
caseT_ExplainStmt:
202+
caseT_FetchStmt:
203+
caseT_VacuumStmt:
204+
break;
205+
default:
206+
if (nodeTag(parsetree)==T_TransactionStmt)
207+
{
208+
TransactionStmt*stmt= (TransactionStmt*)parsetree;
209+
210+
if (
211+
//(stmt->kind != TRANS_STMT_ROLLBACK_TO) &&
212+
(stmt->kind!=TRANS_STMT_SAVEPOINT)
213+
)
214+
break;
215+
}
216+
if (conn)
217+
cancelQueryIfNeeded(conn,queryString);
218+
conn=GetConnection(user, true);
219+
cancelQueryIfNeeded(conn,queryString);
220+
Assert(conn!=NULL);
221+
222+
Assert(PQsendQuery(conn,queryString));
223+
break;
224+
};
166225
}
167226

168227
if (next_ProcessUtility_hook)
@@ -172,26 +231,22 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
172231
standard_ProcessUtility(pstmt,queryString,
173232
context,params,queryEnv,
174233
dest,completionTag);
175-
176-
/*
177-
* Check end of query execution at the remote instance.
178-
*/
179234
if (conn)
180-
while ((result=PQgetResult(conn))!=NULL);
235+
cancelQueryIfNeeded(conn,queryString);
236+
//pgfdw_get_result(conn, queryString);
181237
}
182238

239+
183240
staticvoid
184241
HOOK_ExecStart_injection(QueryDesc*queryDesc,inteflags)
185242
{
186243
Node*parsetree=queryDesc->plannedstmt->utilityStmt;
187-
PGresult*result;
188-
PGconn*dest=EstablishConnection();
189244

190245
if (prev_ExecutorStart)
191246
prev_ExecutorStart(queryDesc,eflags);
192247
else
193248
standard_ExecutorStart(queryDesc,eflags);
194-
elog(LOG,"QUERY: %s",queryDesc->sourceText);
249+
195250
/*
196251
* This not fully correct sign for prevent passing each subquery to
197252
* the remote instance. Only for demo.
@@ -200,30 +255,32 @@ elog(LOG, "QUERY: %s", queryDesc->sourceText);
200255
queryDesc->plannedstmt->canSetTag&&
201256
((parsetree==NULL)|| (nodeTag(parsetree)!=T_CreatedbStmt))&&
202257
!(eflags&EXEC_FLAG_EXPLAIN_ONLY))
203-
{
204-
/*
205-
* Prepare connection.
206-
*/
207-
while ((result=PQgetResult(dest))!=NULL);
208-
elog(LOG,"->QUERY: %s",queryDesc->sourceText);
209-
if (PQsendPlan(dest,serialize_plan(queryDesc,eflags))==0)
210-
/*
211-
* Report about remote execution error.
212-
*/
213-
elog(ERROR,"Connection errors during PLAN transferring: status=%d, errmsg=%s",
214-
PQstatus(dest),PQerrorMessage(dest));
215-
}
258+
{
259+
Oidserverid;
260+
UserMapping*user;
261+
262+
serverid=get_foreign_server_oid(remote_server_fdwname, true);
263+
Assert(OidIsValid(serverid));
264+
265+
user=GetUserMapping(GetUserId(),serverid);
266+
conn=GetConnection(user, true);
267+
cancelQueryIfNeeded(conn,queryDesc->sourceText);
268+
269+
if (PQsendPlan(conn,serialize_plan(queryDesc,eflags))==0)
270+
{
271+
pgfdw_report_error(ERROR,NULL,conn, false,queryDesc->sourceText);
272+
Assert(0);
273+
}
274+
else
275+
printf("Send Query %s - OK\n",queryDesc->sourceText);
276+
}
216277
}
217278

218279
staticvoid
219280
HOOK_ExecEnd_injection(QueryDesc*queryDesc)
220281
{
221282
if (conn)
222-
{
223-
PGresult*result;
224-
225-
while ((result=PQgetResult(conn))!=NULL);
226-
}
283+
cancelQueryIfNeeded(conn,queryDesc->sourceText);
227284

228285
if (prev_ExecutorEnd)
229286
prev_ExecutorEnd(queryDesc);

‎repeater.control

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ comment = 'Execute raw query plan on remote node'
33
default_version = '0.1'
44
module_pathname = '$libdir/repeater'
55
relocatable = false
6+
requires = 'postgres_fdw'

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp