@@ -87,7 +87,6 @@ typedef struct
87
87
csn_t csn ;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
88
88
nodemask_t disabledNodeMask ;/* Bitmask of disabled nodes at the sender of message */
89
89
csn_t oldestSnapshot ;/* Oldest snapshot used by active transactions at this node */
90
- uint64 seqno ;/* Message sequence number (used to eliminate duplicated messages) */
91
90
}MtmArbiterMessage ;
92
91
93
92
typedef struct
@@ -446,7 +445,6 @@ static int MtmConnectSocket(int node, int port, int timeout)
446
445
req .hdr .sxid = ShmemVariableCache -> nextXid ;
447
446
req .hdr .csn = MtmGetCurrentTime ();
448
447
req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
449
- req .hdr .seqno = Mtm -> nodes [node ].recvSeqNo ;
450
448
strcpy (req .connStr ,Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
451
449
if (!MtmWriteSocket (sd ,& req ,sizeof req )) {
452
450
elog (WARNING ,"Arbiter failed to send handshake message to %s:%d: %d" ,host ,port ,errno );
@@ -465,9 +463,6 @@ static int MtmConnectSocket(int node, int port, int timeout)
465
463
}
466
464
467
465
MtmLock (LW_EXCLUSIVE );
468
- if (Mtm -> nodes [resp .node - 1 ].sendSeqNo < resp .seqno ) {
469
- Mtm -> nodes [resp .node - 1 ].sendSeqNo = resp .seqno ;
470
- }
471
466
472
467
/* Some node considered that I am dead, so switch to recovery mode */
473
468
if (BIT_CHECK (resp .disabledNodeMask ,MtmNodeId - 1 )) {
@@ -582,10 +577,6 @@ static void MtmAcceptOneConnection()
582
577
resp .sxid = ShmemVariableCache -> nextXid ;
583
578
resp .csn = MtmGetCurrentTime ();
584
579
resp .node = MtmNodeId ;
585
- resp .seqno = Mtm -> nodes [req .hdr .node - 1 ].recvSeqNo ;
586
- if (Mtm -> nodes [req .hdr .node - 1 ].sendSeqNo < req .hdr .seqno ) {
587
- Mtm -> nodes [req .hdr .node - 1 ].sendSeqNo = req .hdr .seqno ;
588
- }
589
580
MtmUpdateNodeConnectionInfo (& Mtm -> nodes [req .hdr .node - 1 ].con ,req .connStr );
590
581
if (!MtmWriteSocket (fd ,& resp ,sizeof resp )) {
591
582
elog (WARNING ,"Arbiter failed to write response for handshake message to node %d" ,resp .node );
@@ -651,7 +642,6 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
651
642
MTM_LOG3 ("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d" ,
652
643
messageText [ts -> cmd ],ts -> csn ,node + 1 ,MtmNodeId ,ts -> gtid .xid ,ts -> xid );
653
644
Assert (ts -> cmd != MSG_INVALID );
654
- buf -> data [buf -> used ].seqno = ++ Mtm -> nodes [node ].sendSeqNo ;
655
645
buf -> data [buf -> used ].code = ts -> cmd ;
656
646
buf -> data [buf -> used ].sxid = ts -> xid ;
657
647
buf -> data [buf -> used ].csn = ts -> csn ;
@@ -868,12 +858,6 @@ static void MtmTransReceiver(Datum arg)
868
858
elog (WARNING ,"Ignore message from dead node %d\n" ,msg -> node );
869
859
continue ;
870
860
}
871
- if (msg -> seqno <=Mtm -> nodes [msg -> node - 1 ].recvSeqNo ) {
872
- elog (WARNING ,"Ignore duplicated message %ld (<=%ld) from node %d" ,msg -> seqno ,Mtm -> nodes [msg -> node - 1 ].recvSeqNo ,msg -> node );
873
- continue ;
874
- }
875
- Mtm -> nodes [msg -> node - 1 ].recvSeqNo = msg -> seqno ;
876
-
877
861
ts = (MtmTransState * )hash_search (MtmXid2State ,& msg -> dxid ,HASH_FIND ,NULL );
878
862
if (ts == NULL ) {
879
863
elog (WARNING ,"Ignore response for unexisted transaction %d from node %d" ,msg -> dxid ,msg -> node );