16
16
17
17
#include "access/global_snapshot.h"
18
18
#include "access/htup_details.h"
19
- #include "catalog/pg_user_mapping.h"
20
- #include "access/xact.h"
21
19
#include "access/transam.h"
20
+ #include "access/twophase.h"
21
+ #include "access/xact.h"
22
22
#include "access/xlog.h" /* GetSystemIdentifier() */
23
+ #include "catalog/pg_user_mapping.h"
23
24
#include "libpq-int.h"
24
25
#include "mb/pg_wchar.h"
25
26
#include "miscadmin.h"
@@ -81,7 +82,7 @@ static HTAB *ConnectionHash = NULL;
81
82
*/
82
83
typedef struct FdwTransactionState
83
84
{
84
- char * gid ;
85
+ char gid [ GIDSIZE ] ;
85
86
int nparticipants ;
86
87
GlobalCSN global_csn ;
87
88
bool two_phase_commit ;
@@ -839,7 +840,84 @@ pgfdw_xact_callback(XactEvent event, void *arg)
839
840
if (!xact_got_connection )
840
841
return ;
841
842
842
- /* Handle possible two-phase commit */
843
+ /*
844
+ * Hack for shardman loader: it allows to do 2PC on user-issued
845
+ * prepare. In this case we won't be able to commit xacts because we we
846
+ * don't record participants info anywhere; this must be done by loader or
847
+ * human behind it.
848
+ */
849
+ if (event == XACT_EVENT_PRE_PREPARE &&
850
+ UseGlobalSnapshots &&
851
+ strncmp ("pgfdw:" ,GetPrepareGid (),strlen ("pgfdw:" ))== 0 &&
852
+ strstr (GetPrepareGid (),"shmnloader" )!= 0 )
853
+ {
854
+ /*
855
+ * Remember gid. We will PREPARE on other nodes and finish global
856
+ * snaps on XACT_EVENT_POST_PREPARE.
857
+ */
858
+ strncpy (fdwTransState -> gid ,GetPrepareGid (),GIDSIZE );
859
+ /*
860
+ * xact_depth and fdwTransState will be cleaned up on
861
+ * XACT_EVENT_POST_PREPARE.
862
+ */
863
+ elog (WARNING ,"pre prepare gid %s" ,fdwTransState -> gid );
864
+ return ;
865
+ }
866
+ if (event == XACT_EVENT_PREPARE && fdwTransState -> gid [0 ]!= '\0' )
867
+ return ;/* prevent cleanup */
868
+ if (event == XACT_EVENT_POST_PREPARE )
869
+ {
870
+ GlobalCSN max_csn = InProgressGlobalCSN ;
871
+ GlobalCSN my_csn = InProgressGlobalCSN ;
872
+ bool res ;
873
+ char * sql ;
874
+ elog (WARNING ,"fdw post prepare" );
875
+
876
+ if (fdwTransState -> gid [0 ]== '\0' )
877
+ {
878
+ /*
879
+ * Nothing to do here; since this cb is not present in vanilla,
880
+ * exit to avoid harming state machine
881
+ */
882
+ return ;
883
+ }
884
+ sql = psprintf ("PREPARE TRANSACTION '%s'" ,fdwTransState -> gid );
885
+ res = BroadcastCmd (sql );
886
+ if (!res )
887
+ gotoerror ;
888
+
889
+ /* Broadcast pg_global_snapshot_prepare() */
890
+ my_csn = GlobalSnapshotPrepareTwophase (fdwTransState -> gid );
891
+
892
+ sql = psprintf ("SELECT pg_global_snapshot_prepare('%s')" ,
893
+ fdwTransState -> gid );
894
+ res = BroadcastStmt (sql ,PGRES_TUPLES_OK ,MaxCsnCB ,& max_csn );
895
+ if (!res )
896
+ gotoerror ;
897
+
898
+ /* select maximal global csn */
899
+ if (my_csn > max_csn )
900
+ max_csn = my_csn ;
901
+
902
+ /* Broadcast pg_global_snapshot_assign() */
903
+ GlobalSnapshotAssignCsnTwoPhase (fdwTransState -> gid ,max_csn );
904
+ sql = psprintf ("SELECT pg_global_snapshot_assign('%s'," UINT64_FORMAT ")" ,
905
+ fdwTransState -> gid ,max_csn );
906
+ res = BroadcastFunc (sql );
907
+
908
+ error :
909
+ elog (WARNING ,"post prepare gid %s, res %d" ,fdwTransState -> gid ,res );
910
+ if (!res )
911
+ {
912
+ sql = psprintf ("ABORT PREPARED '%s'" ,fdwTransState -> gid );
913
+ BroadcastCmd (sql );
914
+ elog (ERROR ,"failed to PREPARE transaction on remote node, ABORT PREPARED this xact" );
915
+ }
916
+ }
917
+
918
+ /*
919
+ * Handle possible two-phase commit.
920
+ */
843
921
if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT )
844
922
{
845
923
bool include_local_tx = false;
@@ -862,29 +940,31 @@ pgfdw_xact_callback(XactEvent event, void *arg)
862
940
bool res ;
863
941
char * sql ;
864
942
865
- fdwTransState -> gid = psprintf ("pgfdw:%lld:%llu:%d:%u:%d:%d" ,
866
- (long long )GetCurrentTimestamp (),
867
- (long long )GetSystemIdentifier (),
868
- MyProcPid ,
869
- GetCurrentTransactionIdIfAny (),
870
- ++ two_phase_xact_count ,
871
- fdwTransState -> nparticipants );
943
+ snprintf (fdwTransState -> gid ,
944
+ GIDSIZE ,
945
+ "pgfdw:%lld:%llu:%d:%u:%d:%d" ,
946
+ (long long )GetCurrentTimestamp (),
947
+ (long long )GetSystemIdentifier (),
948
+ MyProcPid ,
949
+ GetCurrentTransactionIdIfAny (),
950
+ ++ two_phase_xact_count ,
951
+ fdwTransState -> nparticipants );
872
952
873
953
/* Broadcast PREPARE */
874
954
sql = psprintf ("PREPARE TRANSACTION '%s'" ,fdwTransState -> gid );
875
955
res = BroadcastCmd (sql );
876
956
if (!res )
877
- gotoerror ;
957
+ gotoerror_user2pc ;
878
958
879
959
/* Broadcast pg_global_snapshot_prepare() */
880
960
if (include_local_tx )
881
961
my_csn = GlobalSnapshotPrepareCurrent ();
882
962
883
963
sql = psprintf ("SELECT pg_global_snapshot_prepare('%s')" ,
884
- fdwTransState -> gid );
964
+ fdwTransState -> gid );
885
965
res = BroadcastStmt (sql ,PGRES_TUPLES_OK ,MaxCsnCB ,& max_csn );
886
966
if (!res )
887
- gotoerror ;
967
+ gotoerror_user2pc ;
888
968
889
969
/* select maximal global csn */
890
970
if (include_local_tx && my_csn > max_csn )
@@ -894,10 +974,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
894
974
if (include_local_tx )
895
975
GlobalSnapshotAssignCsnCurrent (max_csn );
896
976
sql = psprintf ("SELECT pg_global_snapshot_assign('%s'," UINT64_FORMAT ")" ,
897
- fdwTransState -> gid ,max_csn );
977
+ fdwTransState -> gid ,max_csn );
898
978
res = BroadcastFunc (sql );
899
979
900
- error :
980
+ error_user2pc :
901
981
if (!res )
902
982
{
903
983
sql = psprintf ("ABORT PREPARED '%s'" ,fdwTransState -> gid );
@@ -959,6 +1039,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
959
1039
break ;
960
1040
case XACT_EVENT_PRE_PREPARE :
961
1041
1042
+ if (fdwTransState -> gid [0 ]!= '\0' )
1043
+ /* See comments above */
1044
+ break ;
1045
+
962
1046
/*
963
1047
* We disallow remote transactions that modified anything,
964
1048
* since it's not very reasonable to hold them open until
@@ -980,6 +1064,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
980
1064
elog (ERROR ,"missed cleaning up connection during pre-commit" );
981
1065
break ;
982
1066
case XACT_EVENT_PREPARE :
1067
+ if (fdwTransState -> gid [0 ]!= '\0' )
1068
+ break ;
1069
+
983
1070
/* Pre-commit should have closed the open transaction */
984
1071
elog (ERROR ,"missed cleaning up connection during pre-commit" );
985
1072
break ;
@@ -1046,6 +1133,14 @@ pgfdw_xact_callback(XactEvent event, void *arg)
1046
1133
/* Disarm changing_xact_state if it all worked. */
1047
1134
entry -> changing_xact_state = abort_cleanup_failure ;
1048
1135
break ;
1136
+ case XACT_EVENT_POST_PREPARE :
1137
+ /*
1138
+ * New event can break our state machine, so let's list
1139
+ * them here explicitely and force compiler warning in
1140
+ * case of unhandled event.
1141
+ */
1142
+ break ;
1143
+
1049
1144
}
1050
1145
}
1051
1146