@@ -141,6 +141,7 @@ int MtmArbiterPort;
141
141
int MtmNodes ;
142
142
int MtmConnectAttempts ;
143
143
int MtmConnectTimeout ;
144
+ int MtmKeepaliveTimeout ; /* Backs the "multimaster.keepalive_timeout" GUC registered in _PG_init (described there as a timeout in microseconds, default 1000000). */
144
145
int MtmReconnectAttempts ;
145
146
146
147
static int MtmQueueSize ;
@@ -986,6 +987,21 @@ _PG_init(void)
986
987
NULL
987
988
);
988
989
990
+ DefineCustomIntVariable (
991
+ "multimaster.keepalive_timeout" ,
992
+ "Multimaster keepalive interval for sockets" ,
993
+ "Timeout in microseconds before polling state of nodes" ,
994
+ & MtmKeepaliveTimeout ,
995
+ 1000000 ,
996
+ 1 ,
997
+ INT_MAX ,
998
+ PGC_BACKEND ,
999
+ 0 ,
1000
+ NULL ,
1001
+ NULL ,
1002
+ NULL
1003
+ );
1004
+
989
1005
DefineCustomIntVariable (
990
1006
"multimaster.connect_attempts" ,
991
1007
"Multimaster number of connect attemts" ,
@@ -1528,8 +1544,12 @@ MtmGetGtid(TransactionId xid, GlobalTransactionId* gtid)
1528
1544
1529
1545
MtmLock (LW_SHARED );
1530
1546
ts = (MtmTransState * )hash_search (xid2state ,& xid ,HASH_FIND ,NULL );
1531
- Assert (ts != NULL );
1532
- * gtid = ts -> gtid ;
1547
+ if (ts != NULL ) {
1548
+ * gtid = ts -> gtid ;
1549
+ }else {
1550
+ gtid -> node = MtmNodeId ;
1551
+ gtid -> xid = xid ;
1552
+ }
1533
1553
MtmUnlock ();
1534
1554
}
1535
1555
@@ -1601,15 +1621,19 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
1601
1621
1602
1622
ByteBufferAlloc (& buf );
1603
1623
EnumerateLocks (MtmSerializeLock ,& buf );
1604
- ByteBufferFree (& buf );
1605
- PaxosSet (psprintf ("lock-graph-%d" ,MtmNodeId ),buf .data ,buf .size );
1624
+ PaxosSet (psprintf ("lock-graph-%d" ,MtmNodeId ),buf .data ,buf .used );
1606
1625
MtmGraphInit (& graph );
1607
- MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data ,buf .size /sizeof (GlobalTransactionId ));
1626
+ MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data ,buf .used /sizeof (GlobalTransactionId ));
1627
+ ByteBufferFree (& buf );
1608
1628
for (i = 0 ;i < MtmNodes ;i ++ ) {
1609
1629
if (i + 1 != MtmNodeId && !BIT_CHECK (dtm -> disabledNodeMask ,i )) {
1610
1630
int size ;
1611
1631
void * data = PaxosGet (psprintf ("lock-graph-%d" ,i + 1 ),& size ,NULL );
1612
- MtmGraphAdd (& graph , (GlobalTransactionId * )data ,size /sizeof (GlobalTransactionId ));
1632
+ if (data == NULL ) {
1633
+ hasDeadlock = true;/* Just temporary hack until no Paxos */
1634
+ }else {
1635
+ MtmGraphAdd (& graph , (GlobalTransactionId * )data ,size /sizeof (GlobalTransactionId ));
1636
+ }
1613
1637
}
1614
1638
}
1615
1639
MtmGetGtid (pgxact -> xid ,& gtid );
@@ -1636,42 +1660,56 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
1636
1660
1637
1661
void MtmUpdateClusterStatus (void )
1638
1662
{
1639
- nodemask_t mask ,clique ;
1663
+ nodemask_t mask ,clique , disconnectedMask ;
1640
1664
nodemask_t matrix [MAX_NODES ];
1641
1665
int i ;
1642
1666
1643
1667
MtmBuildConnectivityMatrix (matrix );
1644
1668
1645
1669
clique = MtmFindMaxClique (matrix ,MtmNodes );
1646
-
1670
+ disconnectedMask = ~ clique & ((( nodemask_t ) 1 << MtmNodes ) - 1 );
1647
1671
MtmLock (LW_EXCLUSIVE );
1648
- mask = clique & ~dtm -> disabledNodeMask ;
1672
+ mask = disconnectedMask & ~dtm -> disabledNodeMask ;
1649
1673
for (i = 0 ;mask != 0 ;i ++ ,mask >>=1 ) {
1650
1674
if (mask & 1 ) {
1651
1675
dtm -> nNodes -= 1 ;
1652
1676
BIT_SET (dtm -> disabledNodeMask ,i );
1653
1677
}
1654
1678
}
1655
- if (dtm -> disabledNodeMask != clique ) {
1656
- dtm -> disabledNodeMask |=clique ;
1679
+ if (dtm -> disabledNodeMask != disconnectedMask ) {
1680
+ dtm -> disabledNodeMask |=disconnectedMask ;
1657
1681
PaxosSet (psprintf ("node-mask-%d" ,MtmNodeId ),& dtm -> disabledNodeMask ,sizeof dtm -> disabledNodeMask );
1658
1682
}
1659
1683
MtmUnlock ();
1660
1684
}
1661
1685
1662
- void MtmOnLostConnection (int nodeId )
1686
+ void MtmOnNodeDisconnect (int nodeId )
1663
1687
{
1664
1688
BIT_SET (dtm -> connectivityMask ,nodeId - 1 );
1665
1689
PaxosSet (psprintf ("node-mask-%d" ,MtmNodeId ),& dtm -> connectivityMask ,sizeof dtm -> connectivityMask );
1666
1690
1667
1691
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1668
- MtmSleep (MtmConnectTimeout );
1692
+ MtmSleep (MtmKeepaliveTimeout );
1669
1693
1670
1694
MtmUpdateClusterStatus ();
1671
1695
}
1672
1696
1673
- void MtmOnConnectNode (int nodeId )
1697
+ void MtmOnNodeConnect (int nodeId )
1674
1698
{
1675
1699
BIT_CLEAR (dtm -> connectivityMask ,nodeId - 1 );
1676
1700
PaxosSet (psprintf ("node-mask-%d" ,MtmNodeId ),& dtm -> connectivityMask ,sizeof dtm -> connectivityMask );
1677
1701
}
1702
+
1703
+ /*
1704
+ * Paxos function stubs (until they are implemented)
1705
+ */
1706
+ void * PaxosGet (char const * key ,int * size ,PaxosTimestamp * ts )
1707
+ {
1708
+ if (size != NULL ) {
1709
+ * size = 0 ;
1710
+ }
1711
+ return NULL ;
1712
+ }
1713
+
1714
/*
 * Placeholder for the Paxos write operation: does nothing and ignores all of
 * its arguments.
 */
void PaxosSet(char const *key, void const *value, int size)
{
    (void) key;
    (void) value;
    (void) size;
}