@@ -141,6 +141,7 @@ int MtmArbiterPort;
141141int MtmNodes ;
142142int MtmConnectAttempts ;
143143int MtmConnectTimeout ;
144+ int MtmKeepaliveTimeout ;
144145int MtmReconnectAttempts ;
145146
146147static int MtmQueueSize ;
@@ -986,6 +987,21 @@ _PG_init(void)
986987NULL
987988);
988989
990+ DefineCustomIntVariable (
991+ "multimaster.keepalive_timeout" ,
992+ "Multimaster keepalive interval for sockets" ,
993+ "Timeout in microseconds before polling state of nodes" ,
994+ & MtmKeepaliveTimeout ,
995+ 1000000 ,
996+ 1 ,
997+ INT_MAX ,
998+ PGC_BACKEND ,
999+ 0 ,
1000+ NULL ,
1001+ NULL ,
1002+ NULL
1003+ );
1004+
9891005DefineCustomIntVariable (
9901006"multimaster.connect_attempts" ,
9911007"Multimaster number of connect attemts" ,
@@ -1528,8 +1544,12 @@ MtmGetGtid(TransactionId xid, GlobalTransactionId* gtid)
15281544
15291545MtmLock (LW_SHARED );
15301546ts = (MtmTransState * )hash_search (xid2state ,& xid ,HASH_FIND ,NULL );
1531- Assert (ts != NULL );
1532- * gtid = ts -> gtid ;
1547+ if (ts != NULL ) {
1548+ * gtid = ts -> gtid ;
1549+ }else {
1550+ gtid -> node = MtmNodeId ;
1551+ gtid -> xid = xid ;
1552+ }
15331553MtmUnlock ();
15341554}
15351555
@@ -1601,15 +1621,19 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
16011621
16021622ByteBufferAlloc (& buf );
16031623EnumerateLocks (MtmSerializeLock ,& buf );
1604- ByteBufferFree (& buf );
1605- PaxosSet (psprintf ("lock-graph-%d" ,MtmNodeId ),buf .data ,buf .size );
1624+ PaxosSet (psprintf ("lock-graph-%d" ,MtmNodeId ),buf .data ,buf .used );
16061625MtmGraphInit (& graph );
1607- MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data ,buf .size /sizeof (GlobalTransactionId ));
1626+ MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data ,buf .used /sizeof (GlobalTransactionId ));
1627+ ByteBufferFree (& buf );
16081628for (i = 0 ;i < MtmNodes ;i ++ ) {
16091629if (i + 1 != MtmNodeId && !BIT_CHECK (dtm -> disabledNodeMask ,i )) {
16101630int size ;
16111631void * data = PaxosGet (psprintf ("lock-graph-%d" ,i + 1 ),& size ,NULL );
1612- MtmGraphAdd (& graph , (GlobalTransactionId * )data ,size /sizeof (GlobalTransactionId ));
1632+ if (data == NULL ) {
1633+ hasDeadlock = true;/* Just temporary hack until no Paxos */
1634+ }else {
1635+ MtmGraphAdd (& graph , (GlobalTransactionId * )data ,size /sizeof (GlobalTransactionId ));
1636+ }
16131637}
16141638}
16151639MtmGetGtid (pgxact -> xid ,& gtid );
@@ -1636,42 +1660,56 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
16361660
16371661void MtmUpdateClusterStatus (void )
16381662{
1639- nodemask_t mask ,clique ;
1663+ nodemask_t mask ,clique , disconnectedMask ;
16401664nodemask_t matrix [MAX_NODES ];
16411665int i ;
16421666
16431667MtmBuildConnectivityMatrix (matrix );
16441668
16451669clique = MtmFindMaxClique (matrix ,MtmNodes );
1646-
1670+ disconnectedMask = ~ clique & ((( nodemask_t ) 1 << MtmNodes ) - 1 );
16471671MtmLock (LW_EXCLUSIVE );
1648- mask = clique & ~dtm -> disabledNodeMask ;
1672+ mask = disconnectedMask & ~dtm -> disabledNodeMask ;
16491673for (i = 0 ;mask != 0 ;i ++ ,mask >>=1 ) {
16501674if (mask & 1 ) {
16511675dtm -> nNodes -= 1 ;
16521676BIT_SET (dtm -> disabledNodeMask ,i );
16531677}
16541678}
1655- if (dtm -> disabledNodeMask != clique ) {
1656- dtm -> disabledNodeMask |=clique ;
1679+ if (dtm -> disabledNodeMask != disconnectedMask ) {
1680+ dtm -> disabledNodeMask |=disconnectedMask ;
16571681PaxosSet (psprintf ("node-mask-%d" ,MtmNodeId ),& dtm -> disabledNodeMask ,sizeof dtm -> disabledNodeMask );
16581682}
16591683MtmUnlock ();
16601684}
16611685
1662- void MtmOnLostConnection (int nodeId )
1686+ void MtmOnNodeDisconnect (int nodeId )
16631687{
16641688BIT_SET (dtm -> connectivityMask ,nodeId - 1 );
16651689PaxosSet (psprintf ("node-mask-%d" ,MtmNodeId ),& dtm -> connectivityMask ,sizeof dtm -> connectivityMask );
16661690
16671691/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1668- MtmSleep (MtmConnectTimeout );
1692+ MtmSleep (MtmKeepaliveTimeout );
16691693
16701694MtmUpdateClusterStatus ();
16711695}
16721696
1673- void MtmOnConnectNode (int nodeId )
1697+ void MtmOnNodeConnect (int nodeId )
16741698{
16751699BIT_CLEAR (dtm -> connectivityMask ,nodeId - 1 );
16761700PaxosSet (psprintf ("node-mask-%d" ,MtmNodeId ),& dtm -> connectivityMask ,sizeof dtm -> connectivityMask );
16771701}
1702+
1703+ /*
1704+ * Paxos function stubs (until them are miplemented)
1705+ */
1706+ void * PaxosGet (char const * key ,int * size ,PaxosTimestamp * ts )
1707+ {
1708+ if (size != NULL ) {
1709+ * size = 0 ;
1710+ }
1711+ return NULL ;
1712+ }
1713+
1714+ void PaxosSet (char const * key ,void const * value ,int size )
1715+ {}