Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb99dd9c

Browse files
committed
port fdw patches
1 parent83a51cb commitb99dd9c

File tree

4 files changed

+284
-41
lines changed

4 files changed

+284
-41
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 230 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
#include"access/htup_details.h"
1818
#include"catalog/pg_user_mapping.h"
1919
#include"access/xact.h"
20+
#include"access/xtm.h"
21+
#include"access/transam.h"
2022
#include"mb/pg_wchar.h"
2123
#include"miscadmin.h"
2224
#include"pgstat.h"
@@ -72,12 +74,18 @@ static unsigned int prep_stmt_number = 0;
7274
/* tracks whether any work is needed in callback functions */
7375
staticboolxact_got_connection= false;
7476

77+
typedeflong longcsn_t;
78+
staticcsn_tcurrentGlobalTransactionId=0;
79+
staticintcurrentLocalTransactionId=0;
80+
staticPGconn*currentConnection=NULL;
81+
7582
/* prototypes of private functions */
7683
staticPGconn*connect_pg_server(ForeignServer*server,UserMapping*user);
7784
staticvoiddisconnect_pg_server(ConnCacheEntry*entry);
7885
staticvoidcheck_conn_params(constchar**keywords,constchar**values,UserMapping*user);
7986
staticvoidconfigure_remote_session(PGconn*conn);
8087
staticvoiddo_sql_command(PGconn*conn,constchar*sql);
88+
staticvoiddo_sql_send_command(PGconn*conn,constchar*sql);
8189
staticvoidbegin_remote_xact(ConnCacheEntry*entry);
8290
staticvoidpgfdw_xact_callback(XactEventevent,void*arg);
8391
staticvoidpgfdw_subxact_callback(SubXactEventevent,
@@ -403,6 +411,19 @@ do_sql_command(PGconn *conn, const char *sql)
403411
PQclear(res);
404412
}
405413

414+
staticvoid
415+
do_sql_send_command(PGconn*conn,constchar*sql)
416+
{
417+
if (PQsendQuery(conn,sql)!=PGRES_COMMAND_OK)
418+
{
419+
PGresult*res=PQgetResult(conn);
420+
421+
elog(WARNING,"Failed to send command %s",sql);
422+
pgfdw_report_error(ERROR,res,conn, true,sql);
423+
PQclear(res);
424+
}
425+
}
426+
406427
/*
407428
* Start remote transaction or subtransaction, if needed.
408429
*
@@ -417,15 +438,26 @@ static void
417438
begin_remote_xact(ConnCacheEntry*entry)
418439
{
419440
intcurlevel=GetCurrentTransactionNestLevel();
441+
PGresult*res;
442+
420443

421444
/* Start main transaction if we haven't yet */
422445
if (entry->xact_depth <=0)
423446
{
447+
TransactionIdgxid=GetTransactionManager()->GetGlobalTransactionId();
424448
constchar*sql;
425449

426450
elog(DEBUG3,"starting remote transaction on connection %p",
427451
entry->conn);
428452

453+
if (TransactionIdIsValid(gxid))
454+
{
455+
charstmt[64];
456+
snprintf(stmt,sizeof(stmt),"select public.dtm_join_transaction(%d)",gxid);
457+
res=PQexec(entry->conn,stmt);
458+
PQclear(res);
459+
}
460+
429461
if (IsolationIsSerializable())
430462
sql="START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
431463
else
@@ -434,6 +466,41 @@ begin_remote_xact(ConnCacheEntry *entry)
434466
do_sql_command(entry->conn,sql);
435467
entry->xact_depth=1;
436468
entry->changing_xact_state= false;
469+
470+
if (UseTsDtmTransactions)
471+
{
472+
if (currentConnection==NULL)
473+
{
474+
currentConnection=entry->conn;
475+
}
476+
elseif (entry->conn!=currentConnection)
477+
{
478+
if (!currentGlobalTransactionId)
479+
{
480+
char*resp;
481+
res=PQexec(currentConnection,psprintf("SELECT public.dtm_extend('%d.%d')",
482+
MyProcPid,++currentLocalTransactionId));
483+
484+
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
485+
{
486+
pgfdw_report_error(ERROR,res,currentConnection, true,sql);
487+
}
488+
resp=PQgetvalue(res,0,0);
489+
if (resp==NULL|| (*resp)=='\0'||sscanf(resp,"%lld",&currentGlobalTransactionId)!=1)
490+
{
491+
pgfdw_report_error(ERROR,res,currentConnection, true,sql);
492+
}
493+
PQclear(res);
494+
}
495+
res=PQexec(entry->conn,psprintf("SELECT public.dtm_access(%llu, '%d.%d')",currentGlobalTransactionId,MyProcPid,currentLocalTransactionId));
496+
497+
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
498+
{
499+
pgfdw_report_error(ERROR,res,entry->conn, true,sql);
500+
}
501+
PQclear(res);
502+
}
503+
}
437504
}
438505

439506
/*
@@ -643,6 +710,78 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
643710
PQclear(res);
644711
}
645712

713+
typedefbool (*DtmCommandResultHandler) (PGresult*result,void*arg);
714+
715+
staticbool
716+
RunDtmStatement(charconst*sql,unsignedexpectedStatus,DtmCommandResultHandlerhandler,void*arg)
717+
{
718+
HASH_SEQ_STATUSscan;
719+
ConnCacheEntry*entry;
720+
boolallOk= true;
721+
722+
hash_seq_init(&scan,ConnectionHash);
723+
while ((entry= (ConnCacheEntry*)hash_seq_search(&scan)))
724+
{
725+
if (entry->xact_depth>0)
726+
{
727+
do_sql_send_command(entry->conn,sql);
728+
}
729+
}
730+
731+
hash_seq_init(&scan,ConnectionHash);
732+
while ((entry= (ConnCacheEntry*)hash_seq_search(&scan)))
733+
{
734+
if (entry->xact_depth>0)
735+
{
736+
PGresult*result=PQgetResult(entry->conn);
737+
738+
if (PQresultStatus(result)!=expectedStatus|| (handler&& !handler(result,arg)))
739+
{
740+
elog(WARNING,"Failed command %s: status=%d, expected status=%d",sql,PQresultStatus(result),expectedStatus);
741+
pgfdw_report_error(ERROR,result,entry->conn, true,sql);
742+
allOk= false;
743+
}
744+
PQclear(result);
745+
PQgetResult(entry->conn);/* consume NULL result */
746+
}
747+
}
748+
returnallOk;
749+
}
750+
751+
staticbool
752+
RunDtmCommand(charconst*sql)
753+
{
754+
returnRunDtmStatement(sql,PGRES_COMMAND_OK,NULL,NULL);
755+
}
756+
757+
staticbool
758+
RunDtmFunction(charconst*sql)
759+
{
760+
returnRunDtmStatement(sql,PGRES_TUPLES_OK,NULL,NULL);
761+
}
762+
763+
764+
staticbool
765+
DtmMaxCSN(PGresult*result,void*arg)
766+
{
767+
char*resp=PQgetvalue(result,0,0);
768+
csn_t*maxCSN= (csn_t*)arg;
769+
csn_tcsn=0;
770+
771+
if (resp==NULL|| (*resp)=='\0'||sscanf(resp,"%lld",&csn)!=1)
772+
{
773+
return false;
774+
}
775+
else
776+
{
777+
if (*maxCSN<csn)
778+
{
779+
*maxCSN=csn;
780+
}
781+
return true;
782+
}
783+
}
784+
646785
/*
647786
* pgfdw_xact_callback --- cleanup at main-transaction end.
648787
*/
@@ -652,10 +791,55 @@ pgfdw_xact_callback(XactEvent event, void *arg)
652791
HASH_SEQ_STATUSscan;
653792
ConnCacheEntry*entry;
654793

794+
/* Do nothing for this events */
795+
switch (event)
796+
{
797+
caseXACT_EVENT_START:
798+
caseXACT_EVENT_COMMIT_PREPARED:
799+
caseXACT_EVENT_ABORT_PREPARED:
800+
return;
801+
default:
802+
break;
803+
}
804+
655805
/* Quick exit if no connections were touched in this transaction. */
656806
if (!xact_got_connection)
657807
return;
658808

809+
if (currentGlobalTransactionId!=0)
810+
{
811+
switch (event)
812+
{
813+
caseXACT_EVENT_PARALLEL_PRE_COMMIT:
814+
caseXACT_EVENT_PRE_COMMIT:
815+
{
816+
csn_tmaxCSN=0;
817+
818+
if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
819+
MyProcPid,currentLocalTransactionId))||
820+
!RunDtmFunction(psprintf("SELECT public.dtm_begin_prepare('%d.%d')",
821+
MyProcPid,currentLocalTransactionId))||
822+
!RunDtmStatement(psprintf("SELECT public.dtm_prepare('%d.%d',0)",
823+
MyProcPid,currentLocalTransactionId),PGRES_TUPLES_OK,DtmMaxCSN,&maxCSN)||
824+
!RunDtmFunction(psprintf("SELECT public.dtm_end_prepare('%d.%d',%lld)",
825+
MyProcPid,currentLocalTransactionId,maxCSN))||
826+
!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
827+
MyProcPid,currentLocalTransactionId)))
828+
{
829+
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
830+
MyProcPid,currentLocalTransactionId));
831+
ereport(ERROR,
832+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
833+
errmsg("transaction was aborted at one of the shards")));
834+
break;
835+
}
836+
return;
837+
}
838+
default:
839+
break;
840+
}
841+
}
842+
659843
/*
660844
* Scan all connection cache entries to find open remote transactions, and
661845
* close them.
@@ -689,10 +873,34 @@ pgfdw_xact_callback(XactEvent event, void *arg)
689873
pgfdw_reject_incomplete_xact_state_change(entry);
690874

691875
/* Commit all remote transactions during pre-commit */
692-
entry->changing_xact_state= true;
693-
do_sql_command(entry->conn,"COMMIT TRANSACTION");
694-
entry->changing_xact_state= false;
876+
do_sql_send_command(entry->conn,"COMMIT TRANSACTION");
877+
continue;
878+
879+
caseXACT_EVENT_PRE_PREPARE:
880+
881+
/*
882+
* We disallow remote transactions that modified anything,
883+
* since it's not very reasonable to hold them open until
884+
* the prepared transaction is committed. For the moment,
885+
* throw error unconditionally; later we might allow
886+
* read-only cases. Note that the error will cause us to
887+
* come right back here with event == XACT_EVENT_ABORT, so
888+
* we'll clean up the connection state at that point.
889+
*/
890+
ereport(ERROR,
891+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
892+
errmsg("cannot prepare a transaction that modified remote tables")));
893+
break;
695894

895+
caseXACT_EVENT_PARALLEL_COMMIT:
896+
caseXACT_EVENT_COMMIT:
897+
caseXACT_EVENT_PREPARE:
898+
if (!currentGlobalTransactionId)
899+
{
900+
entry->changing_xact_state= true;
901+
do_sql_command(entry->conn,"COMMIT TRANSACTION");
902+
entry->changing_xact_state= false;
903+
}
696904
/*
697905
* If there were any errors in subtransactions, and we
698906
* made prepared statements, do a DEALLOCATE ALL to make
@@ -716,27 +924,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
716924
entry->have_prep_stmt= false;
717925
entry->have_error= false;
718926
break;
719-
caseXACT_EVENT_PRE_PREPARE:
720927

721-
/*
722-
* We disallow remote transactions that modified anything,
723-
* since it's not very reasonable to hold them open until
724-
* the prepared transaction is committed. For the moment,
725-
* throw error unconditionally; later we might allow
726-
* read-only cases. Note that the error will cause us to
727-
* come right back here with event == XACT_EVENT_ABORT, so
728-
* we'll clean up the connection state at that point.
729-
*/
730-
ereport(ERROR,
731-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
732-
errmsg("cannot prepare a transaction that modified remote tables")));
733-
break;
734-
caseXACT_EVENT_PARALLEL_COMMIT:
735-
caseXACT_EVENT_COMMIT:
736-
caseXACT_EVENT_PREPARE:
737-
/* Pre-commit should have closed the open transaction */
738-
elog(ERROR,"missed cleaning up connection during pre-commit");
739-
break;
740928
caseXACT_EVENT_PARALLEL_ABORT:
741929
caseXACT_EVENT_ABORT:
742930

@@ -800,6 +988,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
800988
/* Disarm changing_xact_state if it all worked. */
801989
entry->changing_xact_state=abort_cleanup_failure;
802990
break;
991+
992+
caseXACT_EVENT_START:
993+
caseXACT_EVENT_COMMIT_PREPARED:
994+
caseXACT_EVENT_ABORT_PREPARED:
995+
break;
803996
}
804997
}
805998

@@ -818,16 +1011,22 @@ pgfdw_xact_callback(XactEvent event, void *arg)
8181011
disconnect_pg_server(entry);
8191012
}
8201013
}
1014+
if (event!=XACT_EVENT_PARALLEL_PRE_COMMIT&&event!=XACT_EVENT_PRE_COMMIT)
1015+
{
1016+
/*
1017+
* Regardless of the event type, we can now mark ourselves as out of
1018+
* the transaction. (Note: if we are here during PRE_COMMIT or
1019+
* PRE_PREPARE, this saves a useless scan of the hashtable during
1020+
* COMMIT or PREPARE.)
1021+
*/
1022+
xact_got_connection= false;
8211023

822-
/*
823-
* Regardless of the event type, we can now mark ourselves as out of the
824-
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
825-
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
826-
*/
827-
xact_got_connection= false;
1024+
/* Also reset cursor numbering for next transaction */
1025+
cursor_number=0;
8281026

829-
/* Also reset cursor numbering for next transaction */
830-
cursor_number=0;
1027+
currentGlobalTransactionId=0;
1028+
currentConnection=NULL;
1029+
}
8311030
}
8321031

8331032
/*

‎contrib/postgres_fdw/postgres_fdw--1.0.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,8 @@ LANGUAGE C STRICT;
1616
CREATE FOREIGN DATA WRAPPER postgres_fdw
1717
HANDLER postgres_fdw_handler
1818
VALIDATOR postgres_fdw_validator;
19+
20+
CREATEFUNCTIONpostgres_fdw_exec(relidoid, sql cstring)
21+
RETURNS void
22+
AS'MODULE_PATHNAME'
23+
LANGUAGE C STRICT;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp