5757
5858#include "multimaster.h"
5959#include "ddd.h"
60- #include "paxos .h"
60+ #include "raftable .h"
6161
6262typedef struct {
6363TransactionId xid ;/* local transaction ID */
@@ -178,6 +178,7 @@ static int MtmWorkers;
178178static int MtmVacuumDelay ;
179179static int MtmMinRecoveryLag ;
180180static int MtmMaxRecoveryLag ;
181+ static bool MtmUseRaftable ;
181182
182183static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
183184static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -1102,7 +1103,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
11021103int i ,j ,n = MtmNodes ;
11031104for (i = 0 ;i < n ;i ++ ) {
11041105if (i + 1 != MtmNodeId ) {
1105- void * data = PaxosGet (psprintf ("node-mask-%d" ,i + 1 ),NULL ,NULL ,nowait );
1106+ void * data = RaftableGet (psprintf ("node-mask-%d" ,i + 1 ),NULL ,NULL ,nowait );
11061107if (data == NULL ) {
11071108return false;
11081109}
@@ -1132,7 +1133,7 @@ bool MtmRefreshClusterStatus(bool nowait)
11321133int clique_size ;
11331134int i ;
11341135
1135- if (!MtmBuildConnectivityMatrix (matrix ,nowait )) {
1136+ if (!MtmUseRaftable || ! MtmBuildConnectivityMatrix (matrix ,nowait )) {
11361137/* RAFT is not available */
11371138return false;
11381139}
@@ -1192,7 +1193,7 @@ void MtmCheckQuorum(void)
11921193void MtmOnNodeDisconnect (int nodeId )
11931194{
11941195BIT_SET (Mtm -> connectivityMask ,nodeId - 1 );
1195- PaxosSet (psprintf ("node-mask-%d" ,MtmNodeId ),& Mtm -> connectivityMask ,sizeof Mtm -> connectivityMask , false);
1196+ RaftableSet (psprintf ("node-mask-%d" ,MtmNodeId ),& Mtm -> connectivityMask ,sizeof Mtm -> connectivityMask , false);
11961197
11971198/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
11981199MtmSleep (MtmKeepaliveTimeout );
@@ -1211,52 +1212,9 @@ void MtmOnNodeDisconnect(int nodeId)
12111212void MtmOnNodeConnect (int nodeId )
12121213{
12131214BIT_CLEAR (Mtm -> connectivityMask ,nodeId - 1 );
1214- PaxosSet (psprintf ("node-mask-%d" ,MtmNodeId ),& Mtm -> connectivityMask ,sizeof Mtm -> connectivityMask , false);
1215+ RaftableSet (psprintf ("node-mask-%d" ,MtmNodeId ),& Mtm -> connectivityMask ,sizeof Mtm -> connectivityMask , false);
12151216}
12161217
1217- /*
1218- * Paxos function stubs (until them are miplemented)
1219- */
1220- void * PaxosGet (char const * key ,int * size ,PaxosTimestamp * ts ,bool nowait )
1221- {
1222- unsigned enclen ,declen ,len ;
1223- char * enc ,* dec ;
1224- Assert (ts == NULL );// not implemented
1225-
1226- enc = raftable_get (key );
1227- if (enc == NULL )
1228- {
1229- * size = 0 ;
1230- return NULL ;
1231- }
1232-
1233- enclen = strlen (enc );
1234- declen = hex_dec_len (enc ,enclen );
1235- dec = palloc (declen );
1236- len = hex_decode (enc ,enclen ,dec );
1237- pfree (enc );
1238- Assert (len == declen );
1239-
1240- if (size != NULL ) {
1241- * size = declen ;
1242- }
1243- return dec ;
1244- }
1245-
1246- void PaxosSet (char const * key ,void const * value ,int size ,bool nowait )
1247- {
1248- unsigned enclen ,declen ,len ;
1249- char * enc ,* dec ;
1250-
1251- enclen = hex_enc_len (value ,size );
1252- enc = palloc (enclen )+ 1 ;
1253- len = hex_encode (value ,size ,enc );
1254- Assert (len == enclen );
1255- enc [len ]= '\0' ;
1256-
1257- raftable_set (key ,enc ,nowait ?1 :INT_MAX );
1258- pfree (enc );
1259- }
12601218
12611219
12621220/*
@@ -1483,6 +1441,19 @@ _PG_init(void)
14831441NULL
14841442);
14851443
1444+ DefineCustomBoolVariable (
1445+ "multimaster.use_raftable" ,
1446+ "Use raftable plugin for internode communication" ,
1447+ NULL ,
1448+ & MtmUseRaftable ,
1449+ false,
1450+ PGC_BACKEND ,
1451+ 0 ,
1452+ NULL ,
1453+ NULL ,
1454+ NULL
1455+ );
1456+
14861457DefineCustomIntVariable (
14871458"multimaster.workers" ,
14881459"Number of multimaster executor workers per node" ,
@@ -1773,6 +1744,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
17731744break ;
17741745}
17751746}
1747+ if (isRecoverySession ) {
1748+ MTM_INFO ("%d: PGLOGICAL startup hook\n" ,MyProcPid );
1749+ sleep (30 );
1750+ }
17761751MtmLock (LW_EXCLUSIVE );
17771752if (isRecoverySession ) {
17781753elog (WARNING ,"Node %d start recovery of node %d" ,MtmNodeId ,MtmReplicationNodeId );
@@ -1805,7 +1780,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
18051780bool res = Mtm -> status != MTM_RECOVERY
18061781&& (args -> origin_id == InvalidRepOriginId
18071782|| MtmIsRecoveredNode (MtmReplicationNodeId ));
1808- MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" ,MyProcPid ,res );
1783+ MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" ,MyProcPid ,res );
18091784return res ;
18101785}
18111786
@@ -2373,16 +2348,16 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
23732348
23742349ByteBufferAlloc (& buf );
23752350EnumerateLocks (MtmSerializeLock ,& buf );
2376- PaxosSet (psprintf ("lock-graph-%d" ,MtmNodeId ),buf .data ,buf .used , true);
2351+ RaftableSet (psprintf ("lock-graph-%d" ,MtmNodeId ),buf .data ,buf .used , true);
23772352MtmGraphInit (& graph );
23782353MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data ,buf .used /sizeof (GlobalTransactionId ));
23792354ByteBufferFree (& buf );
23802355for (i = 0 ;i < MtmNodes ;i ++ ) {
23812356if (i + 1 != MtmNodeId && !BIT_CHECK (Mtm -> disabledNodeMask ,i )) {
23822357int size ;
2383- void * data = PaxosGet (psprintf ("lock-graph-%d" ,i + 1 ),& size ,NULL , true);
2358+ void * data = RaftableGet (psprintf ("lock-graph-%d" ,i + 1 ),& size ,NULL , true);
23842359if (data == NULL ) {
2385- return true;/*Just temporary hack until no Paxos */
2360+ return true;/*If using Raftable is disabled */
23862361}else {
23872362MtmGraphAdd (& graph , (GlobalTransactionId * )data ,size /sizeof (GlobalTransactionId ));
23882363}