Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit42c2fad

Browse files
committed
Port Kostya's 2pc for postgres_fdw and triggers on fdw tables on branch with isolation-related stuff.
1 parentaf234df commit42c2fad

File tree

13 files changed

+389
-220
lines changed

13 files changed

+389
-220
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 142 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include"access/transam.h"
2222
#include"access/xlog.h"
2323
#include"libpq-int.h"
24+
#include"access/xlog.h"
2425
#include"mb/pg_wchar.h"
2526
#include"miscadmin.h"
2627
#include"pgstat.h"
@@ -30,7 +31,6 @@
3031
#include"utils/memutils.h"
3132
#include"utils/syscache.h"
3233

33-
3434
/*
3535
* Connection cache hash table entry
3636
*
@@ -103,6 +103,55 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
103103
staticboolpgfdw_get_cleanup_result(PGconn*conn,TimestampTzendtime,
104104
PGresult**result);
105105

106+
staticintDistributedTransactionCount;
107+
staticintDistributedTransactionParticipantsCount;
108+
staticchar*DistributedTransactionGid;
109+
110+
/* Parallel send of sql statement to all paritcipants nodes
111+
* and wait status
112+
*/
113+
staticbool
114+
BroadcastStatement(charconst*sql,unsignedexpectedStatus)
115+
{
116+
HASH_SEQ_STATUSscan;
117+
ConnCacheEntry*entry;
118+
boolallOk= true;
119+
120+
hash_seq_init(&scan,ConnectionHash);
121+
while ((entry= (ConnCacheEntry*)hash_seq_search(&scan)))
122+
{
123+
if (entry->xact_depth>0)
124+
{
125+
do_sql_send_command(entry->conn,sql);
126+
}
127+
}
128+
129+
hash_seq_init(&scan,ConnectionHash);
130+
while ((entry= (ConnCacheEntry*)hash_seq_search(&scan)))
131+
{
132+
if (entry->xact_depth>0)
133+
{
134+
PGresult*result=PQgetResult(entry->conn);
135+
136+
if (PQresultStatus(result)!=expectedStatus)
137+
{
138+
elog(WARNING,"Failed command %s: status=%d, expected status=%d",sql,PQresultStatus(result),expectedStatus);
139+
pgfdw_report_error(ERROR,result,entry->conn, true,sql);
140+
allOk= false;
141+
}
142+
PQclear(result);
143+
PQgetResult(entry->conn);/* consume NULL result */
144+
}
145+
}
146+
returnallOk;
147+
}
148+
149+
staticbool
150+
BroadcastCommand(charconst*sql)
151+
{
152+
returnBroadcastStatement(sql,PGRES_COMMAND_OK);
153+
}
154+
106155

107156
/*
108157
* Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -457,31 +506,23 @@ begin_remote_xact(ConnCacheEntry *entry)
457506
/* Start main transaction if we haven't yet */
458507
if (entry->xact_depth <=0)
459508
{
460-
TransactionIdgxid=GetTransactionManager()->GetGlobalTransactionId();
461509
constchar*sql;
462510

463511
elog(DEBUG3,"starting remote transaction on connection %p",
464512
entry->conn);
465513

466-
// XXXX?
467-
//
468-
// if (UseTsDtmTransactions && TransactionIdIsValid(gxid))
469-
// {
470-
// charstmt[64];
471-
// snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%d)", gxid);
472-
// res = PQexec(entry->conn, stmt);
473-
// PQclear(res);
474-
// }
475-
476514
if (IsolationIsSerializable())
477515
sql="START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
478-
else
516+
elseif (UseRepeatableRead)
479517
sql="START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
518+
else
519+
sql="START TRANSACTION";
480520
entry->changing_xact_state= true;
481521
do_sql_command(entry->conn,sql);
482522
entry->xact_depth=1;
483523
entry->changing_xact_state= false;
484524

525+
485526
if (UseTsDtmTransactions)
486527
{
487528
if (currentConnection==NULL)
@@ -516,6 +557,8 @@ begin_remote_xact(ConnCacheEntry *entry)
516557
PQclear(res);
517558
}
518559
}
560+
561+
DistributedTransactionParticipantsCount+=1;
519562
}
520563

521564
/*
@@ -805,55 +848,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
805848
{
806849
HASH_SEQ_STATUSscan;
807850
ConnCacheEntry*entry;
808-
809-
// /* Do nothing for this events */
810-
// switch (event)
811-
// {
812-
// case XACT_EVENT_START:
813-
// case XACT_EVENT_COMMIT_PREPARED:
814-
// case XACT_EVENT_ABORT_PREPARED:
815-
// return;
816-
// default:
817-
// break;
818-
// }
851+
booltwo_phase_commit;
819852

820853
/* Quick exit if no connections were touched in this transaction. */
821854
if (!xact_got_connection)
822855
return;
823856

824-
if (currentGlobalTransactionId!=0)
825-
{
826-
switch (event)
827-
{
828-
caseXACT_EVENT_PARALLEL_PRE_COMMIT:
829-
caseXACT_EVENT_PRE_COMMIT:
830-
{
831-
csn_tmaxCSN=0;
832-
833-
if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
834-
MyProcPid,currentLocalTransactionId))||
835-
!RunDtmFunction(psprintf("SELECT pg_global_snaphot_begin_prepare('%d.%d')",
836-
MyProcPid,currentLocalTransactionId))||
837-
!RunDtmStatement(psprintf("SELECT pg_global_snaphot_prepare('%d.%d',0)",
838-
MyProcPid,currentLocalTransactionId),PGRES_TUPLES_OK,DtmMaxCSN,&maxCSN)||
839-
!RunDtmFunction(psprintf("SELECT pg_global_snaphot_end_prepare('%d.%d',%lld)",
840-
MyProcPid,currentLocalTransactionId,maxCSN))||
841-
!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
842-
MyProcPid,currentLocalTransactionId)))
843-
{
844-
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
845-
MyProcPid,currentLocalTransactionId));
846-
ereport(ERROR,
847-
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
848-
errmsg("transaction was aborted at one of the shards")));
849-
break;
850-
}
851-
return;
852-
}
853-
default:
854-
break;
855-
}
856-
}
857+
/***********************************************************************************************/
858+
859+
/* Check if we need to perform 2PC commit: number of paritcipants should be greater than 1 */
860+
two_phase_commit=Use2PC
861+
&& (TransactionIdIsValid(GetCurrentTransactionIdIfAny())+DistributedTransactionParticipantsCount)>1;
862+
863+
/***********************************************************************************************/
857864

858865
/*
859866
* Scan all connection cache entries to find open remote transactions, and
@@ -891,10 +898,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
891898
* we can't issue any more commands against it.
892899
*/
893900
pgfdw_reject_incomplete_xact_state_change(entry);
894-
895-
/* Commit all remote transactions during pre-commit */
896-
if (!currentGlobalTransactionId)
901+
if (!two_phase_commit&& !currentGlobalTransactionId)
897902
{
903+
/* Commit all remote transactions during pre-commit */
898904
entry->changing_xact_state= true;
899905
do_sql_command(entry->conn,"COMMIT TRANSACTION");
900906
entry->changing_xact_state= false;
@@ -922,6 +928,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
922928
}
923929
entry->have_prep_stmt= false;
924930
entry->have_error= false;
931+
932+
if (two_phase_commit)
933+
{
934+
/* Do not reset xact_depth and break connection: we still need them for second phase
935+
*/
936+
continue;
937+
}
925938
break;
926939
caseXACT_EVENT_PRE_PREPARE:
927940

@@ -1025,21 +1038,77 @@ pgfdw_xact_callback(XactEvent event, void *arg)
10251038
disconnect_pg_server(entry);
10261039
}
10271040
}
1028-
// if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT)
1029-
// {
1030-
/*
1031-
* Regardless of the event type, we can now mark ourselves as out of the
1032-
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1033-
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1034-
*/
1035-
xact_got_connection= false;
10361041

1037-
/* Also reset cursor numbering for next transaction */
1038-
cursor_number=0;
1042+
/***********************************************************************************************/
1043+
1044+
/*
1045+
* In case of 2PC broadcast PREPARE TRANSACTION statement.
1046+
* We are using BroadcasrCommand instead of sending them in the connection
1047+
* iterator above because we want to process them in parallel
1048+
*/
1049+
1050+
if (currentGlobalTransactionId!=0&&
1051+
(event==XACT_EVENT_PARALLEL_PRE_COMMIT||event==XACT_EVENT_PRE_COMMIT))
1052+
{
1053+
csn_tmaxCSN=0;
1054+
1055+
if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
1056+
MyProcPid,currentLocalTransactionId))||
1057+
!RunDtmFunction(psprintf("SELECT pg_global_snaphot_begin_prepare('%d.%d')",
1058+
MyProcPid,currentLocalTransactionId))||
1059+
!RunDtmStatement(psprintf("SELECT pg_global_snaphot_prepare('%d.%d',0)",
1060+
MyProcPid,currentLocalTransactionId),PGRES_TUPLES_OK,DtmMaxCSN,&maxCSN)||
1061+
!RunDtmFunction(psprintf("SELECT pg_global_snaphot_end_prepare('%d.%d',%lld)",
1062+
MyProcPid,currentLocalTransactionId,maxCSN))||
1063+
!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
1064+
MyProcPid,currentLocalTransactionId)))
1065+
{
1066+
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
1067+
MyProcPid,currentLocalTransactionId));
1068+
ereport(ERROR,
1069+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
1070+
errmsg("transaction was aborted at one of the shards")));
1071+
}
1072+
return;
1073+
}
1074+
1075+
elseif (two_phase_commit&&
1076+
(event==XACT_EVENT_PARALLEL_PRE_COMMIT||event==XACT_EVENT_PRE_COMMIT))
1077+
{
1078+
DistributedTransactionGid=psprintf("%d:%d:%lld:%lld:%d",
1079+
MyProcPid,
1080+
++DistributedTransactionCount,
1081+
(long long)GetSystemIdentifier(),
1082+
(long long)GetCurrentTransactionId(),
1083+
DistributedTransactionParticipantsCount);
1084+
if (!BroadcastCommand(psprintf("PREPARE TRANSACTION '%s'",DistributedTransactionGid))||
1085+
!BroadcastCommand(psprintf("COMMIT PREPARED '%s'",DistributedTransactionGid)))
1086+
{
1087+
BroadcastCommand(psprintf("ROLLBACK PREPARED '%s'",DistributedTransactionGid));
1088+
ereport(ERROR,
1089+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
1090+
errmsg("Transaction %s was aborted at one of participants",DistributedTransactionGid)));
1091+
}
1092+
return;
1093+
}
1094+
1095+
1096+
/***********************************************************************************************/
1097+
1098+
/*
1099+
* Regardless of the event type, we can now mark ourselves as out of the
1100+
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1101+
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1102+
*/
1103+
xact_got_connection= false;
1104+
1105+
/* Also reset cursor numbering for next transaction */
1106+
cursor_number=0;
10391107

1040-
currentGlobalTransactionId=0;
1041-
currentConnection=NULL;
1042-
// }
1108+
DistributedTransactionParticipantsCount=0;
1109+
DistributedTransactionGid=NULL;
1110+
currentGlobalTransactionId=0;
1111+
currentConnection=NULL;
10431112
}
10441113

10451114
/*

‎contrib/postgres_fdw/expected/postgres_fdw.out

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7022,6 +7022,65 @@ update bar set f2 = f2 + 100 returning *;
70227022
7 | 277
70237023
(6 rows)
70247024

7025+
-- Test that UPDATE/DELETE with inherited target works with row-level triggers
7026+
CREATE TRIGGER trig_row_before
7027+
BEFORE UPDATE OR DELETE ON bar2
7028+
FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo');
7029+
CREATE TRIGGER trig_row_after
7030+
AFTER UPDATE OR DELETE ON bar2
7031+
FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo');
7032+
explain (verbose, costs off)
7033+
update bar set f2 = f2 + 100;
7034+
QUERY PLAN
7035+
--------------------------------------------------------------------------------------
7036+
Update on public.bar
7037+
Update on public.bar
7038+
Foreign Update on public.bar2
7039+
Remote SQL: UPDATE public.loct2 SET f2 = $2 WHERE ctid = $1 RETURNING f1, f2, f3
7040+
-> Seq Scan on public.bar
7041+
Output: bar.f1, (bar.f2 + 100), bar.ctid
7042+
-> Foreign Scan on public.bar2
7043+
Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, bar2.*
7044+
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
7045+
(9 rows)
7046+
7047+
update bar set f2 = f2 + 100;
7048+
NOTICE: trig_row_before(23, skidoo) BEFORE ROW UPDATE ON bar2
7049+
NOTICE: OLD: (3,333,33),NEW: (3,433,33)
7050+
NOTICE: trig_row_before(23, skidoo) BEFORE ROW UPDATE ON bar2
7051+
NOTICE: OLD: (4,344,44),NEW: (4,444,44)
7052+
NOTICE: trig_row_before(23, skidoo) BEFORE ROW UPDATE ON bar2
7053+
NOTICE: OLD: (7,277,77),NEW: (7,377,77)
7054+
NOTICE: trig_row_after(23, skidoo) AFTER ROW UPDATE ON bar2
7055+
NOTICE: OLD: (3,333,33),NEW: (3,433,33)
7056+
NOTICE: trig_row_after(23, skidoo) AFTER ROW UPDATE ON bar2
7057+
NOTICE: OLD: (4,344,44),NEW: (4,444,44)
7058+
NOTICE: trig_row_after(23, skidoo) AFTER ROW UPDATE ON bar2
7059+
NOTICE: OLD: (7,277,77),NEW: (7,377,77)
7060+
explain (verbose, costs off)
7061+
delete from bar where f2 < 400;
7062+
QUERY PLAN
7063+
---------------------------------------------------------------------------------------------
7064+
Delete on public.bar
7065+
Delete on public.bar
7066+
Foreign Delete on public.bar2
7067+
Remote SQL: DELETE FROM public.loct2 WHERE ctid = $1 RETURNING f1, f2, f3
7068+
-> Seq Scan on public.bar
7069+
Output: bar.ctid
7070+
Filter: (bar.f2 < 400)
7071+
-> Foreign Scan on public.bar2
7072+
Output: bar2.ctid, bar2.*
7073+
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 WHERE ((f2 < 400)) FOR UPDATE
7074+
(10 rows)
7075+
7076+
delete from bar where f2 < 400;
7077+
NOTICE: trig_row_before(23, skidoo) BEFORE ROW DELETE ON bar2
7078+
NOTICE: OLD: (7,377,77)
7079+
NOTICE: trig_row_after(23, skidoo) AFTER ROW DELETE ON bar2
7080+
NOTICE: OLD: (7,377,77)
7081+
-- cleanup
7082+
DROP TRIGGER trig_row_before ON bar2;
7083+
DROP TRIGGER trig_row_after ON bar2;
70257084
drop table foo cascade;
70267085
NOTICE: drop cascades to foreign table foo2
70277086
drop table bar cascade;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp