81
81
typedef struct
82
82
{
83
83
MtmMessageCode code ;/* Message code: MSG_READY, MSG_PREPARE, MSG_COMMIT, MSG_ABORT */
84
- int node ;/* Sender node ID */
84
+ int node ;/* Sender node ID */
85
85
TransactionId dxid ;/* Transaction ID at destination node */
86
86
TransactionId sxid ;/* Transaction ID at sender node */
87
87
csn_t csn ;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
88
88
nodemask_t disabledNodeMask ;/* Bitmask of disabled nodes at the sender of message */
89
89
csn_t oldestSnapshot ;/* Oldest snapshot used by active transactions at this node */
90
+ uint64 seqno ;/* Message sequence number (used to eliminate duplicated messages) */
90
91
}MtmArbiterMessage ;
91
92
92
93
typedef struct
@@ -112,6 +113,7 @@ static int busy_socket;
112
113
static void MtmTransSender (Datum arg );
113
114
static void MtmTransReceiver (Datum arg );
114
115
static void MtmSendHeartbeat (void );
116
+ static bool MtmSendToNode (int node ,void const * buf ,int size );
115
117
116
118
117
119
static char const * const messageText []=
@@ -248,6 +250,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
248
250
if (rc == 1 ) {
249
251
int n = send (sd ,src ,size ,0 );
250
252
if (n < 0 ) {
253
+ Assert (errno != EINTR );/* should not happen in non-blocking call */
251
254
busy_socket = -1 ;
252
255
return false;
253
256
}
@@ -266,6 +269,7 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
266
269
{
267
270
int rc = recv (sd ,buf ,buf_size ,0 );
268
271
if (rc <=0 ) {
272
+ Assert (errno != EINTR );/* should not happen in non-blocking call */
269
273
return -1 ;
270
274
}
271
275
return rc ;
@@ -346,9 +350,8 @@ static void MtmSendHeartbeat()
346
350
{
347
351
if (sockets [i ] >=0 && sockets [i ]!= busy_socket && !BIT_CHECK (Mtm -> disabledNodeMask |Mtm -> reconnectMask ,i ))
348
352
{
349
- size_t rc = send (sockets [i ],& msg ,sizeof (msg ),0 );
350
- if ((size_t )rc != sizeof (msg )) {
351
- elog (LOG ,"Failed to send heartbeat to node %d: %d" ,i + 1 ,errno );
353
+ if (!MtmSendToNode (i ,& msg ,sizeof (msg ))) {
354
+ elog (LOG ,"Arbiter failed to send heartbeat to node %d" ,i + 1 );
352
355
}
353
356
}
354
357
}
@@ -629,6 +632,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
629
632
MTM_LOG3 ("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d" ,
630
633
messageText [ts -> cmd ],ts -> csn ,node + 1 ,MtmNodeId ,ts -> gtid .xid ,ts -> xid );
631
634
Assert (ts -> cmd != MSG_INVALID );
635
+ buf -> data [buf -> used ].seqno = ++ Mtm -> nodes [node ].sendSeqNo ;
632
636
buf -> data [buf -> used ].code = ts -> cmd ;
633
637
buf -> data [buf -> used ].sxid = ts -> xid ;
634
638
buf -> data [buf -> used ].csn = ts -> csn ;
@@ -845,10 +849,17 @@ static void MtmTransReceiver(Datum arg)
845
849
elog (WARNING ,"Ignore message from dead node %d\n" ,msg -> node );
846
850
continue ;
847
851
}
852
+ if (msg -> seqno <=Mtm -> nodes [msg -> node - 1 ].recvSeqNo ) {
853
+ elog (WARNING ,"Ignore duplicated message %ld from node %d" ,msg -> seqno ,msg -> node );
854
+ continue ;
855
+ }
856
+ Mtm -> nodes [msg -> node - 1 ].recvSeqNo = msg -> seqno ;
848
857
849
858
ts = (MtmTransState * )hash_search (MtmXid2State ,& msg -> dxid ,HASH_FIND ,NULL );
850
- Assert (ts != NULL );
851
-
859
+ if (ts == NULL ) {
860
+ elog (WARNING ,"Ignore response for unexisted transaction %d from node %d" ,msg -> dxid ,msg -> node );
861
+ continue ;
862
+ }
852
863
if (BIT_CHECK (msg -> disabledNodeMask ,MtmNodeId - 1 )&& Mtm -> status != MTM_RECOVERY ) {
853
864
elog (PANIC ,"Node %d thinks that I was dead: perform hara-kiri not to be a zombie" ,msg -> node );
854
865
}