@@ -82,20 +82,25 @@ typedef struct
82
82
int node ;/* Sender node ID */
83
83
TransactionId dxid ;/* Transaction ID at destination node */
84
84
TransactionId sxid ;/* Transaction ID at sender node */
85
- csn_t csn ;/* local CSN in case of sending data from replica to master, global CSN master->replica */
86
- nodemask_t disabledNodeMask ;/* bitmask of disabled nodes at the sender of message */
85
+ csn_t csn ;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
86
+ nodemask_t disabledNodeMask ;/* Bitmask of disabled nodes at the sender of message */
87
+ csn_t oldestSnapshot ;/* Oldest snapshot used by active transactions at this node */
87
88
}MtmArbiterMessage ;
88
89
90
+ typedef struct
91
+ {
92
+ MtmArbiterMessage hdr ;
93
+ char connStr [MULTIMASTER_MAX_CONN_STR_SIZE ];
94
+ }MtmHandshakeMessage ;
95
+
89
96
typedef struct
90
97
{
91
98
int used ;
92
99
MtmArbiterMessage data [BUFFER_SIZE ];
93
100
}MtmBuffer ;
94
101
95
102
static int * sockets ;
96
- static char * * hosts ;
97
103
static int gateway ;
98
- static MtmState * ds ;
99
104
100
105
static void MtmTransSender (Datum arg );
101
106
static void MtmTransReceiver (Datum arg );
@@ -266,39 +271,41 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
266
271
continue ;
267
272
}else {
268
273
int optval = 1 ;
269
- MtmArbiterMessage msg ;
274
+ MtmHandshakeMessage req ;
275
+ MtmArbiterMessage resp ;
270
276
setsockopt (sd ,IPPROTO_TCP ,TCP_NODELAY , (char const * )& optval ,sizeof (optval ));
271
277
setsockopt (sd ,SOL_SOCKET ,SO_KEEPALIVE , (char const * )& optval ,sizeof (optval ));
272
278
273
- msg .code = MSG_HANDSHAKE ;
274
- msg .node = MtmNodeId ;
275
- msg .dxid = HANDSHAKE_MAGIC ;
276
- msg .sxid = ShmemVariableCache -> nextXid ;
277
- msg .csn = MtmGetCurrentTime ();
278
- msg .disabledNodeMask = ds -> disabledNodeMask ;
279
- if (!MtmWriteSocket (sd ,& msg ,sizeof msg )) {
279
+ req .hdr .code = MSG_HANDSHAKE ;
280
+ req .hdr .node = MtmNodeId ;
281
+ req .hdr .dxid = HANDSHAKE_MAGIC ;
282
+ req .hdr .sxid = ShmemVariableCache -> nextXid ;
283
+ req .hdr .csn = MtmGetCurrentTime ();
284
+ req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
285
+ strcpy (req .connStr ,Mtm -> nodes [MtmNodeId - 1 ].connStr );
286
+ if (!MtmWriteSocket (sd ,& req ,sizeof req )) {
280
287
elog (WARNING ,"Arbiter failed to send handshake message to %s:%d: %d" ,host ,port ,errno );
281
288
close (sd );
282
289
gotoRetry ;
283
290
}
284
- if (MtmReadSocket (sd ,& msg ,sizeof msg )!= sizeof (msg )) {
291
+ if (MtmReadSocket (sd ,& resp ,sizeof resp )!= sizeof (resp )) {
285
292
elog (WARNING ,"Arbiter failed to receive response for handshake message from %s:%d: errno=%d" ,host ,port ,errno );
286
293
close (sd );
287
294
gotoRetry ;
288
295
}
289
- if (msg .code != MSG_STATUS || msg .dxid != HANDSHAKE_MAGIC ) {
290
- elog (WARNING ,"Arbiter get unexpected response %d for handshake message from %s:%d" ,msg .code ,host ,port );
296
+ if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
297
+ elog (WARNING ,"Arbiter get unexpected response %d for handshake message from %s:%d" ,resp .code ,host ,port );
291
298
close (sd );
292
299
gotoRetry ;
293
300
}
294
301
295
302
/* Some node considered that I am dead, so switch to recovery mode */
296
- if (BIT_CHECK (msg .disabledNodeMask ,MtmNodeId - 1 )) {
297
- elog (WARNING ,"Node %d think that I am dead" ,msg .node );
303
+ if (BIT_CHECK (resp .disabledNodeMask ,MtmNodeId - 1 )) {
304
+ elog (WARNING ,"Node %d think that I am dead" ,resp .node );
298
305
MtmSwitchClusterMode (MTM_RECOVERY );
299
306
}
300
307
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
301
- ds -> disabledNodeMask |=msg .disabledNodeMask ;
308
+ Mtm -> disabledNodeMask |=resp .disabledNodeMask ;
302
309
return sd ;
303
310
}
304
311
}
@@ -309,39 +316,23 @@ static void MtmOpenConnections()
309
316
{
310
317
int nNodes = MtmNodes ;
311
318
int i ;
312
- char * connStr = pstrdup (MtmConnStrs );
313
319
314
320
sockets = (int * )palloc (sizeof (int )* nNodes );
315
- hosts = (char * * )palloc (sizeof (char * )* nNodes );
316
321
317
322
for (i = 0 ;i < nNodes ;i ++ ) {
318
- char * host = strstr (connStr ,"host=" );
319
- char * end ;
320
- if (host == NULL ) {
321
- elog (ERROR ,"Invalid connection string: '%s'" ,MtmConnStrs );
322
- }
323
- host += 5 ;
324
- for (end = host ;* end != ' ' && * end != ',' && * end != '\0' ;end ++ );
325
- if (* end != '\0' ) {
326
- * end = '\0' ;
327
- connStr = end + 1 ;
328
- }else {
329
- connStr = end ;
330
- }
331
- hosts [i ]= host ;
332
323
if (i + 1 != MtmNodeId ) {
333
- sockets [i ]= MtmConnectSocket (host ,MtmArbiterPort + i + 1 ,MtmConnectAttempts );
324
+ sockets [i ]= MtmConnectSocket (Mtm -> nodes [ i ]. hostName ,MtmArbiterPort + i + 1 ,MtmConnectAttempts );
334
325
if (sockets [i ]< 0 ) {
335
326
MtmOnNodeDisconnect (i + 1 );
336
327
}
337
328
}else {
338
329
sockets [i ]= -1 ;
339
330
}
340
331
}
341
- if (ds -> nNodes < MtmNodes /2 + 1 ) {/* no quorum */
342
- elog (WARNING ,"Node is out of quorum: only %d nodes from %d are accssible" ,ds -> nNodes ,MtmNodes );
343
- ds -> status = MTM_OFFLINE ;
344
- }else if (ds -> status == MTM_INITIALIZATION ) {
332
+ if (Mtm -> nNodes < MtmNodes /2 + 1 ) {/* no quorum */
333
+ elog (WARNING ,"Node is out of quorum: only %d nodes from %d are accssible" ,Mtm -> nNodes ,MtmNodes );
334
+ Mtm -> status = MTM_OFFLINE ;
335
+ }else if (Mtm -> status == MTM_INITIALIZATION ) {
345
336
MtmSwitchClusterMode (MTM_CONNECTED );
346
337
}
347
338
}
@@ -354,7 +345,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
354
345
if (sockets [node ] >=0 ) {
355
346
close (sockets [node ]);
356
347
}
357
- sockets [node ]= MtmConnectSocket (hosts [node ],MtmArbiterPort + node + 1 ,MtmReconnectAttempts );
348
+ sockets [node ]= MtmConnectSocket (Mtm -> nodes [node ]. hostName ,MtmArbiterPort + node + 1 ,MtmReconnectAttempts );
358
349
if (sockets [node ]< 0 ) {
359
350
MtmOnNodeDisconnect (node + 1 );
360
351
return false;
@@ -379,29 +370,31 @@ static void MtmAcceptOneConnection()
379
370
if (fd < 0 ) {
380
371
elog (WARNING ,"Arbiter failed to accept socket: %d" ,errno );
381
372
}else {
382
- MtmArbiterMessage msg ;
383
- int rc = MtmReadSocket (fd ,& msg ,sizeof msg );
384
- if (rc < sizeof (msg )) {
373
+ MtmHandshakeMessage req ;
374
+ MtmArbiterMessage resp ;
375
+ int rc = MtmReadSocket (fd ,& req ,sizeof req );
376
+ if (rc < sizeof (req )) {
385
377
elog (WARNING ,"Arbiter failed to handshake socket: %d, errno=%d" ,rc ,errno );
386
- }else if (msg . code != MSG_HANDSHAKE && msg .dxid != HANDSHAKE_MAGIC ) {
387
- elog (WARNING ,"Arbiter get unexpected handshake message %d" ,msg .code );
378
+ }else if (req . hdr . code != MSG_HANDSHAKE && req . hdr .dxid != HANDSHAKE_MAGIC ) {
379
+ elog (WARNING ,"Arbiter get unexpected handshake message %d" ,req . hdr .code );
388
380
close (fd );
389
381
}else {
390
- Assert (msg .node > 0 && msg .node <=MtmNodes && msg .node != MtmNodeId );
391
- msg .code = MSG_STATUS ;
392
- msg .disabledNodeMask = ds -> disabledNodeMask ;
393
- msg .dxid = HANDSHAKE_MAGIC ;
394
- msg .sxid = ShmemVariableCache -> nextXid ;
395
- msg .csn = MtmGetCurrentTime ();
396
- if (!MtmWriteSocket (fd ,& msg ,sizeof msg )) {
397
- elog (WARNING ,"Arbiter failed to write response for handshake message to node %d" ,msg .node );
382
+ Assert (req .hdr .node > 0 && req .hdr .node <=MtmNodes && req .hdr .node != MtmNodeId );
383
+ resp .code = MSG_STATUS ;
384
+ resp .disabledNodeMask = Mtm -> disabledNodeMask ;
385
+ resp .dxid = HANDSHAKE_MAGIC ;
386
+ resp .sxid = ShmemVariableCache -> nextXid ;
387
+ resp .csn = MtmGetCurrentTime ();
388
+ MtmUpdateNodeConnStr (req .hdr .node ,req .connStr );
389
+ if (!MtmWriteSocket (fd ,& resp ,sizeof resp )) {
390
+ elog (WARNING ,"Arbiter failed to write response for handshake message to node %d" ,resp .node );
398
391
close (fd );
399
392
}else {
400
- elog (NOTICE ,"Arbiter established connection with node %d" ,msg .node );
401
- BIT_CLEAR (ds -> connectivityMask ,msg .node - 1 );
402
- MtmRegisterSocket (fd ,msg .node - 1 );
403
- sockets [msg .node - 1 ]= fd ;
404
- MtmOnNodeConnect (msg .node );
393
+ elog (NOTICE ,"Arbiter established connection with node %d" ,req . hdr .node );
394
+ BIT_CLEAR (Mtm -> connectivityMask ,req . hdr .node - 1 );
395
+ MtmRegisterSocket (fd ,req . hdr .node - 1 );
396
+ sockets [req . hdr .node - 1 ]= fd ;
397
+ MtmOnNodeConnect (req . hdr .node );
405
398
}
406
399
}
407
400
}
@@ -415,7 +408,9 @@ static void MtmAcceptIncomingConnections()
415
408
int i ;
416
409
417
410
sockets = (int * )palloc (sizeof (int )* MtmNodes );
418
-
411
+ for (i = 0 ;i < MtmNodes ;i ++ ) {
412
+ sockets [i ]= -1 ;
413
+ }
419
414
sock_inet .sin_family = AF_INET ;
420
415
sock_inet .sin_addr .s_addr = htonl (INADDR_ANY );
421
416
sock_inet .sin_port = htons (MtmArbiterPort + MtmNodeId );
@@ -461,7 +456,8 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
461
456
buf -> data [buf -> used ].sxid = ts -> xid ;
462
457
buf -> data [buf -> used ].csn = ts -> csn ;
463
458
buf -> data [buf -> used ].node = MtmNodeId ;
464
- buf -> data [buf -> used ].disabledNodeMask = ds -> disabledNodeMask ;
459
+ buf -> data [buf -> used ].disabledNodeMask = Mtm -> disabledNodeMask ;
460
+ buf -> data [buf -> used ].oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
465
461
buf -> used += 1 ;
466
462
}
467
463
@@ -477,7 +473,7 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
477
473
n += 1 ;
478
474
}
479
475
}
480
- Assert (n == ds -> nNodes );
476
+ Assert (n == Mtm -> nNodes );
481
477
}
482
478
483
479
@@ -487,8 +483,6 @@ static void MtmTransSender(Datum arg)
487
483
int i ;
488
484
MtmBuffer * txBuffer = (MtmBuffer * )palloc (sizeof (MtmBuffer )* nNodes );
489
485
490
- ds = MtmGetState ();
491
-
492
486
MtmOpenConnections ();
493
487
494
488
for (i = 0 ;i < nNodes ;i ++ ) {
@@ -497,7 +491,7 @@ static void MtmTransSender(Datum arg)
497
491
498
492
while (true) {
499
493
MtmTransState * ts ;
500
- PGSemaphoreLock (& ds -> votingSemaphore );
494
+ PGSemaphoreLock (& Mtm -> votingSemaphore );
501
495
CHECK_FOR_INTERRUPTS ();
502
496
503
497
/*
@@ -506,14 +500,14 @@ static void MtmTransSender(Datum arg)
506
500
*/
507
501
MtmLock (LW_SHARED );
508
502
509
- for (ts = ds -> votingTransactions ;ts != NULL ;ts = ts -> nextVoting ) {
503
+ for (ts = Mtm -> votingTransactions ;ts != NULL ;ts = ts -> nextVoting ) {
510
504
if (MtmIsCoordinator (ts )) {
511
505
MtmBroadcastMessage (txBuffer ,ts );
512
506
}else {
513
507
MtmAppendBuffer (txBuffer ,ts -> gtid .xid ,ts -> gtid .node - 1 ,ts );
514
508
}
515
509
}
516
- ds -> votingTransactions = NULL ;
510
+ Mtm -> votingTransactions = NULL ;
517
511
518
512
MtmUnlock ();
519
513
@@ -573,8 +567,6 @@ static void MtmTransReceiver(Datum arg)
573
567
max_fd = 0 ;
574
568
#endif
575
569
576
- ds = MtmGetState ();
577
-
578
570
MtmAcceptIncomingConnections ();
579
571
580
572
for (i = 0 ;i < nNodes ;i ++ ) {
@@ -613,7 +605,7 @@ static void MtmTransReceiver(Datum arg)
613
605
elog (ERROR ,"Arbiter failed to select sockets: %d" ,errno );
614
606
}
615
607
for (i = 0 ;i < nNodes ;i ++ ) {
616
- if (FD_ISSET (sockets [i ],& events ))
608
+ if (sockets [ i ] >= 0 && FD_ISSET (sockets [i ],& events ))
617
609
#endif
618
610
{
619
611
if (i + 1 == MtmNodeId ) {
@@ -637,13 +629,24 @@ static void MtmTransReceiver(Datum arg)
637
629
MtmTransState * ts = (MtmTransState * )hash_search (MtmXid2State ,& msg -> dxid ,HASH_FIND ,NULL );
638
630
Assert (ts != NULL );
639
631
Assert (msg -> node > 0 && msg -> node <=nNodes && msg -> node != MtmNodeId );
632
+
633
+ Mtm -> nodes [msg -> node - 1 ].oldestSnapshot = msg -> oldestSnapshot ;
634
+
640
635
if (MtmIsCoordinator (ts )) {
641
636
switch (msg -> code ) {
642
637
case MSG_READY :
643
- Assert (ts -> nVotes < ds -> nNodes );
644
- ds -> nodeTransDelay [msg -> node - 1 ]+= MtmGetCurrentTime ()- ts -> csn ;
638
+ Assert (ts -> nVotes < Mtm -> nNodes );
639
+ Mtm -> nodes [msg -> node - 1 ]. transDelay += MtmGetCurrentTime ()- ts -> csn ;
645
640
ts -> xids [msg -> node - 1 ]= msg -> sxid ;
646
- if (++ ts -> nVotes == ds -> nNodes ) {
641
+
642
+ if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask )!= 0 ) {
643
+ /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
644
+ commit on smaller subset of nodes */
645
+ ts -> status = TRANSACTION_STATUS_ABORTED ;
646
+ MtmAdjustSubtransactions (ts );
647
+ }
648
+
649
+ if (++ ts -> nVotes == Mtm -> nNodes ) {
647
650
/* All nodes are finished their transactions */
648
651
if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
649
652
ts -> nVotes = 1 ;/* I voted myself */
@@ -655,24 +658,24 @@ static void MtmTransReceiver(Datum arg)
655
658
}
656
659
break ;
657
660
case MSG_ABORTED :
658
- Assert (ts -> nVotes < ds -> nNodes );
661
+ Assert (ts -> nVotes < Mtm -> nNodes );
659
662
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
660
663
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
661
664
ts -> status = TRANSACTION_STATUS_ABORTED ;
662
665
MtmAdjustSubtransactions (ts );
663
666
}
664
- if (++ ts -> nVotes == ds -> nNodes ) {
667
+ if (++ ts -> nVotes == Mtm -> nNodes ) {
665
668
MtmWakeUpBackend (ts );
666
669
}
667
670
break ;
668
671
case MSG_PREPARED :
669
672
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
670
- Assert (ts -> nVotes < ds -> nNodes );
673
+ Assert (ts -> nVotes < Mtm -> nNodes );
671
674
if (msg -> csn > ts -> csn ) {
672
675
ts -> csn = msg -> csn ;
673
676
MtmSyncClock (ts -> csn );
674
677
}
675
- if (++ ts -> nVotes == ds -> nNodes ) {
678
+ if (++ ts -> nVotes == Mtm -> nNodes ) {
676
679
ts -> csn = MtmAssignCSN ();
677
680
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
678
681
MtmWakeUpBackend (ts );
@@ -703,7 +706,7 @@ static void MtmTransReceiver(Datum arg)
703
706
}
704
707
}
705
708
}
706
- if (n == 0 && ds -> disabledNodeMask != 0 ) {
709
+ if (n == 0 && Mtm -> disabledNodeMask != 0 ) {
707
710
/* If timeout is expired and there are didabled nodes, then recheck cluster's state */
708
711
MtmRefreshClusterStatus (false);
709
712
}