76
76
77
77
78
78
#include "multimaster.h"
79
+ #include "state.h"
79
80
80
81
#define MAX_ROUTES 16
81
82
#define INIT_BUFFER_SIZE 1024
@@ -189,7 +190,6 @@ static void MtmDisconnect(int node)
189
190
MtmUnregisterSocket (sockets [node ]);
190
191
pg_closesocket (sockets [node ],MtmUseRDMA );
191
192
sockets [node ]= -1 ;
192
- MtmOnNodeDisconnect (node + 1 );
193
193
}
194
194
195
195
static int MtmWaitSocket (int sd ,bool forWrite ,timestamp_t timeoutMsec )
@@ -316,25 +316,22 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316
316
}else {
317
317
BIT_CLEAR (Mtm -> currentLockNodeMask ,resp -> node - 1 );
318
318
}
319
- if (
320
- (BIT_CHECK (resp -> disabledNodeMask ,MtmNodeId - 1 )|| Mtm -> status == MTM_IN_MINORITY )
321
- && !BIT_CHECK (Mtm -> disabledNodeMask ,resp -> node - 1 )
322
- && Mtm -> status != MTM_RECOVERY
323
- && Mtm -> status != MTM_RECOVERED
324
- && Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime + MSEC_TO_USEC (MtmNodeDisableDelay )< MtmGetSystemTime ())
325
- {
326
- MTM_ELOG (WARNING ,"Node %d thinks that I'm dead, while I'm %s (message %s)" ,resp -> node ,MtmNodeStatusMnem [Mtm -> status ],MtmMessageKindMnem [resp -> code ]);
327
- BIT_SET (Mtm -> disabledNodeMask ,MtmNodeId - 1 );
328
- Mtm -> nConfigChanges += 1 ;
329
- MtmSwitchClusterMode (MTM_RECOVERY );
330
- }else if (BIT_CHECK (Mtm -> disabledNodeMask ,resp -> node - 1 )&& sockets [resp -> node - 1 ]< 0 ) {
331
- /* We receive heartbeat from disabled node.
319
+
320
+ // if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321
+ // {
322
+ // MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323
+ // }
324
+
325
+ if (BIT_CHECK (Mtm -> disabledNodeMask ,resp -> node - 1 )&&
326
+ sockets [resp -> node - 1 ]< 0 )
327
+ {
328
+ /* We've received heartbeat from disabled node.
332
329
* Looks like it is restarted.
333
330
* Try to reconnect to it.
334
331
*/
335
332
MTM_ELOG (WARNING ,"Receive heartbeat from disabled node %d" ,resp -> node );
336
333
BIT_SET (Mtm -> reconnectMask ,resp -> node - 1 );
337
- }
334
+ }
338
335
}
339
336
340
337
static void MtmScheduleHeartbeat ()
@@ -362,11 +359,17 @@ static void MtmSendHeartbeat()
362
359
for (i = 0 ;i < Mtm -> nAllNodes ;i ++ )
363
360
{
364
361
if (i + 1 != MtmNodeId ) {
365
- if (!BIT_CHECK (busy_mask ,i )
366
- && (Mtm -> status != MTM_ONLINE
367
- || sockets [i ] >=0
368
- || !BIT_CHECK (Mtm -> disabledNodeMask ,i )
369
- || BIT_CHECK (Mtm -> reconnectMask ,i )))
362
+ if (!BIT_CHECK (busy_mask ,i ))
363
+ /*
364
+ * Old behaviour here can cause subtle bugs, for example
365
+ * it can happen that none of the mentioned conditions is
366
+ * true when disabled node connects to a major node which
367
+ * is online. So just send it always. --sk
368
+ */
369
+ // && (Mtm->status != MTM_ONLINE
370
+ // || sockets[i] >= 0
371
+ // || !BIT_CHECK(Mtm->disabledNodeMask, i)
372
+ // || BIT_CHECK(Mtm->reconnectMask, i)))
370
373
{
371
374
if (!MtmSendToNode (i ,& msg ,sizeof (msg ))) {
372
375
MTM_ELOG (LOG ,"Arbiter failed to send heartbeat to node %d" ,i + 1 );
@@ -543,17 +546,9 @@ static void MtmOpenConnections()
543
546
for (i = 0 ;i < nNodes ;i ++ ) {
544
547
if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
545
548
sockets [i ]= MtmConnectSocket (i ,Mtm -> nodes [i ].con .arbiterPort );
546
- if (sockets [i ]< 0 ) {
547
- MtmOnNodeDisconnect (i + 1 );
548
- }
549
549
}
550
550
}
551
- if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) {/* no quorum */
552
- MTM_ELOG (WARNING ,"Node is out of quorum: only %d nodes of %d are accessible" ,Mtm -> nLiveNodes ,Mtm -> nAllNodes );
553
- MtmSwitchClusterMode (MTM_IN_MINORITY );
554
- }else if (Mtm -> status == MTM_INITIALIZATION ) {
555
- MtmSwitchClusterMode (MTM_CONNECTED );
556
- }
551
+ MtmStateProcessEvent (MTM_ARBITER_RECEIVER_START );
557
552
}
558
553
559
554
@@ -586,7 +581,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
586
581
}
587
582
sockets [node ]= MtmConnectSocket (node ,Mtm -> nodes [node ].con .arbiterPort );
588
583
if (sockets [node ]< 0 ) {
589
- MtmOnNodeDisconnect (node + 1 );
590
584
result = false;
591
585
break ;
592
586
}
@@ -716,16 +710,18 @@ static void MtmSender(Datum arg)
716
710
{
717
711
int nNodes = MtmMaxNodes ;
718
712
int i ;
713
+ MtmBuffer * txBuffer ;
719
714
720
715
MtmBackgroundWorker = true;
721
716
722
- MtmBuffer * txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
717
+ txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
723
718
MTM_ELOG (LOG ,"Start arbiter sender %d" ,MyProcPid );
724
719
InitializeTimeouts ();
725
720
726
721
pqsignal (SIGINT ,SetStop );
727
722
pqsignal (SIGQUIT ,SetStop );
728
723
pqsignal (SIGTERM ,SetStop );
724
+ pqsignal (SIGHUP ,PostgresSigHupHandler );
729
725
730
726
/* We're now ready to receive signals */
731
727
BackgroundWorkerUnblockSignals ();
@@ -744,6 +740,12 @@ static void MtmSender(Datum arg)
744
740
PGSemaphoreLock (& Mtm -> sendSemaphore );
745
741
CHECK_FOR_INTERRUPTS ();
746
742
743
+ if (ConfigReloadPending )
744
+ {
745
+ ConfigReloadPending = false;
746
+ ProcessConfigFile (PGC_SIGHUP );
747
+ }
748
+
747
749
MtmCheckHeartbeat ();
748
750
/*
749
751
* Use shared lock to improve locality,
@@ -805,6 +807,7 @@ static void MtmMonitor(Datum arg)
805
807
pqsignal (SIGINT ,SetStop );
806
808
pqsignal (SIGQUIT ,SetStop );
807
809
pqsignal (SIGTERM ,SetStop );
810
+ pqsignal (SIGHUP ,PostgresSigHupHandler );
808
811
809
812
MtmBackgroundWorker = true;
810
813
@@ -819,6 +822,13 @@ static void MtmMonitor(Datum arg)
819
822
if (rc & WL_POSTMASTER_DEATH ) {
820
823
break ;
821
824
}
825
+
826
+ if (ConfigReloadPending )
827
+ {
828
+ ConfigReloadPending = false;
829
+ ProcessConfigFile (PGC_SIGHUP );
830
+ }
831
+
822
832
MtmRefreshClusterStatus ();
823
833
}
824
834
}
@@ -844,6 +854,7 @@ static void MtmReceiver(Datum arg)
844
854
pqsignal (SIGINT ,SetStop );
845
855
pqsignal (SIGQUIT ,SetStop );
846
856
pqsignal (SIGTERM ,SetStop );
857
+ pqsignal (SIGHUP ,PostgresSigHupHandler );
847
858
848
859
MtmBackgroundWorker = true;
849
860
@@ -879,7 +890,14 @@ static void MtmReceiver(Datum arg)
879
890
for (j = 0 ;j < n ;j ++ ) {
880
891
if (events [j ].events & EPOLLIN )
881
892
#else
882
- fd_set events ;
893
+ fd_set events ;
894
+
895
+ if (ConfigReloadPending )
896
+ {
897
+ ConfigReloadPending = false;
898
+ ProcessConfigFile (PGC_SIGHUP );
899
+ }
900
+
883
901
do {
884
902
struct timeval tv ;
885
903
events = inset ;
@@ -969,6 +987,7 @@ static void MtmReceiver(Datum arg)
969
987
msg -> gid ,MtmTxnStatusMnem [msg -> status ],node );
970
988
971
989
replorigin_session_origin = DoNotReplicateId ;
990
+ TXFINISH ("%s ABORT, MSG_POLL_STATUS" ,msg -> gid );
972
991
MtmFinishPreparedTransaction (ts , false);
973
992
replorigin_session_origin = InvalidRepOriginId ;
974
993
}
@@ -982,6 +1001,7 @@ static void MtmReceiver(Datum arg)
982
1001
MTM_ELOG (LOG ,"Commit transaction %s because it is prepared at all live nodes" ,msg -> gid );
983
1002
984
1003
replorigin_session_origin = DoNotReplicateId ;
1004
+ TXFINISH ("%s COMMIT, MSG_POLL_STATUS" ,msg -> gid );
985
1005
MtmFinishPreparedTransaction (ts , true);
986
1006
replorigin_session_origin = InvalidRepOriginId ;
987
1007
}else {
@@ -1006,7 +1026,7 @@ static void MtmReceiver(Datum arg)
1006
1026
default :
1007
1027
break ;
1008
1028
}
1009
- if (BIT_CHECK (msg -> disabledNodeMask ,node - 1 )) {
1029
+ if (BIT_CHECK (msg -> disabledNodeMask ,node - 1 )|| BIT_CHECK ( Mtm -> disabledNodeMask , node - 1 ) ) {
1010
1030
MTM_ELOG (WARNING ,"Ignore message from dead node %d\n" ,node );
1011
1031
continue ;
1012
1032
}
@@ -1057,17 +1077,10 @@ static void MtmReceiver(Datum arg)
1057
1077
if (ts -> isTwoPhase ) {
1058
1078
MtmWakeUpBackend (ts );
1059
1079
}else if (MtmUseDtm ) {
1060
- ts -> votedMask = 0 ;
1061
1080
MTM_TXTRACE (ts ,"MtmTransReceiver send MSG_PRECOMMIT" );
1062
1081
Assert (replorigin_session_origin == InvalidRepOriginId );
1063
- MTM_LOG2 ("SetPreparedTransactionState for %s" ,ts -> gid );
1064
- MtmUnlock ();
1065
- MtmResetTransaction ();
1066
- StartTransactionCommand ();
1067
- SetPreparedTransactionState (ts -> gid ,MULTIMASTER_PRECOMMITTED );
1068
- CommitTransactionCommand ();
1069
- Assert (!MtmTransIsActive ());
1070
- MtmLock (LW_EXCLUSIVE );
1082
+ ts -> isPrepared = false;
1083
+ SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
1071
1084
}else {
1072
1085
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
1073
1086
MtmWakeUpBackend (ts );
@@ -1084,7 +1097,7 @@ static void MtmReceiver(Datum arg)
1084
1097
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1085
1098
MTM_LOG1 ("Arbiter receive abort message for transaction %s (%llu) from node %d" ,ts -> gid , (long64 )ts -> xid ,node );
1086
1099
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1087
- ts -> aborted_by_node = node ;
1100
+ ts -> abortedByNode = node ;
1088
1101
MtmAbortTransaction (ts );
1089
1102
}
1090
1103
if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask )== 0 ) {
@@ -1161,4 +1174,3 @@ static void MtmReceiver(Datum arg)
1161
1174
}
1162
1175
proc_exit (1 );/* force restart of this bgworker */
1163
1176
}
1164
-