Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6dfba1a

Browse files
committed
Initial commit
1 parent8f5b657 commit6dfba1a

File tree

6 files changed

+336
-22
lines changed

6 files changed

+336
-22
lines changed

‎LICENSE

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,11 @@
1-
BSD 2-ClauseLicense
1+
pg_repeater is released under the PostgreSQLLicense, a liberal Open Source license, similar to the BSD or MIT licenses.
22

3-
Copyright (c) 2019, Andrey Lepikhov
4-
All rights reserved.
3+
Copyright (c) 2018-2019, Postgres Professional
4+
Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
5+
Portions Copyright (c) 1994, The Regents of the University of California
56

6-
Redistribution and use in source and binary forms, with or without
7-
modification, are permitted provided that the following conditions are met:
7+
Permission to use, copy, modify, and distribute this software and its documentation for any purpose, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and this paragraph and the following two paragraphs appear in all copies.
88

9-
* Redistributions of source code must retain the above copyright notice, this
10-
list of conditions and the following disclaimer.
9+
IN NO EVENT SHALL POSTGRES PROFESSIONAL BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF POSTGRES PROFESSIONAL HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
1110

12-
* Redistributions in binary form must reproduce the above copyright notice,
13-
this list of conditions and the following disclaimer in the documentation
14-
and/or other materials provided with the distribution.
15-
16-
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17-
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18-
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19-
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
20-
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21-
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
22-
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
23-
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
24-
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25-
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
11+
POSTGRES PROFESSIONAL SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND POSTGRES PROFESSIONAL HAS NO OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.

‎Makefile

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# contrib/pg_repeater/Makefile
2+
3+
MODULE_big = pg_repeater
4+
EXTENSION = pg_repeater
5+
EXTVERSION = 0.1
6+
PGFILEDESC = "pg_repeater"
7+
MODULES = pg_repeater1
8+
OBJS = pg_repeater.o$(WIN32RES)
9+
10+
fdw_srcdir =$(top_srcdir)/contrib/postgres_fdw/
11+
execplan_srcdir =$(top_srcdir)/contrib/pg_execplan/
12+
13+
PG_CPPFLAGS = -I$(libpq_srcdir) -I$(fdw_srcdir) -L$(fdw_srcdir) -I$(execplan_srcdir) -L$(execplan_srcdir)
14+
SHLIB_LINK_INTERNAL =$(libpq)
15+
16+
DATA_built =$(EXTENSION)--$(EXTVERSION).sql
17+
18+
ifdefUSE_PGXS
19+
PG_CONFIG = pg_config
20+
PGXS :=$(shell$(PG_CONFIG) --pgxs)
21+
include$(PGXS)
22+
else
23+
EXTRA_INSTALL = contrib/postgres_fdw contrib/pg_execplan
24+
SHLIB_PREREQS = submake-libpq
25+
subdir = contrib/pg_repeater
26+
top_builddir = ../..
27+
include$(top_builddir)/src/Makefile.global
28+
include$(top_srcdir)/contrib/contrib-global.mk
29+
endif
30+
31+
$(EXTENSION)--$(EXTVERSION).sql: init.sql
32+
cat$^>$@

‎README.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,20 @@
11
#pg_repeater
2-
Repeat all queries at remote instance
2+
PostgreSQL patch & extension for UTILITY queries and query plans execution at
3+
remote instance.
4+
5+
Plan is passed by postgres_fdw connection service. It executed by pg_exec_plan()
6+
routine, introduced by pg_execplan extension.
7+
8+
This project dedicated to query execution problem in DBMS for computing systems
9+
with cluster architecture.
10+
11+
The DBMS may need to execute an identical query plan at each computing node.
12+
Today PostgreSQL can process only SQL statements. But it is not guaranteed, that
13+
the planner at each node will construct same query plan, because different
14+
statistics, relation sizes e.t.c.
15+
16+
This solution based on postgres-xl approach: plan tree is serialized by the
17+
nodeToString() routine.
18+
During serialization we transform all database object identifiers (oid) at each
19+
node field to portable representation.
20+

‎init.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
\echo Use"CREATE EXTENSION pg_repeater" to load this file. \quit

‎pg_repeater.c

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* repeater.c
4+
* Simple demo for remote plan execution patch.
5+
*
6+
* Transfer query plan to a remote instance and wait for result.
7+
* Remote instance parameters (host, port) defines by GUCs.
8+
*
9+
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
10+
* Portions Copyright (c) 2018-2019, Postgres Professional
11+
*-------------------------------------------------------------------------
12+
*/
13+
14+
#include"postgres.h"
15+
16+
#include"access/xact.h"
17+
#include"commands/extension.h"
18+
#include"executor/executor.h"
19+
#include"fmgr.h"
20+
#include"libpq/libpq.h"
21+
#include"libpq-fe.h"
22+
#include"optimizer/planner.h"
23+
#include"tcop/utility.h"
24+
#include"utils/guc.h"
25+
26+
PG_MODULE_MAGIC;
27+
28+
void_PG_init(void);
29+
30+
staticProcessUtility_hook_typenext_ProcessUtility_hook=NULL;
31+
staticExecutorStart_hook_typeprev_ExecutorStart=NULL;
32+
staticExecutorEnd_hook_typeprev_ExecutorEnd=NULL;
33+
34+
staticvoidHOOK_Utility_injection(PlannedStmt*pstmt,constchar*queryString,
35+
ProcessUtilityContextcontext,ParamListInfoparams,
36+
QueryEnvironment*queryEnv,DestReceiver*dest,
37+
char*completionTag);
38+
staticvoidHOOK_ExecStart_injection(QueryDesc*queryDesc,inteflags);
39+
staticvoidHOOK_ExecEnd_injection(QueryDesc*queryDesc);
40+
41+
/* Remote instance parameters. */
42+
char*remote_server_fdwname;
43+
44+
staticboolExtensionIsActivated= false;
45+
staticPGconn*conn=NULL;
46+
47+
/*
48+
* Module load/unload callback
49+
*/
50+
void
51+
_PG_init(void)
52+
{
53+
DefineCustomStringVariable("repeater.fdwname",
54+
"Remote host fdw name",
55+
NULL,
56+
&remote_server_fdwname,
57+
"remoteserv",
58+
PGC_SIGHUP,
59+
GUC_NOT_IN_SAMPLE,
60+
NULL,
61+
NULL,
62+
NULL);
63+
64+
/* ProcessUtility hook */
65+
next_ProcessUtility_hook=ProcessUtility_hook;
66+
ProcessUtility_hook=HOOK_Utility_injection;
67+
68+
prev_ExecutorStart=ExecutorStart_hook;
69+
ExecutorStart_hook=HOOK_ExecStart_injection;
70+
71+
prev_ExecutorEnd=ExecutorEnd_hook;
72+
ExecutorEnd_hook=HOOK_ExecEnd_injection;
73+
}
74+
75+
staticbool
76+
ExtensionIsActive(void)
77+
{
78+
if (ExtensionIsActivated)
79+
return true;
80+
81+
if (
82+
!IsTransactionState()||
83+
!OidIsValid(get_extension_oid("repeater", true))
84+
)
85+
return false;
86+
87+
ExtensionIsActivated= true;
88+
returnExtensionIsActivated;
89+
}
90+
91+
#include"miscadmin.h"
92+
#include"pgstat.h"
93+
#include"storage/latch.h"
94+
95+
#include"foreign/foreign.h"
96+
#include"postgres_fdw.h"
97+
98+
staticOidserverid=InvalidOid;
99+
staticUserMapping*user=NULL;
100+
101+
staticbool
102+
pgfdw_cancel_query(PGconn*conn)
103+
{
104+
PGcancel*cancel;
105+
charerrbuf[256];
106+
PGresult*result=NULL;
107+
108+
if ((cancel=PQgetCancel(conn)))
109+
{
110+
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
111+
{
112+
ereport(WARNING,
113+
(errcode(ERRCODE_CONNECTION_FAILURE),
114+
errmsg("could not send cancel request: %s",
115+
errbuf)));
116+
PQfreeCancel(cancel);
117+
return false;
118+
}
119+
120+
PQfreeCancel(cancel);
121+
}
122+
else
123+
elog(FATAL,"Can't get connection cancel descriptor");
124+
125+
PQconsumeInput(conn);
126+
PQclear(result);
127+
128+
return true;
129+
}
130+
131+
staticvoid
132+
cancelQueryIfNeeded(PGconn*conn,constchar*query)
133+
{
134+
Assert(conn!=NULL);
135+
Assert(query!=NULL);
136+
137+
if (PQtransactionStatus(conn)!=PQTRANS_IDLE)
138+
{
139+
PGresult*res;
140+
141+
printf("CONN status BEFORE EXEC: %d, txs: %d errmsg: %s\n",
142+
PQstatus(conn),
143+
PQtransactionStatus(conn),
144+
PQerrorMessage(conn));
145+
146+
res=PQgetResult(conn);
147+
148+
if (PQresultStatus(res)==PGRES_FATAL_ERROR)
149+
Assert(pgfdw_cancel_query(conn));
150+
else
151+
pgfdw_get_result(conn,query);
152+
}
153+
154+
}
155+
156+
/*
157+
* We need to send some DML queries for sync database schema to a plan execution
158+
* at a remote instance.
159+
*/
160+
staticvoid
161+
HOOK_Utility_injection(PlannedStmt*pstmt,
162+
constchar*queryString,
163+
ProcessUtilityContextcontext,
164+
ParamListInfoparams,
165+
QueryEnvironment*queryEnv,
166+
DestReceiver*dest,
167+
char*completionTag)
168+
{
169+
Node*parsetree=pstmt->utilityStmt;
170+
171+
if (ExtensionIsActive()&&
172+
pstmt->canSetTag&&
173+
(context!=PROCESS_UTILITY_SUBCOMMAND)
174+
)
175+
{
176+
if (!user)
177+
{
178+
MemoryContextoldCxt=MemoryContextSwitchTo(TopMemoryContext);
179+
180+
serverid=get_foreign_server_oid(remote_server_fdwname, true);
181+
Assert(OidIsValid(serverid));
182+
183+
user=GetUserMapping(GetUserId(),serverid);
184+
MemoryContextSwitchTo(oldCxt);
185+
}
186+
switch (nodeTag(parsetree))
187+
{
188+
caseT_CopyStmt:
189+
caseT_CreateExtensionStmt:
190+
caseT_ExplainStmt:
191+
caseT_FetchStmt:
192+
caseT_VacuumStmt:
193+
break;
194+
default:
195+
if (nodeTag(parsetree)==T_TransactionStmt)
196+
{
197+
TransactionStmt*stmt= (TransactionStmt*)parsetree;
198+
199+
if (
200+
//(stmt->kind != TRANS_STMT_ROLLBACK_TO) &&
201+
(stmt->kind!=TRANS_STMT_SAVEPOINT)
202+
)
203+
break;
204+
}
205+
if (conn)
206+
cancelQueryIfNeeded(conn,queryString);
207+
conn=GetConnection(user, true);
208+
cancelQueryIfNeeded(conn,queryString);
209+
Assert(conn!=NULL);
210+
211+
Assert(PQsendQuery(conn,queryString));
212+
break;
213+
};
214+
}
215+
216+
if (next_ProcessUtility_hook)
217+
(*next_ProcessUtility_hook) (pstmt,queryString,context,params,
218+
queryEnv,dest,completionTag);
219+
else
220+
standard_ProcessUtility(pstmt,queryString,
221+
context,params,queryEnv,
222+
dest,completionTag);
223+
if (conn)
224+
cancelQueryIfNeeded(conn,queryString);
225+
}
226+
227+
staticvoid
228+
HOOK_ExecStart_injection(QueryDesc*queryDesc,inteflags)
229+
{
230+
Node*parsetree=queryDesc->plannedstmt->utilityStmt;
231+
232+
if (prev_ExecutorStart)
233+
prev_ExecutorStart(queryDesc,eflags);
234+
else
235+
standard_ExecutorStart(queryDesc,eflags);
236+
237+
/*
238+
* This not fully correct sign for prevent passing each subquery to
239+
* the remote instance. Only for demo.
240+
*/
241+
if (ExtensionIsActive()&&
242+
queryDesc->plannedstmt->canSetTag&&
243+
((parsetree==NULL)|| (nodeTag(parsetree)!=T_CreatedbStmt))&&
244+
!(eflags&EXEC_FLAG_EXPLAIN_ONLY))
245+
{
246+
Oidserverid;
247+
UserMapping*user;
248+
249+
serverid=get_foreign_server_oid(remote_server_fdwname, true);
250+
Assert(OidIsValid(serverid));
251+
252+
user=GetUserMapping(GetUserId(),serverid);
253+
conn=GetConnection(user, true);
254+
cancelQueryIfNeeded(conn,queryDesc->sourceText);
255+
256+
if (PQsendPlan(conn,serialize_plan(queryDesc,eflags))==0)
257+
pgfdw_report_error(ERROR,NULL,conn, false,queryDesc->sourceText);
258+
}
259+
}
260+
261+
staticvoid
262+
HOOK_ExecEnd_injection(QueryDesc*queryDesc)
263+
{
264+
if (conn)
265+
cancelQueryIfNeeded(conn,queryDesc->sourceText);
266+
267+
if (prev_ExecutorEnd)
268+
prev_ExecutorEnd(queryDesc);
269+
else
270+
standard_ExecutorEnd(queryDesc);
271+
}

‎pg_repeater.control

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# pg_repeater extension
2+
comment = 'Pass raw query plan to a remote node'
3+
default_version = '0.1'
4+
module_pathname = '$libdir/pg_repeater'
5+
relocatable = false
6+
requires = 'postgres_fdw pg_execplan'

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp