Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit221220a

Browse files
committed
COPY FROM to postgres_fdw implementation for pg_shardman
Cherry-picked fromaf234df.This is the commit message#2:Tracking on which foreign servers we already started COPY.If foreign server holds several partitions, COPY FROM to local root partitionwill try to perform several copies at the same time through one connection,obviously without much success. Now we track that and start/end COPY only once.We also allow to pass destination relation name which may be different fromforeing table -- so we can copy into foreign root partition in shardman. Thisis pretty narrow solution. However, keeping several connections to the sameforeign server requires significant changes, especially in 2pc handling, sostaying here for now.This is the commit message#3:Allow COPY FROM to par8d table even if some FDW parts can't do that.This behaviour was broken in patches allowing COPY FROM to FDW tables.This is the commit message#4:COPY FROM deparse more complete, PG_SHARDMAN macro.Now column names, FORCE NULL and FORCE NOT NULL are deparsed too.PG_SHARDMAN macro ensures this PG contains patches for Postgres.This is the commit message#5:Disable COPY FROM to foreign parts, because no generic impl exists.This is the commit message#6:Fix COPY FROM deparse, forgotten comma for FORCE_NULL etc.
1 parentbf49026 commit221220a

File tree

10 files changed

+499
-167
lines changed

10 files changed

+499
-167
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
#include"access/htup_details.h"
1919
#include"catalog/pg_user_mapping.h"
2020
#include"access/xact.h"
21+
#include"access/transam.h"
2122
#include"access/xlog.h"/* GetSystemIdentifier() */
23+
#include"libpq-int.h"
2224
#include"mb/pg_wchar.h"
2325
#include"miscadmin.h"
2426
#include"pgstat.h"
@@ -63,7 +65,8 @@ struct ConnCacheEntry
6365
boolinvalidated;/* true if reconnect is pending */
6466
uint32server_hashvalue;/* hash value of foreign server OID */
6567
uint32mapping_hashvalue;/* hash value of user mapping OID */
66-
};
68+
boolcopy_from_started;/* COPY FROM in progress on this conn */
69+
} ;
6770

6871
/*
6972
* Connection cache (initialized on first use)
@@ -129,7 +132,8 @@ static bool pgfdw_get_cleanup_result(ConnCacheEntry *entry, TimestampTz endtime,
129132
* (not even on error), we need this flag to cue manual cleanup.
130133
*/
131134
ConnCacheEntry*
132-
GetConnection(UserMapping*user,boolwill_prep_stmt)
135+
GetConnectionCopyFrom(UserMapping*user,boolwill_prep_stmt,
136+
bool**copy_from_started)
133137
{
134138
boolfound;
135139
ConnCacheEntry*entry;
@@ -224,6 +228,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
224228
entry->have_error= false;
225229
entry->changing_xact_state= false;
226230
entry->invalidated= false;
231+
entry->copy_from_started= false;
227232
entry->server_hashvalue=
228233
GetSysCacheHashValue1(FOREIGNSERVEROID,
229234
ObjectIdGetDatum(server->serverid));
@@ -246,6 +251,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
246251
/* Remember if caller will prepare statements */
247252
entry->have_prep_stmt |=will_prep_stmt;
248253

254+
if (copy_from_started)
255+
*copy_from_started=&(entry->copy_from_started);
256+
249257
returnentry;
250258
}
251259

@@ -255,6 +263,12 @@ ConnectionEntryGetConn(ConnCacheEntry *entry)
255263
returnentry->conn;
256264
}
257265

266+
ConnCacheEntry*
267+
GetConnection(UserMapping*user,boolwill_prep_stmt)
268+
{
269+
returnGetConnectionCopyFrom(user,will_prep_stmt,NULL);
270+
}
271+
258272
/*
259273
* Connect to remote server using specified server and user mapping properties.
260274
*/
@@ -1292,21 +1306,40 @@ pgfdw_cancel_query(ConnCacheEntry *entry)
12921306
endtime=TimestampTzPlusMilliseconds(GetCurrentTimestamp(),30000);
12931307

12941308
/*
1295-
* Issue cancel request. Unfortunately, there's no good way to limit the
1296-
* amount of time that we might block inside PQgetCancel().
1309+
* If COPY IN in progress, send CopyFail. Otherwise send cancel request.
1310+
* TODO: make it less hackish, without libpq-int.h inclusion and handling
1311+
* EAGAIN.
12971312
*/
1298-
if ((cancel=PQgetCancel(conn)))
1313+
if (conn->asyncStatus==PGASYNC_COPY_IN)
12991314
{
1300-
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
1315+
if (PQputCopyEnd(conn,"postgres_fdw: transaction abort on source node")!=1)
13011316
{
13021317
ereport(WARNING,
13031318
(errcode(ERRCODE_CONNECTION_FAILURE),
1304-
errmsg("could not sendcancel request: %s",
1319+
errmsg("could not sendabort copy request: %s",
13051320
errbuf)));
1306-
PQfreeCancel(cancel);
13071321
return false;
13081322
}
1309-
PQfreeCancel(cancel);
1323+
}
1324+
else
1325+
{
1326+
/*
1327+
* Issue cancel request. Unfortunately, there's no good way to limit the
1328+
* amount of time that we might block inside PQgetCancel().
1329+
*/
1330+
if ((cancel=PQgetCancel(conn)))
1331+
{
1332+
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
1333+
{
1334+
ereport(WARNING,
1335+
(errcode(ERRCODE_CONNECTION_FAILURE),
1336+
errmsg("could not send cancel request: %s",
1337+
errbuf)));
1338+
PQfreeCancel(cancel);
1339+
return false;
1340+
}
1341+
PQfreeCancel(cancel);
1342+
}
13101343
}
13111344

13121345
/* Get and discard the result of the query. */

‎contrib/postgres_fdw/deparse.c

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
#include"catalog/pg_proc.h"
4646
#include"catalog/pg_type.h"
4747
#include"commands/defrem.h"
48+
#include"commands/copy.h"
49+
#include"mb/pg_wchar.h"
4850
#include"nodes/makefuncs.h"
4951
#include"nodes/nodeFuncs.h"
5052
#include"nodes/plannodes.h"
@@ -3323,3 +3325,107 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
33233325
/* Shouldn't get here */
33243326
elog(ERROR,"unexpected expression in subquery output");
33253327
}
3328+
3329+
/*
3330+
* Deparse COPY FROM into given buf.
3331+
*/
3332+
void
3333+
deparseCopyFromSql(StringInfobuf,Relationrel,CopyStatecstate,
3334+
constchar*dest_relname)
3335+
{
3336+
ListCell*cur;
3337+
3338+
appendStringInfoString(buf,"COPY ");
3339+
if (dest_relname==NULL)
3340+
deparseRelation(buf,rel);
3341+
else
3342+
appendStringInfoString(buf,dest_relname);
3343+
3344+
if (cstate->binary)
3345+
{
3346+
ereport(ERROR,
3347+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
3348+
errmsg("cannot copy to postgres_fdw table \"%s\" in binary format ",
3349+
RelationGetRelationName(rel))));
3350+
}
3351+
3352+
/* deparse column names */
3353+
if (cstate->attnumlist!=NIL)
3354+
{
3355+
boolfirst= true;
3356+
3357+
appendStringInfoString(buf," (");
3358+
foreach(cur,cstate->attnumlist)
3359+
{
3360+
intattnum=lfirst_int(cur);
3361+
char*attname;
3362+
3363+
if (!first)
3364+
appendStringInfoString(buf,", ");
3365+
first= false;
3366+
3367+
attname=get_attname(rel->rd_id,attnum, false);
3368+
appendStringInfoString(buf,quote_identifier(attname));
3369+
}
3370+
appendStringInfoString(buf," )");
3371+
}
3372+
3373+
appendStringInfoString(buf," FROM STDIN WITH (");
3374+
if (cstate->csv_mode)
3375+
{
3376+
appendStringInfoString(buf," FORMAT csv ");
3377+
appendStringInfo(buf,", QUOTE '%c'",*(cstate->quote));
3378+
appendStringInfo(buf,", ESCAPE '%c'",*(cstate->escape));
3379+
if (cstate->force_notnull!=NIL)
3380+
{
3381+
boolfirst= true;
3382+
3383+
appendStringInfoString(buf,", FORCE_NOT_NULL (");
3384+
foreach(cur,cstate->force_notnull)
3385+
{
3386+
char*attname=strVal(lfirst(cur));
3387+
3388+
if (!first)
3389+
appendStringInfoString(buf,", ");
3390+
first= false;
3391+
3392+
appendStringInfoString(buf,quote_identifier(attname));
3393+
}
3394+
appendStringInfoString(buf," )");
3395+
}
3396+
if (cstate->force_null!=NIL)
3397+
{
3398+
boolfirst= true;
3399+
3400+
appendStringInfoString(buf,", FORCE_NULL (");
3401+
foreach(cur,cstate->force_null)
3402+
{
3403+
char*attname=strVal(lfirst(cur));
3404+
3405+
if (!first)
3406+
appendStringInfoString(buf,", ");
3407+
first= false;
3408+
3409+
appendStringInfoString(buf,quote_identifier(attname));
3410+
}
3411+
appendStringInfoString(buf," )");
3412+
}
3413+
}
3414+
else
3415+
{
3416+
appendStringInfoString(buf," FORMAT text ");
3417+
}
3418+
3419+
appendStringInfo(buf,", OIDS %d",cstate->oids);
3420+
appendStringInfo(buf,", FREEZE %d",cstate->freeze);
3421+
appendStringInfo(buf,", DELIMITER '%c'",*(cstate->delim));
3422+
appendStringInfo(buf,", NULL %s",quote_literal_cstr(cstate->null_print));
3423+
/*
3424+
* cstate->line_buf is passed to us already converted to this server
3425+
* encoding.
3426+
*/
3427+
appendStringInfo(buf,", ENCODING %s",
3428+
quote_literal_cstr(
3429+
pg_encoding_to_char(GetDatabaseEncoding())));
3430+
appendStringInfoChar(buf,')');
3431+
}

‎contrib/postgres_fdw/postgres_fdw.c

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,15 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
362362
RelOptInfo*input_rel,
363363
RelOptInfo*output_rel,
364364
void*extra);
365+
staticvoidpostgresBeginForeignCopyFrom(EState*estate,
366+
ResultRelInfo*rinfo,
367+
CopyStatecstate,
368+
ResultRelInfo*parent_rinfo);
369+
staticvoidpostgresForeignNextCopyFrom(EState*estate,
370+
ResultRelInfo*rinfo,
371+
CopyStatecstate);
372+
staticvoidpostgresEndForeignCopyFrom(EState*estate,
373+
ResultRelInfo*rinfo);
365374

366375
/*
367376
* Helper functions
@@ -509,6 +518,11 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
509518
/* Support functions for upper relation push-down */
510519
routine->GetForeignUpperPaths=postgresGetForeignUpperPaths;
511520

521+
/* Functions for COPY FROM */
522+
routine->BeginForeignCopyFrom=postgresBeginForeignCopyFrom;
523+
routine->ForeignNextCopyFrom=postgresForeignNextCopyFrom;
524+
routine->EndForeignCopyFrom=postgresEndForeignCopyFrom;
525+
512526
PG_RETURN_POINTER(routine);
513527
}
514528

@@ -5822,6 +5836,114 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
58225836
returnNULL;
58235837
}
58245838

5839+
/*
5840+
* Begin COPY FROM to foreign table. Currently we do it in a bit perverted
5841+
* way: we redirect COPY FROM to parent table on foreign server, assuming it
5842+
* exists in public schema (as in shardman), and let it direct tuples to
5843+
* proper partitions. Otherwise we would have to modify logic of managing
5844+
* connections and keep many connections open to one server from one backend.
5845+
* This probably should not be used outside pg_shardman.
5846+
*/
5847+
staticvoid
5848+
postgresBeginForeignCopyFrom(EState*estate,ResultRelInfo*rinfo,
5849+
CopyStatecstate,ResultRelInfo*parent_rinfo)
5850+
{
5851+
Relationrel=rinfo->ri_RelationDesc;
5852+
RangeTblEntry*rte;
5853+
Oiduserid;
5854+
ForeignTable*table;
5855+
UserMapping*user;
5856+
StringInfoDatasql;
5857+
PGconn*conn;
5858+
PGresult*res;
5859+
bool*copy_from_started;
5860+
char*dest_relname;
5861+
5862+
/*
5863+
* Identify which user to do the remote access as. This should match what
5864+
* ExecCheckRTEPerms() does.
5865+
*/
5866+
rte=rt_fetch(rinfo->ri_RangeTableIndex,estate->es_range_table);
5867+
userid=rte->checkAsUser ?rte->checkAsUser :GetUserId();
5868+
5869+
/* Get info about foreign table. */
5870+
table=GetForeignTable(RelationGetRelid(rel));
5871+
user=GetUserMapping(userid,table->serverid);
5872+
rinfo->ri_FdwState=user;
5873+
5874+
/* Get (open, if not yet) connection */
5875+
conn=ConnectionEntryGetConn(
5876+
GetConnectionCopyFrom(user, false,&copy_from_started));
5877+
/* We already did COPY FROM to this server */
5878+
if (*copy_from_started)
5879+
return;
5880+
5881+
/* deparse COPY stmt */
5882+
dest_relname=psprintf(
5883+
"public.%s",quote_identifier(RelationGetRelationName(
5884+
parent_rinfo==NULL ?
5885+
rinfo->ri_RelationDesc :
5886+
parent_rinfo->ri_RelationDesc)));
5887+
initStringInfo(&sql);
5888+
deparseCopyFromSql(&sql,rel,cstate,dest_relname);
5889+
5890+
res=PQexec(conn,sql.data);
5891+
if (PQresultStatus(res)!=PGRES_COPY_IN)
5892+
{
5893+
pgfdw_report_error(ERROR,res,conn, true,sql.data);
5894+
}
5895+
PQclear(res);
5896+
5897+
*copy_from_started= true;
5898+
}
5899+
5900+
/* COPY FROM next row to foreign table */
5901+
staticvoid
5902+
postgresForeignNextCopyFrom(EState*estate,ResultRelInfo*rinfo,
5903+
CopyStatecstate)
5904+
{
5905+
bool*copy_from_started;
5906+
UserMapping*user= (UserMapping*)rinfo->ri_FdwState;
5907+
PGconn*conn=ConnectionEntryGetConn(
5908+
GetConnectionCopyFrom(user, false,&copy_from_started));
5909+
5910+
Assert(copy_from_started);
5911+
Assert(!cstate->binary);
5912+
/* TODO: distinuish failure and nonblocking-send EAGAIN */
5913+
if (PQputline(conn,cstate->line_buf.data)||PQputnbytes(conn,"\n",1))
5914+
{
5915+
pgfdw_report_error(ERROR,NULL,conn, false,cstate->line_buf.data);
5916+
}
5917+
}
5918+
5919+
/* Finish COPY FROM */
5920+
staticvoid
5921+
postgresEndForeignCopyFrom(EState*estate,ResultRelInfo*rinfo)
5922+
{
5923+
bool*copy_from_started;
5924+
UserMapping*user= (UserMapping*)rinfo->ri_FdwState;
5925+
ConnCacheEntry*conn_entry=GetConnectionCopyFrom(user, false,
5926+
&copy_from_started);
5927+
PGconn*conn=ConnectionEntryGetConn(conn_entry);
5928+
PGresult*res;
5929+
5930+
if (*copy_from_started)
5931+
{
5932+
/* TODO: PQgetResult? */
5933+
if (PQendcopy(conn))
5934+
{
5935+
pgfdw_report_error(ERROR,NULL,conn, false,"end postgres_fdw copy from");
5936+
}
5937+
while ((res=PQgetResult(conn))!=NULL)
5938+
{
5939+
/* TODO: get error? */
5940+
PQclear(res);
5941+
}
5942+
*copy_from_started= false;
5943+
ReleaseConnection(conn_entry);
5944+
}
5945+
}
5946+
58255947
void
58265948
_PG_init(void)
58275949
{

‎contrib/postgres_fdw/postgres_fdw.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#definePOSTGRES_FDW_H
1515

1616
#include"foreign/foreign.h"
17+
#include"commands/copy.h"
1718
#include"lib/stringinfo.h"
1819
#include"nodes/relation.h"
1920
#include"utils/relcache.h"
@@ -122,6 +123,8 @@ extern void reset_transmission_modes(int nestlevel);
122123

123124
/* in connection.c */
124125
externConnCacheEntry*GetConnection(UserMapping*user,boolwill_prep_stmt);
126+
externConnCacheEntry*GetConnectionCopyFrom(UserMapping*user,boolwill_prep_stmt,
127+
bool**copy_from_started);
125128
externPGconn*ConnectionEntryGetConn(ConnCacheEntry*entry);
126129
externvoidReleaseConnection(ConnCacheEntry*entry);
127130
externunsignedintGetCursorNumber(ConnCacheEntry*entry);
@@ -186,6 +189,8 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root,
186189
List*remote_conds,List*pathkeys,boolis_subquery,
187190
List**retrieved_attrs,List**params_list);
188191
externconstchar*get_jointype_name(JoinTypejointype);
192+
externvoiddeparseCopyFromSql(StringInfobuf,Relationrel,CopyStatecstate,
193+
constchar*dest_relname);
189194

190195
/* in shippable.c */
191196
externboolis_builtin(OidobjectId);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp