Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6fa240f

Browse files
committed
Integrate FDW with XTM
1 parentdee9a9a commit6fa240f

File tree

4 files changed

+87
-37
lines changed

4 files changed

+87
-37
lines changed

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ TransactionId
327327
DtmGetNewTransactionId(boolisSubXact)
328328
{
329329
TransactionIdxid;
330+
331+
XTM_INFO("%d: GetNewTransactionId\n",getpid());
330332

331333
/*
332334
* Workers synchronize transaction state at the beginning of each parallel
@@ -580,9 +582,9 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
580582

581583
staticvoidDtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn)
582584
{
583-
XTM_TRACE("XTM: DtmSetTransactionStatus %u = %u\n",xid,status);
585+
XTM_INFO("%d: DtmSetTransactionStatus %u = %u\n",getpid(),xid,status);
584586
if (!RecoveryInProgress()) {
585-
if (TransactionIdIsValid(DtmNextXid)) {
587+
if (!DtmIsGlobalTransaction&&TransactionIdIsValid(DtmNextXid)) {
586588
/* Already should be IN_PROGRESS */
587589
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
588590
CurrentTransactionSnapshot=NULL;
@@ -638,6 +640,8 @@ static void DtmInitialize()
638640
dtm->minXid=InvalidTransactionId;
639641
dtm->activeSnapshot.xip= (TransactionId*)ShmemAlloc(GetMaxSnapshotXidCount()*sizeof(TransactionId));
640642
dtm->activeSnapshot.subxip= (TransactionId*)ShmemAlloc(GetMaxSnapshotSubxidCount()*sizeof(TransactionId));
643+
644+
RegisterXactCallback(DtmXactCallback,NULL);
641645
}
642646
LWLockRelease(AddinShmemInitLock);
643647

@@ -652,7 +656,6 @@ static void DtmInitialize()
652656
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE
653657
);
654658

655-
RegisterXactCallback(DtmXactCallback,NULL);
656659

657660
TM=&DtmTM;
658661
}

‎contrib/postgres_fdw/connection.c‎

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include"utils/hsearch.h"
2121
#include"utils/memutils.h"
2222

23+
#undef DEBUG3
24+
#defineDEBUG3 WARNING
2325

2426
/*
2527
* Connection cache hash table entry
@@ -68,6 +70,8 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
6870
staticvoidcheck_conn_params(constchar**keywords,constchar**values);
6971
staticvoidconfigure_remote_session(PGconn*conn);
7072
staticvoiddo_sql_command(PGconn*conn,constchar*sql);
73+
staticvoiddo_sql_send_command(PGconn*conn,constchar*sql);
74+
staticvoiddo_sql_wait_command(PGconn*conn,constchar*sql);
7175
staticvoidbegin_remote_xact(ConnCacheEntry*entry);
7276
staticvoidpgfdw_xact_callback(XactEventevent,void*arg);
7377
staticvoidpgfdw_subxact_callback(SubXactEventevent,
@@ -358,6 +362,27 @@ do_sql_command(PGconn *conn, const char *sql)
358362
PQclear(res);
359363
}
360364

365+
staticvoid
366+
do_sql_send_command(PGconn*conn,constchar*sql)
367+
{
368+
if (PQsendQuery(conn,sql)!=PGRES_COMMAND_OK) {
369+
PGresult*res=PQgetResult(conn);
370+
pgfdw_report_error(ERROR,res,conn, true,sql);
371+
PQclear(res);
372+
}
373+
}
374+
375+
staticvoid
376+
do_sql_wait_command(PGconn*conn,constchar*sql)
377+
{
378+
PGresult*res;
379+
while ((res=PQgetResult(conn))!=NULL) {
380+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
381+
pgfdw_report_error(ERROR,res,conn, true,sql);
382+
PQclear(res);
383+
}
384+
}
385+
361386
/*
362387
* Start remote transaction or subtransaction, if needed.
363388
*
@@ -541,16 +566,35 @@ pgfdw_xact_callback(XactEvent event, void *arg)
541566
/* If it has an open remote transaction, try to close it */
542567
if (entry->xact_depth>0)
543568
{
544-
elog(DEBUG3,"closing remote transaction on connection %p",
545-
entry->conn);
569+
elog(DEBUG3,"closing remote transaction on connection %p event %d",
570+
entry->conn,event);
546571

547572
switch (event)
548573
{
549574
caseXACT_EVENT_PARALLEL_PRE_COMMIT:
550575
caseXACT_EVENT_PRE_COMMIT:
551576
/* Commit all remote transactions during pre-commit */
552-
do_sql_command(entry->conn,"COMMIT TRANSACTION");
577+
do_sql_send_command(entry->conn,"COMMIT TRANSACTION");
578+
continue;
579+
caseXACT_EVENT_PRE_PREPARE:
553580

581+
/*
582+
* We disallow remote transactions that modified anything,
583+
* since it's not very reasonable to hold them open until
584+
* the prepared transaction is committed. For the moment,
585+
* throw error unconditionally; later we might allow
586+
* read-only cases. Note that the error will cause us to
587+
* come right back here with event == XACT_EVENT_ABORT, so
588+
* we'll clean up the connection state at that point.
589+
*/
590+
ereport(ERROR,
591+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
592+
errmsg("cannot prepare a transaction that modified remote tables")));
593+
break;
594+
caseXACT_EVENT_PARALLEL_COMMIT:
595+
caseXACT_EVENT_COMMIT:
596+
caseXACT_EVENT_PREPARE:
597+
do_sql_wait_command(entry->conn,"COMMIT TRANSACTION");
554598
/*
555599
* If there were any errors in subtransactions, and we
556600
* made prepared statements, do a DEALLOCATE ALL to make
@@ -574,27 +618,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
574618
entry->have_prep_stmt= false;
575619
entry->have_error= false;
576620
break;
577-
caseXACT_EVENT_PRE_PREPARE:
578-
579-
/*
580-
* We disallow remote transactions that modified anything,
581-
* since it's not very reasonable to hold them open until
582-
* the prepared transaction is committed. For the moment,
583-
* throw error unconditionally; later we might allow
584-
* read-only cases. Note that the error will cause us to
585-
* come right back here with event == XACT_EVENT_ABORT, so
586-
* we'll clean up the connection state at that point.
587-
*/
588-
ereport(ERROR,
589-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
590-
errmsg("cannot prepare a transaction that modified remote tables")));
591-
break;
592-
caseXACT_EVENT_PARALLEL_COMMIT:
593-
caseXACT_EVENT_COMMIT:
594-
caseXACT_EVENT_PREPARE:
595-
/* Pre-commit should have closed the open transaction */
596-
elog(ERROR,"missed cleaning up connection during pre-commit");
597-
break;
598621
caseXACT_EVENT_PARALLEL_ABORT:
599622
caseXACT_EVENT_ABORT:
600623
/* Assume we might have lost track of prepared statements */
@@ -631,21 +654,23 @@ pgfdw_xact_callback(XactEvent event, void *arg)
631654
if (PQstatus(entry->conn)!=CONNECTION_OK||
632655
PQtransactionStatus(entry->conn)!=PQTRANS_IDLE)
633656
{
634-
elog(DEBUG3,"discarding connection %p",entry->conn);
657+
elog(DEBUG3,"discarding connection %p, conn status=%d, trans status=%d",entry->conn,PQstatus(entry->conn),PQtransactionStatus(entry->conn));
635658
PQfinish(entry->conn);
636659
entry->conn=NULL;
637660
}
638661
}
639662

640-
/*
641-
* Regardless of the event type, we can now mark ourselves as out of the
642-
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
643-
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
644-
*/
645-
xact_got_connection= false;
646-
647-
/* Also reset cursor numbering for next transaction */
648-
cursor_number=0;
663+
if (event!=XACT_EVENT_PARALLEL_PRE_COMMIT&&event!=XACT_EVENT_PRE_COMMIT) {
664+
/*
665+
* Regardless of the event type, we can now mark ourselves as out of the
666+
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
667+
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
668+
*/
669+
xact_got_connection= false;
670+
671+
/* Also reset cursor numbering for next transaction */
672+
cursor_number=0;
673+
}
649674
}
650675

651676
/*

‎contrib/postgres_fdw/postgres_fdw--1.0.sql‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,8 @@ LANGUAGE C STRICT;
1616
CREATE FOREIGN DATA WRAPPER postgres_fdw
1717
HANDLER postgres_fdw_handler
1818
VALIDATOR postgres_fdw_validator;
19+
20+
CREATEFUNCTIONpostgres_fdw_exec(relidoid, sql cstring)
21+
RETURNS void
22+
AS'MODULE_PATHNAME'
23+
LANGUAGE C STRICT;

‎contrib/postgres_fdw/postgres_fdw.c‎

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ typedef struct
230230
* SQL functions
231231
*/
232232
PG_FUNCTION_INFO_V1(postgres_fdw_handler);
233+
PG_FUNCTION_INFO_V1(postgres_fdw_exec);
233234

234235
/*
235236
* FDW callback routines
@@ -2994,3 +2995,19 @@ conversion_error_callback(void *arg)
29942995
NameStr(tupdesc->attrs[errpos->cur_attno-1]->attname),
29952996
RelationGetRelationName(errpos->rel));
29962997
}
2998+
2999+
Datum
3000+
postgres_fdw_exec(PG_FUNCTION_ARGS)
3001+
{
3002+
Oidrelid=PG_GETARG_OID(0);
3003+
charconst*sql=PG_GETARG_CSTRING(1);
3004+
Oiduserid=GetUserId();
3005+
ForeignTable*table=GetForeignTable(relid);
3006+
ForeignServer*server=GetForeignServer(table->serverid);
3007+
UserMapping*user=GetUserMapping(userid,server->serverid);
3008+
PGconn*conn=GetConnection(server,user, false);
3009+
PGresult*res=PQexec(conn,sql);
3010+
PQclear(res);
3011+
ReleaseConnection(conn);
3012+
PG_RETURN_VOID();
3013+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp