Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitaf234df

Browse files
committed
COPY FROM to postgres_fdw implementation for pg_shardman.
This is the commit message#2:Tracking on which foreign servers we already started COPY.If foreign server holds several partitions, COPY FROM to local root partitionwill try to perform several copies at the same time through one connection,obviously without much success. Now we track that and start/end COPY only once.We also allow to pass destination relation name which may be different fromforeing table -- so we can copy into foreign root partition in shardman. Thisis pretty narrow solution. However, keeping several connections to the sameforeign server requires significant changes, especially in 2pc handling, sostaying here for now.This is the commit message#3:Allow COPY FROM to par8d table even if some FDW parts can't do that.This behaviour was broken in patches allowing COPY FROM to FDW tables.This is the commit message#4:COPY FROM deparse more complete, PG_SHARDMAN macro.Now column names, FORCE NULL and FORCE NOT NULL are deparsed too.PG_SHARDMAN macro ensures this PG contains patches for Postgres.This is the commit message#5:Disable COPY FROM to foreign parts, because no generic impl exists.This is the commit message#6:Fix COPY FROM deparse, forgotten comma for FORCE_NULL etc.
1 parentb79402a commitaf234df

File tree

10 files changed

+530
-190
lines changed

10 files changed

+530
-190
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include"access/xact.h"
2020
#include"access/xtm.h"
2121
#include"access/transam.h"
22+
#include"access/xlog.h"
23+
#include"libpq-int.h"
2224
#include"mb/pg_wchar.h"
2325
#include"miscadmin.h"
2426
#include"pgstat.h"
@@ -60,6 +62,7 @@ typedef struct ConnCacheEntry
6062
boolinvalidated;/* true if reconnect is pending */
6163
uint32server_hashvalue;/* hash value of foreign server OID */
6264
uint32mapping_hashvalue;/* hash value of user mapping OID */
65+
boolcopy_from_started;/* COPY FROM in progress on this conn */
6366
}ConnCacheEntry;
6467

6568
/*
@@ -112,7 +115,8 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
112115
* (not even on error), we need this flag to cue manual cleanup.
113116
*/
114117
PGconn*
115-
GetConnection(UserMapping*user,boolwill_prep_stmt)
118+
GetConnectionCopyFrom(UserMapping*user,boolwill_prep_stmt,
119+
bool**copy_from_started)
116120
{
117121
boolfound;
118122
ConnCacheEntry*entry;
@@ -198,6 +202,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
198202
entry->have_error= false;
199203
entry->changing_xact_state= false;
200204
entry->invalidated= false;
205+
entry->copy_from_started= false;
201206
entry->server_hashvalue=
202207
GetSysCacheHashValue1(FOREIGNSERVEROID,
203208
ObjectIdGetDatum(server->serverid));
@@ -220,9 +225,17 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
220225
/* Remember if caller will prepare statements */
221226
entry->have_prep_stmt |=will_prep_stmt;
222227

228+
if (copy_from_started)
229+
*copy_from_started=&(entry->copy_from_started);
223230
returnentry->conn;
224231
}
225232

233+
PGconn*
234+
GetConnection(UserMapping*user,boolwill_prep_stmt)
235+
{
236+
returnGetConnectionCopyFrom(user,will_prep_stmt,NULL);
237+
}
238+
226239
/*
227240
* Connect to remote server using specified server and user mapping properties.
228241
*/
@@ -1231,21 +1244,40 @@ pgfdw_cancel_query(PGconn *conn)
12311244
endtime=TimestampTzPlusMilliseconds(GetCurrentTimestamp(),30000);
12321245

12331246
/*
1234-
* Issue cancel request. Unfortunately, there's no good way to limit the
1235-
* amount of time that we might block inside PQgetCancel().
1247+
* If COPY IN in progress, send CopyFail. Otherwise send cancel request.
1248+
* TODO: make it less hackish, without libpq-int.h inclusion and handling
1249+
* EAGAIN.
12361250
*/
1237-
if ((cancel=PQgetCancel(conn)))
1251+
if (conn->asyncStatus==PGASYNC_COPY_IN)
12381252
{
1239-
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
1253+
if (PQputCopyEnd(conn,"postgres_fdw: transaction abort on source node")!=1)
12401254
{
12411255
ereport(WARNING,
12421256
(errcode(ERRCODE_CONNECTION_FAILURE),
1243-
errmsg("could not sendcancel request: %s",
1257+
errmsg("could not sendabort copy request: %s",
12441258
errbuf)));
1245-
PQfreeCancel(cancel);
12461259
return false;
12471260
}
1248-
PQfreeCancel(cancel);
1261+
}
1262+
else
1263+
{
1264+
/*
1265+
* Issue cancel request. Unfortunately, there's no good way to limit the
1266+
* amount of time that we might block inside PQgetCancel().
1267+
*/
1268+
if ((cancel=PQgetCancel(conn)))
1269+
{
1270+
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
1271+
{
1272+
ereport(WARNING,
1273+
(errcode(ERRCODE_CONNECTION_FAILURE),
1274+
errmsg("could not send cancel request: %s",
1275+
errbuf)));
1276+
PQfreeCancel(cancel);
1277+
return false;
1278+
}
1279+
PQfreeCancel(cancel);
1280+
}
12491281
}
12501282

12511283
/* Get and discard the result of the query. */

‎contrib/postgres_fdw/deparse.c

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
#include"catalog/pg_proc.h"
4646
#include"catalog/pg_type.h"
4747
#include"commands/defrem.h"
48+
#include"commands/copy.h"
49+
#include"mb/pg_wchar.h"
4850
#include"nodes/makefuncs.h"
4951
#include"nodes/nodeFuncs.h"
5052
#include"nodes/plannodes.h"
@@ -3172,3 +3174,107 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
31723174
/* Shouldn't get here */
31733175
elog(ERROR,"unexpected expression in subquery output");
31743176
}
3177+
3178+
/*
3179+
* Deparse COPY FROM into given buf.
3180+
*/
3181+
void
3182+
deparseCopyFromSql(StringInfobuf,Relationrel,CopyStatecstate,
3183+
constchar*dest_relname)
3184+
{
3185+
ListCell*cur;
3186+
3187+
appendStringInfoString(buf,"COPY ");
3188+
if (dest_relname==NULL)
3189+
deparseRelation(buf,rel);
3190+
else
3191+
appendStringInfoString(buf,dest_relname);
3192+
3193+
if (cstate->binary)
3194+
{
3195+
ereport(ERROR,
3196+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
3197+
errmsg("cannot copy to postgres_fdw table \"%s\" in binary format ",
3198+
RelationGetRelationName(rel))));
3199+
}
3200+
3201+
/* deparse column names */
3202+
if (cstate->attnumlist!=NIL)
3203+
{
3204+
boolfirst= true;
3205+
3206+
appendStringInfoString(buf," (");
3207+
foreach(cur,cstate->attnumlist)
3208+
{
3209+
intattnum=lfirst_int(cur);
3210+
char*attname;
3211+
3212+
if (!first)
3213+
appendStringInfoString(buf,", ");
3214+
first= false;
3215+
3216+
attname=get_relid_attribute_name(rel->rd_id,attnum);
3217+
appendStringInfoString(buf,quote_identifier(attname));
3218+
}
3219+
appendStringInfoString(buf," )");
3220+
}
3221+
3222+
appendStringInfoString(buf," FROM STDIN WITH (");
3223+
if (cstate->csv_mode)
3224+
{
3225+
appendStringInfoString(buf," FORMAT csv ");
3226+
appendStringInfo(buf,", QUOTE '%c'",*(cstate->quote));
3227+
appendStringInfo(buf,", ESCAPE '%c'",*(cstate->escape));
3228+
if (cstate->force_notnull!=NIL)
3229+
{
3230+
boolfirst= true;
3231+
3232+
appendStringInfoString(buf,", FORCE_NOT_NULL (");
3233+
foreach(cur,cstate->force_notnull)
3234+
{
3235+
char*attname=strVal(lfirst(cur));
3236+
3237+
if (!first)
3238+
appendStringInfoString(buf,", ");
3239+
first= false;
3240+
3241+
appendStringInfoString(buf,quote_identifier(attname));
3242+
}
3243+
appendStringInfoString(buf," )");
3244+
}
3245+
if (cstate->force_null!=NIL)
3246+
{
3247+
boolfirst= true;
3248+
3249+
appendStringInfoString(buf,", FORCE_NULL (");
3250+
foreach(cur,cstate->force_null)
3251+
{
3252+
char*attname=strVal(lfirst(cur));
3253+
3254+
if (!first)
3255+
appendStringInfoString(buf,", ");
3256+
first= false;
3257+
3258+
appendStringInfoString(buf,quote_identifier(attname));
3259+
}
3260+
appendStringInfoString(buf," )");
3261+
}
3262+
}
3263+
else
3264+
{
3265+
appendStringInfoString(buf," FORMAT text ");
3266+
}
3267+
3268+
appendStringInfo(buf,", OIDS %d",cstate->oids);
3269+
appendStringInfo(buf,", FREEZE %d",cstate->freeze);
3270+
appendStringInfo(buf,", DELIMITER '%c'",*(cstate->delim));
3271+
appendStringInfo(buf,", NULL %s",quote_literal_cstr(cstate->null_print));
3272+
/*
3273+
* cstate->line_buf is passed to us already converted to this server
3274+
* encoding.
3275+
*/
3276+
appendStringInfo(buf,", ENCODING %s",
3277+
quote_literal_cstr(
3278+
pg_encoding_to_char(GetDatabaseEncoding())));
3279+
appendStringInfoChar(buf,')');
3280+
}

‎contrib/postgres_fdw/postgres_fdw.c

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,15 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
352352
UpperRelationKindstage,
353353
RelOptInfo*input_rel,
354354
RelOptInfo*output_rel);
355+
staticvoidpostgresBeginForeignCopyFrom(EState*estate,
356+
ResultRelInfo*rinfo,
357+
CopyStatecstate,
358+
ResultRelInfo*parent_rinfo);
359+
staticvoidpostgresForeignNextCopyFrom(EState*estate,
360+
ResultRelInfo*rinfo,
361+
CopyStatecstate);
362+
staticvoidpostgresEndForeignCopyFrom(EState*estate,
363+
ResultRelInfo*rinfo);
355364

356365
/*
357366
* Helper functions
@@ -476,6 +485,11 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
476485
/* Support functions for upper relation push-down */
477486
routine->GetForeignUpperPaths=postgresGetForeignUpperPaths;
478487

488+
/* Functions for COPY FROM */
489+
routine->BeginForeignCopyFrom=postgresBeginForeignCopyFrom;
490+
routine->ForeignNextCopyFrom=postgresForeignNextCopyFrom;
491+
routine->EndForeignCopyFrom=postgresEndForeignCopyFrom;
492+
479493
PG_RETURN_POINTER(routine);
480494
}
481495

@@ -5200,6 +5214,110 @@ postgres_fdw_exec(PG_FUNCTION_ARGS)
52005214
PG_RETURN_VOID();
52015215
}
52025216

5217+
/*
5218+
* Begin COPY FROM to foreign table. Currently we do it in a bit perverted
5219+
* way: we redirect COPY FROM to parent table on foreign server, assuming it
5220+
* exists in public schema (as in shardman), and let it direct tuples to
5221+
* proper partitions. Otherwise we would have to modify logic of managing
5222+
* connections and keep many connections open to one server from one backend.
5223+
* This probably should not be used outside pg_shardman.
5224+
*/
5225+
staticvoid
5226+
postgresBeginForeignCopyFrom(EState*estate,ResultRelInfo*rinfo,
5227+
CopyStatecstate,ResultRelInfo*parent_rinfo)
5228+
{
5229+
Relationrel=rinfo->ri_RelationDesc;
5230+
RangeTblEntry*rte;
5231+
Oiduserid;
5232+
ForeignTable*table;
5233+
UserMapping*user;
5234+
StringInfoDatasql;
5235+
PGconn*conn;
5236+
PGresult*res;
5237+
bool*copy_from_started;
5238+
char*dest_relname;
5239+
5240+
/*
5241+
* Identify which user to do the remote access as. This should match what
5242+
* ExecCheckRTEPerms() does.
5243+
*/
5244+
rte=rt_fetch(rinfo->ri_RangeTableIndex,estate->es_range_table);
5245+
userid=rte->checkAsUser ?rte->checkAsUser :GetUserId();
5246+
5247+
/* Get info about foreign table. */
5248+
table=GetForeignTable(RelationGetRelid(rel));
5249+
user=GetUserMapping(userid,table->serverid);
5250+
rinfo->ri_FdwState=user;
5251+
5252+
/* Get (open, if not yet) connection */
5253+
conn=GetConnectionCopyFrom(user, false,&copy_from_started);
5254+
/* We already did COPY FROM to this server */
5255+
if (*copy_from_started)
5256+
return;
5257+
5258+
/* deparse COPY stmt */
5259+
dest_relname=psprintf(
5260+
"public.%s",quote_identifier(RelationGetRelationName(
5261+
parent_rinfo==NULL ?
5262+
rinfo->ri_RelationDesc :
5263+
parent_rinfo->ri_RelationDesc)));
5264+
initStringInfo(&sql);
5265+
deparseCopyFromSql(&sql,rel,cstate,dest_relname);
5266+
5267+
res=PQexec(conn,sql.data);
5268+
if (PQresultStatus(res)!=PGRES_COPY_IN)
5269+
{
5270+
pgfdw_report_error(ERROR,res,conn, true,sql.data);
5271+
}
5272+
PQclear(res);
5273+
5274+
*copy_from_started= true;
5275+
}
5276+
5277+
/* COPY FROM next row to foreign table */
5278+
staticvoid
5279+
postgresForeignNextCopyFrom(EState*estate,ResultRelInfo*rinfo,
5280+
CopyStatecstate)
5281+
{
5282+
bool*copy_from_started;
5283+
UserMapping*user= (UserMapping*)rinfo->ri_FdwState;
5284+
PGconn*conn=GetConnectionCopyFrom(user, false,&copy_from_started);
5285+
5286+
Assert(copy_from_started);
5287+
Assert(!cstate->binary);
5288+
/* TODO: distinuish failure and nonblocking-send EAGAIN */
5289+
if (PQputline(conn,cstate->line_buf.data)||PQputnbytes(conn,"\n",1))
5290+
{
5291+
pgfdw_report_error(ERROR,NULL,conn, false,cstate->line_buf.data);
5292+
}
5293+
}
5294+
5295+
/* Finish COPY FROM */
5296+
staticvoid
5297+
postgresEndForeignCopyFrom(EState*estate,ResultRelInfo*rinfo)
5298+
{
5299+
bool*copy_from_started;
5300+
UserMapping*user= (UserMapping*)rinfo->ri_FdwState;
5301+
PGconn*conn=GetConnectionCopyFrom(user, false,&copy_from_started);
5302+
PGresult*res;
5303+
5304+
if (*copy_from_started)
5305+
{
5306+
/* TODO: PQgetResult? */
5307+
if (PQendcopy(conn))
5308+
{
5309+
pgfdw_report_error(ERROR,NULL,conn, false,"end postgres_fdw copy from");
5310+
}
5311+
while ((res=PQgetResult(conn))!=NULL)
5312+
{
5313+
/* TODO: get error? */
5314+
PQclear(res);
5315+
}
5316+
*copy_from_started= false;
5317+
ReleaseConnection(conn);
5318+
}
5319+
}
5320+
52035321
void
52045322
_PG_init(void)
52055323
{

‎contrib/postgres_fdw/postgres_fdw.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#definePOSTGRES_FDW_H
1515

1616
#include"foreign/foreign.h"
17+
#include"commands/copy.h"
1718
#include"lib/stringinfo.h"
1819
#include"nodes/relation.h"
1920
#include"utils/relcache.h"
@@ -116,6 +117,8 @@ extern void reset_transmission_modes(int nestlevel);
116117

117118
/* in connection.c */
118119
externPGconn*GetConnection(UserMapping*user,boolwill_prep_stmt);
120+
externPGconn*GetConnectionCopyFrom(UserMapping*user,boolwill_prep_stmt,
121+
bool**copy_from_started);
119122
externvoidReleaseConnection(PGconn*conn);
120123
externunsignedintGetCursorNumber(PGconn*conn);
121124
externunsignedintGetPrepStmtNumber(PGconn*conn);
@@ -177,6 +180,8 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root,
177180
List*remote_conds,List*pathkeys,boolis_subquery,
178181
List**retrieved_attrs,List**params_list);
179182
externconstchar*get_jointype_name(JoinTypejointype);
183+
externvoiddeparseCopyFromSql(StringInfobuf,Relationrel,CopyStatecstate,
184+
constchar*dest_relname);
180185

181186
/* in shippable.c */
182187
externboolis_builtin(OidobjectId);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp