Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfd7d901

Browse files
committed
Add pg_repeater and functionality for passing regression tests
1 parent6dfba1a commitfd7d901

File tree

3 files changed

+52
-86
lines changed

3 files changed

+52
-86
lines changed

‎Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ MODULE_big = pg_repeater
44
EXTENSION = pg_repeater
55
EXTVERSION = 0.1
66
PGFILEDESC = "pg_repeater"
7-
MODULES =pg_repeater1
7+
MODULES =pg_repeater
88
OBJS = pg_repeater.o$(WIN32RES)
99

1010
fdw_srcdir =$(top_srcdir)/contrib/postgres_fdw/

‎pg_repeater.c

Lines changed: 50 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,19 @@
1717
#include"commands/extension.h"
1818
#include"executor/executor.h"
1919
#include"fmgr.h"
20+
#include"foreign/foreign.h"
2021
#include"libpq/libpq.h"
2122
#include"libpq-fe.h"
23+
#include"miscadmin.h"
2224
#include"optimizer/planner.h"
25+
#include"pgstat.h"
26+
#include"postgres_fdw.h"
27+
#include"storage/latch.h"
2328
#include"tcop/utility.h"
29+
#include"utils/builtins.h"
2430
#include"utils/guc.h"
31+
#include"utils/memutils.h"
32+
2533

2634
PG_MODULE_MAGIC;
2735

@@ -33,8 +41,7 @@ static ExecutorEnd_hook_typeprev_ExecutorEnd = NULL;
3341

3442
staticvoidHOOK_Utility_injection(PlannedStmt*pstmt,constchar*queryString,
3543
ProcessUtilityContextcontext,ParamListInfoparams,
36-
QueryEnvironment*queryEnv,DestReceiver*dest,
37-
char*completionTag);
44+
DestReceiver*dest,char*completionTag);
3845
staticvoidHOOK_ExecStart_injection(QueryDesc*queryDesc,inteflags);
3946
staticvoidHOOK_ExecEnd_injection(QueryDesc*queryDesc);
4047

@@ -44,6 +51,10 @@ char*remote_server_fdwname;
4451
staticboolExtensionIsActivated= false;
4552
staticPGconn*conn=NULL;
4653

54+
staticOidserverid=InvalidOid;
55+
staticUserMapping*user=NULL;
56+
57+
4758
/*
4859
* Module load/unload callback
4960
*/
@@ -80,79 +91,14 @@ ExtensionIsActive(void)
8091

8192
if (
8293
!IsTransactionState()||
83-
!OidIsValid(get_extension_oid("repeater", true))
94+
!OidIsValid(get_extension_oid("pg_repeater", true))
8495
)
8596
return false;
8697

8798
ExtensionIsActivated= true;
8899
returnExtensionIsActivated;
89100
}
90101

91-
#include"miscadmin.h"
92-
#include"pgstat.h"
93-
#include"storage/latch.h"
94-
95-
#include"foreign/foreign.h"
96-
#include"postgres_fdw.h"
97-
98-
staticOidserverid=InvalidOid;
99-
staticUserMapping*user=NULL;
100-
101-
staticbool
102-
pgfdw_cancel_query(PGconn*conn)
103-
{
104-
PGcancel*cancel;
105-
charerrbuf[256];
106-
PGresult*result=NULL;
107-
108-
if ((cancel=PQgetCancel(conn)))
109-
{
110-
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
111-
{
112-
ereport(WARNING,
113-
(errcode(ERRCODE_CONNECTION_FAILURE),
114-
errmsg("could not send cancel request: %s",
115-
errbuf)));
116-
PQfreeCancel(cancel);
117-
return false;
118-
}
119-
120-
PQfreeCancel(cancel);
121-
}
122-
else
123-
elog(FATAL,"Can't get connection cancel descriptor");
124-
125-
PQconsumeInput(conn);
126-
PQclear(result);
127-
128-
return true;
129-
}
130-
131-
staticvoid
132-
cancelQueryIfNeeded(PGconn*conn,constchar*query)
133-
{
134-
Assert(conn!=NULL);
135-
Assert(query!=NULL);
136-
137-
if (PQtransactionStatus(conn)!=PQTRANS_IDLE)
138-
{
139-
PGresult*res;
140-
141-
printf("CONN status BEFORE EXEC: %d, txs: %d errmsg: %s\n",
142-
PQstatus(conn),
143-
PQtransactionStatus(conn),
144-
PQerrorMessage(conn));
145-
146-
res=PQgetResult(conn);
147-
148-
if (PQresultStatus(res)==PGRES_FATAL_ERROR)
149-
Assert(pgfdw_cancel_query(conn));
150-
else
151-
pgfdw_get_result(conn,query);
152-
}
153-
154-
}
155-
156102
/*
157103
* We need to send some DML queries for sync database schema to a plan execution
158104
* at a remote instance.
@@ -162,7 +108,6 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
162108
constchar*queryString,
163109
ProcessUtilityContextcontext,
164110
ParamListInfoparams,
165-
QueryEnvironment*queryEnv,
166111
DestReceiver*dest,
167112
char*completionTag)
168113
{
@@ -192,6 +137,8 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
192137
caseT_VacuumStmt:
193138
break;
194139
default:
140+
{
141+
PGresult*res;
195142
if (nodeTag(parsetree)==T_TransactionStmt)
196143
{
197144
TransactionStmt*stmt= (TransactionStmt*)parsetree;
@@ -202,26 +149,23 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
202149
)
203150
break;
204151
}
205-
if (conn)
206-
cancelQueryIfNeeded(conn,queryString);
207152
conn=GetConnection(user, true);
208-
cancelQueryIfNeeded(conn,queryString);
209153
Assert(conn!=NULL);
210154

211-
Assert(PQsendQuery(conn,queryString));
155+
res=PQexec(conn,queryString);
156+
PQclear(res);
157+
}
212158
break;
213-
};
159+
}
214160
}
215161

216162
if (next_ProcessUtility_hook)
217163
(*next_ProcessUtility_hook) (pstmt,queryString,context,params,
218-
queryEnv,dest,completionTag);
164+
dest,completionTag);
219165
else
220166
standard_ProcessUtility(pstmt,queryString,
221-
context,params,queryEnv,
167+
context,params,
222168
dest,completionTag);
223-
if (conn)
224-
cancelQueryIfNeeded(conn,queryString);
225169
}
226170

227171
staticvoid
@@ -245,25 +189,47 @@ HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
245189
{
246190
Oidserverid;
247191
UserMapping*user;
192+
char*query,
193+
*query_container,
194+
*plan,
195+
*plan_container;
196+
intqlen,qlen1,
197+
plen,plen1;
198+
PGresult*res;
248199

249200
serverid=get_foreign_server_oid(remote_server_fdwname, true);
250201
Assert(OidIsValid(serverid));
251202

252203
user=GetUserMapping(GetUserId(),serverid);
253204
conn=GetConnection(user, true);
254-
cancelQueryIfNeeded(conn,queryDesc->sourceText);
255205

256-
if (PQsendPlan(conn,serialize_plan(queryDesc,eflags))==0)
257-
pgfdw_report_error(ERROR,NULL,conn, false,queryDesc->sourceText);
206+
set_portable_output(true);
207+
plan=nodeToString(queryDesc->plannedstmt);
208+
set_portable_output(false);
209+
plen=b64_enc_len(plan,strlen(plan)+1);
210+
plan_container= (char*)palloc0(plen+1);
211+
plen1=b64_encode(plan,strlen(plan),plan_container);
212+
Assert(plen>plen1);
213+
214+
qlen=b64_enc_len(queryDesc->sourceText,strlen(queryDesc->sourceText)+1);
215+
query_container= (char*)palloc0(qlen+1);
216+
qlen1=b64_encode(queryDesc->sourceText,strlen(queryDesc->sourceText),query_container);
217+
Assert(qlen>qlen1);
218+
219+
query=palloc0(qlen+plen+100);
220+
sprintf(query,"SELECT public.pg_exec_plan('%s', '%s');",query_container,plan_container);
221+
222+
res=PQexec(conn,query);
223+
PQclear(res);
224+
pfree(query);
225+
pfree(query_container);
226+
pfree(plan_container);
258227
}
259228
}
260229

261230
staticvoid
262231
HOOK_ExecEnd_injection(QueryDesc*queryDesc)
263232
{
264-
if (conn)
265-
cancelQueryIfNeeded(conn,queryDesc->sourceText);
266-
267233
if (prev_ExecutorEnd)
268234
prev_ExecutorEnd(queryDesc);
269235
else

‎pg_repeater.control

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ comment = 'Pass raw query plan to a remote node'
33
default_version = '0.1'
44
module_pathname = '$libdir/pg_repeater'
55
relocatable = false
6-
requires = 'postgres_fdw pg_execplan'
6+
requires = 'postgres_fdw, pg_execplan'

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp