Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9b75dc4

Browse files
arssherkelvich
authored andcommitted
1 parentb79402a commit9b75dc4

File tree

8 files changed

+431
-196
lines changed

8 files changed

+431
-196
lines changed

‎contrib/postgres_fdw/connection.c‎

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include"access/xact.h"
2020
#include"access/xtm.h"
2121
#include"access/transam.h"
22+
#include"access/xlog.h"
23+
#include"libpq-int.h"
2224
#include"mb/pg_wchar.h"
2325
#include"miscadmin.h"
2426
#include"pgstat.h"
@@ -1231,21 +1233,40 @@ pgfdw_cancel_query(PGconn *conn)
12311233
endtime=TimestampTzPlusMilliseconds(GetCurrentTimestamp(),30000);
12321234

12331235
/*
1234-
* Issue cancel request. Unfortunately, there's no good way to limit the
1235-
* amount of time that we might block inside PQgetCancel().
1236+
* If COPY IN in progress, send CopyFail. Otherwise send cancel request.
1237+
* TODO: make it less hackish, without libpq-int.h inclusion and handling
1238+
* EAGAIN.
12361239
*/
1237-
if ((cancel=PQgetCancel(conn)))
1240+
if (conn->asyncStatus==PGASYNC_COPY_IN)
12381241
{
1239-
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
1242+
if (PQputCopyEnd(conn,"postgres_fdw: transaction abort on source node")!=1)
12401243
{
12411244
ereport(WARNING,
12421245
(errcode(ERRCODE_CONNECTION_FAILURE),
1243-
errmsg("could not sendcancel request: %s",
1246+
errmsg("could not sendabort copy request: %s",
12441247
errbuf)));
1245-
PQfreeCancel(cancel);
12461248
return false;
12471249
}
1248-
PQfreeCancel(cancel);
1250+
}
1251+
else
1252+
{
1253+
/*
1254+
* Issue cancel request. Unfortunately, there's no good way to limit the
1255+
* amount of time that we might block inside PQgetCancel().
1256+
*/
1257+
if ((cancel=PQgetCancel(conn)))
1258+
{
1259+
if (!PQcancel(cancel,errbuf,sizeof(errbuf)))
1260+
{
1261+
ereport(WARNING,
1262+
(errcode(ERRCODE_CONNECTION_FAILURE),
1263+
errmsg("could not send cancel request: %s",
1264+
errbuf)));
1265+
PQfreeCancel(cancel);
1266+
return false;
1267+
}
1268+
PQfreeCancel(cancel);
1269+
}
12491270
}
12501271

12511272
/* Get and discard the result of the query. */

‎contrib/postgres_fdw/deparse.c‎

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
#include"catalog/pg_proc.h"
4646
#include"catalog/pg_type.h"
4747
#include"commands/defrem.h"
48+
#include"commands/copy.h"
49+
#include"mb/pg_wchar.h"
4850
#include"nodes/makefuncs.h"
4951
#include"nodes/nodeFuncs.h"
5052
#include"nodes/plannodes.h"
@@ -3172,3 +3174,47 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
31723174
/* Shouldn't get here */
31733175
elog(ERROR,"unexpected expression in subquery output");
31743176
}
3177+
3178+
/*
3179+
* Deparse COPY FROM
3180+
*/
3181+
void
3182+
deparseCopyFromSql(StringInfobuf,Relationrel,CopyStatecstate)
3183+
{
3184+
appendStringInfoString(buf,"COPY ");
3185+
deparseRelation(buf,rel);
3186+
appendStringInfoString(buf," FROM STDIN WITH (");
3187+
3188+
/* TODO: deparse column names */
3189+
if (cstate->binary)
3190+
{
3191+
ereport(ERROR,
3192+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
3193+
errmsg("cannot copy to postgres_fdw table \"%s\" in binary format ",
3194+
RelationGetRelationName(rel))));
3195+
}
3196+
if (cstate->csv_mode)
3197+
{
3198+
appendStringInfoString(buf," FORMAT csv ");
3199+
appendStringInfo(buf,", QUOTE '%c'",*(cstate->quote));
3200+
appendStringInfo(buf,", ESCAPE '%c'",*(cstate->escape));
3201+
/* TODO: force quote, force not null, force null */
3202+
}
3203+
else
3204+
{
3205+
appendStringInfoString(buf," FORMAT text ");
3206+
}
3207+
3208+
appendStringInfo(buf,", OIDS %d",cstate->oids);
3209+
appendStringInfo(buf,", FREEZE %d",cstate->freeze);
3210+
appendStringInfo(buf,", DELIMITER '%c'",*(cstate->delim));
3211+
appendStringInfo(buf,", NULL %s",quote_literal_cstr(cstate->null_print));
3212+
/*
3213+
* cstate->line_buf is passed to us already converted to this server
3214+
* encoding.
3215+
*/
3216+
appendStringInfo(buf,", ENCODING %s",
3217+
quote_literal_cstr(
3218+
pg_encoding_to_char(GetDatabaseEncoding())));
3219+
appendStringInfoChar(buf,')');
3220+
}

‎contrib/postgres_fdw/postgres_fdw.c‎

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,14 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
352352
UpperRelationKindstage,
353353
RelOptInfo*input_rel,
354354
RelOptInfo*output_rel);
355+
staticvoidpostgresBeginForeignCopyFrom(EState*estate,
356+
ResultRelInfo*rinfo,
357+
CopyStatecstate);
358+
staticvoidpostgresForeignNextCopyFrom(EState*estate,
359+
ResultRelInfo*rinfo,
360+
CopyStatecstate);
361+
staticvoidpostgresEndForeignCopyFrom(EState*estate,
362+
ResultRelInfo*rinfo);
355363

356364
/*
357365
* Helper functions
@@ -476,6 +484,11 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
476484
/* Support functions for upper relation push-down */
477485
routine->GetForeignUpperPaths=postgresGetForeignUpperPaths;
478486

487+
/* Functions for COPY FROM */
488+
routine->BeginForeignCopyFrom=postgresBeginForeignCopyFrom;
489+
routine->ForeignNextCopyFrom=postgresForeignNextCopyFrom;
490+
routine->EndForeignCopyFrom=postgresEndForeignCopyFrom;
491+
479492
PG_RETURN_POINTER(routine);
480493
}
481494

@@ -5208,3 +5221,80 @@ _PG_init(void)
52085221
&UseTsDtmTransactions, false,PGC_USERSET,0,NULL,
52095222
NULL,NULL);
52105223
}
5224+
5225+
/* Begin COPY FROM to foreign table */
5226+
staticvoid
5227+
postgresBeginForeignCopyFrom(EState*estate,ResultRelInfo*rinfo,
5228+
CopyStatecstate)
5229+
{
5230+
Relationrel=rinfo->ri_RelationDesc;
5231+
RangeTblEntry*rte;
5232+
Oiduserid;
5233+
ForeignTable*table;
5234+
UserMapping*user;
5235+
StringInfoDatasql;
5236+
PGconn*conn;
5237+
PGresult*res;
5238+
5239+
/*
5240+
* Identify which user to do the remote access as. This should match what
5241+
* ExecCheckRTEPerms() does.
5242+
*/
5243+
rte=rt_fetch(rinfo->ri_RangeTableIndex,estate->es_range_table);
5244+
userid=rte->checkAsUser ?rte->checkAsUser :GetUserId();
5245+
5246+
/* Get info about foreign table. */
5247+
table=GetForeignTable(RelationGetRelid(rel));
5248+
user=GetUserMapping(userid,table->serverid);
5249+
5250+
/* Open connection */
5251+
conn=GetConnection(user, false);
5252+
rinfo->ri_FdwState=conn;
5253+
5254+
/* deparse COPY stmt */
5255+
initStringInfo(&sql);
5256+
deparseCopyFromSql(&sql,rel,cstate);
5257+
5258+
res=PQexec(conn,sql.data);
5259+
if (PQresultStatus(res)!=PGRES_COPY_IN)
5260+
{
5261+
pgfdw_report_error(ERROR,res,conn, true,sql.data);
5262+
}
5263+
PQclear(res);
5264+
}
5265+
5266+
/* COPY FROM next row to foreign table */
5267+
staticvoid
5268+
postgresForeignNextCopyFrom(EState*estate,ResultRelInfo*rinfo,
5269+
CopyStatecstate)
5270+
{
5271+
PGconn*conn= (PGconn*)rinfo->ri_FdwState;
5272+
5273+
Assert(!cstate->binary);
5274+
/* TODO: distinuish failure and nonblocking-send EAGAIN */
5275+
if (PQputline(conn,cstate->line_buf.data)||PQputnbytes(conn,"\n",1))
5276+
{
5277+
pgfdw_report_error(ERROR,NULL,conn, false,cstate->line_buf.data);
5278+
}
5279+
}
5280+
5281+
/* Finish COPY FROM */
5282+
staticvoid
5283+
postgresEndForeignCopyFrom(EState*estate,ResultRelInfo*rinfo)
5284+
{
5285+
PGconn*conn= (PGconn*)rinfo->ri_FdwState;
5286+
PGresult*res;
5287+
5288+
/* TODO: PQgetResult? */
5289+
if (PQendcopy(conn))
5290+
{
5291+
pgfdw_report_error(ERROR,NULL,conn, false,"end postgres_fdw copy from");
5292+
}
5293+
while ((res=PQgetResult(conn))!=NULL)
5294+
{
5295+
/* TODO: get error? */
5296+
PQclear(res);
5297+
}
5298+
5299+
ReleaseConnection(conn);
5300+
}

‎contrib/postgres_fdw/postgres_fdw.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#definePOSTGRES_FDW_H
1515

1616
#include"foreign/foreign.h"
17+
#include"commands/copy.h"
1718
#include"lib/stringinfo.h"
1819
#include"nodes/relation.h"
1920
#include"utils/relcache.h"
@@ -177,6 +178,7 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root,
177178
List*remote_conds,List*pathkeys,boolis_subquery,
178179
List**retrieved_attrs,List**params_list);
179180
externconstchar*get_jointype_name(JoinTypejointype);
181+
externvoiddeparseCopyFromSql(StringInfobuf,Relationrel,CopyStatecstate);
180182

181183
/* in shippable.c */
182184
externboolis_builtin(OidobjectId);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp