Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit56d099f

Browse files
committed
Hack for 2PC for shardman loader.
1 parent305e4bd commit56d099f

File tree

1 file changed

+111
-16
lines changed

1 file changed

+111
-16
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 111 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616

1717
#include"access/global_snapshot.h"
1818
#include"access/htup_details.h"
19-
#include"catalog/pg_user_mapping.h"
20-
#include"access/xact.h"
2119
#include"access/transam.h"
20+
#include"access/twophase.h"
21+
#include"access/xact.h"
2222
#include"access/xlog.h"/* GetSystemIdentifier() */
23+
#include"catalog/pg_user_mapping.h"
2324
#include"libpq-int.h"
2425
#include"mb/pg_wchar.h"
2526
#include"miscadmin.h"
@@ -81,7 +82,7 @@ static HTAB *ConnectionHash = NULL;
8182
*/
8283
typedefstructFdwTransactionState
8384
{
84-
char*gid;
85+
chargid[GIDSIZE];
8586
intnparticipants;
8687
GlobalCSNglobal_csn;
8788
booltwo_phase_commit;
@@ -839,7 +840,84 @@ pgfdw_xact_callback(XactEvent event, void *arg)
839840
if (!xact_got_connection)
840841
return;
841842

842-
/* Handle possible two-phase commit */
843+
/*
844+
* Hack for shardman loader: it allows to do 2PC on user-issued
845+
* prepare. In this case we won't be able to commit xacts because we we
846+
* don't record participants info anywhere; this must be done by loader or
847+
* human behind it.
848+
*/
849+
if (event==XACT_EVENT_PRE_PREPARE&&
850+
UseGlobalSnapshots&&
851+
strncmp("pgfdw:",GetPrepareGid(),strlen("pgfdw:"))==0&&
852+
strstr(GetPrepareGid(),"shmnloader")!=0)
853+
{
854+
/*
855+
* Remember gid. We will PREPARE on other nodes and finish global
856+
* snaps on XACT_EVENT_POST_PREPARE.
857+
*/
858+
strncpy(fdwTransState->gid,GetPrepareGid(),GIDSIZE);
859+
/*
860+
* xact_depth and fdwTransState will be cleaned up on
861+
* XACT_EVENT_POST_PREPARE.
862+
*/
863+
elog(WARNING,"pre prepare gid %s",fdwTransState->gid);
864+
return;
865+
}
866+
if (event==XACT_EVENT_PREPARE&&fdwTransState->gid[0]!='\0')
867+
return;/* prevent cleanup */
868+
if (event==XACT_EVENT_POST_PREPARE)
869+
{
870+
GlobalCSNmax_csn=InProgressGlobalCSN;
871+
GlobalCSNmy_csn=InProgressGlobalCSN;
872+
boolres;
873+
char*sql;
874+
elog(WARNING,"fdw post prepare");
875+
876+
if (fdwTransState->gid[0]=='\0')
877+
{
878+
/*
879+
* Nothing to do here; since this cb is not present in vanilla,
880+
* exit to avoid harming state machine
881+
*/
882+
return;
883+
}
884+
sql=psprintf("PREPARE TRANSACTION '%s'",fdwTransState->gid);
885+
res=BroadcastCmd(sql);
886+
if (!res)
887+
gotoerror;
888+
889+
/* Broadcast pg_global_snapshot_prepare() */
890+
my_csn=GlobalSnapshotPrepareTwophase(fdwTransState->gid);
891+
892+
sql=psprintf("SELECT pg_global_snapshot_prepare('%s')",
893+
fdwTransState->gid);
894+
res=BroadcastStmt(sql,PGRES_TUPLES_OK,MaxCsnCB,&max_csn);
895+
if (!res)
896+
gotoerror;
897+
898+
/* select maximal global csn */
899+
if (my_csn>max_csn)
900+
max_csn=my_csn;
901+
902+
/* Broadcast pg_global_snapshot_assign() */
903+
GlobalSnapshotAssignCsnTwoPhase(fdwTransState->gid,max_csn);
904+
sql=psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")",
905+
fdwTransState->gid,max_csn);
906+
res=BroadcastFunc(sql);
907+
908+
error:
909+
elog(WARNING,"post prepare gid %s, res %d",fdwTransState->gid,res);
910+
if (!res)
911+
{
912+
sql=psprintf("ABORT PREPARED '%s'",fdwTransState->gid);
913+
BroadcastCmd(sql);
914+
elog(ERROR,"failed to PREPARE transaction on remote node, ABORT PREPARED this xact");
915+
}
916+
}
917+
918+
/*
919+
* Handle possible two-phase commit.
920+
*/
843921
if (event==XACT_EVENT_PARALLEL_PRE_COMMIT||event==XACT_EVENT_PRE_COMMIT)
844922
{
845923
boolinclude_local_tx= false;
@@ -862,29 +940,31 @@ pgfdw_xact_callback(XactEvent event, void *arg)
862940
boolres;
863941
char*sql;
864942

865-
fdwTransState->gid=psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d",
866-
(long long)GetCurrentTimestamp(),
867-
(long long)GetSystemIdentifier(),
868-
MyProcPid,
869-
GetCurrentTransactionIdIfAny(),
870-
++two_phase_xact_count,
871-
fdwTransState->nparticipants);
943+
snprintf(fdwTransState->gid,
944+
GIDSIZE,
945+
"pgfdw:%lld:%llu:%d:%u:%d:%d",
946+
(long long)GetCurrentTimestamp(),
947+
(long long)GetSystemIdentifier(),
948+
MyProcPid,
949+
GetCurrentTransactionIdIfAny(),
950+
++two_phase_xact_count,
951+
fdwTransState->nparticipants);
872952

873953
/* Broadcast PREPARE */
874954
sql=psprintf("PREPARE TRANSACTION '%s'",fdwTransState->gid);
875955
res=BroadcastCmd(sql);
876956
if (!res)
877-
gotoerror;
957+
gotoerror_user2pc;
878958

879959
/* Broadcast pg_global_snapshot_prepare() */
880960
if (include_local_tx)
881961
my_csn=GlobalSnapshotPrepareCurrent();
882962

883963
sql=psprintf("SELECT pg_global_snapshot_prepare('%s')",
884-
fdwTransState->gid);
964+
fdwTransState->gid);
885965
res=BroadcastStmt(sql,PGRES_TUPLES_OK,MaxCsnCB,&max_csn);
886966
if (!res)
887-
gotoerror;
967+
gotoerror_user2pc;
888968

889969
/* select maximal global csn */
890970
if (include_local_tx&&my_csn>max_csn)
@@ -894,10 +974,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
894974
if (include_local_tx)
895975
GlobalSnapshotAssignCsnCurrent(max_csn);
896976
sql=psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")",
897-
fdwTransState->gid,max_csn);
977+
fdwTransState->gid,max_csn);
898978
res=BroadcastFunc(sql);
899979

900-
error:
980+
error_user2pc:
901981
if (!res)
902982
{
903983
sql=psprintf("ABORT PREPARED '%s'",fdwTransState->gid);
@@ -959,6 +1039,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
9591039
break;
9601040
caseXACT_EVENT_PRE_PREPARE:
9611041

1042+
if (fdwTransState->gid[0]!='\0')
1043+
/* See comments above */
1044+
break;
1045+
9621046
/*
9631047
* We disallow remote transactions that modified anything,
9641048
* since it's not very reasonable to hold them open until
@@ -980,6 +1064,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
9801064
elog(ERROR,"missed cleaning up connection during pre-commit");
9811065
break;
9821066
caseXACT_EVENT_PREPARE:
1067+
if (fdwTransState->gid[0]!='\0')
1068+
break;
1069+
9831070
/* Pre-commit should have closed the open transaction */
9841071
elog(ERROR,"missed cleaning up connection during pre-commit");
9851072
break;
@@ -1046,6 +1133,14 @@ pgfdw_xact_callback(XactEvent event, void *arg)
10461133
/* Disarm changing_xact_state if it all worked. */
10471134
entry->changing_xact_state=abort_cleanup_failure;
10481135
break;
1136+
caseXACT_EVENT_POST_PREPARE:
1137+
/*
1138+
* New event can break our state machine, so let's list
1139+
* them here explicitely and force compiler warning in
1140+
* case of unhandled event.
1141+
*/
1142+
break;
1143+
10491144
}
10501145
}
10511146

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp