58
58
#include "raftable.h"
59
59
#include "multimaster.h"
60
60
#include "ddd.h"
61
- #include "paxos .h"
61
+ #include "raftable .h"
62
62
63
63
typedef struct {
64
64
TransactionId xid ;/* local transaction ID */
@@ -172,6 +172,7 @@ int MtmConnectAttempts;
172
172
int MtmConnectTimeout ;
173
173
int MtmKeepaliveTimeout ;
174
174
int MtmReconnectAttempts ;
175
+ bool MtmUseRaftable ;
175
176
MtmConnectionInfo * MtmConnections ;
176
177
177
178
static char * MtmConnStrs ;
@@ -1104,7 +1105,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
1104
1105
int i ,j ,n = MtmNodes ;
1105
1106
for (i = 0 ;i < n ;i ++ ) {
1106
1107
if (i + 1 != MtmNodeId ) {
1107
- void * data = PaxosGet (psprintf ("node-mask-%d" ,i + 1 ),NULL ,NULL ,nowait );
1108
+ void * data = RaftableGet (psprintf ("node-mask-%d" ,i + 1 ),NULL ,NULL ,nowait );
1108
1109
if (data == NULL ) {
1109
1110
return false;
1110
1111
}
@@ -1134,7 +1135,7 @@ bool MtmRefreshClusterStatus(bool nowait)
1134
1135
int clique_size ;
1135
1136
int i ;
1136
1137
1137
- if (!MtmBuildConnectivityMatrix (matrix ,nowait )) {
1138
+ if (!MtmUseRaftable || ! MtmBuildConnectivityMatrix (matrix ,nowait )) {
1138
1139
/* RAFT is not available */
1139
1140
return false;
1140
1141
}
@@ -1194,7 +1195,7 @@ void MtmCheckQuorum(void)
1194
1195
void MtmOnNodeDisconnect (int nodeId )
1195
1196
{
1196
1197
BIT_SET (Mtm -> connectivityMask ,nodeId - 1 );
1197
- PaxosSet (psprintf ("node-mask-%d" ,MtmNodeId ),& Mtm -> connectivityMask ,sizeof Mtm -> connectivityMask , false);
1198
+ RaftableSet (psprintf ("node-mask-%d" ,MtmNodeId ),& Mtm -> connectivityMask ,sizeof Mtm -> connectivityMask , false);
1198
1199
1199
1200
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1200
1201
MtmSleep (MtmKeepaliveTimeout );
@@ -1213,52 +1214,9 @@ void MtmOnNodeDisconnect(int nodeId)
1213
1214
void MtmOnNodeConnect (int nodeId )
1214
1215
{
1215
1216
BIT_CLEAR (Mtm -> connectivityMask ,nodeId - 1 );
1216
- PaxosSet (psprintf ("node-mask-%d" ,MtmNodeId ),& Mtm -> connectivityMask ,sizeof Mtm -> connectivityMask , false);
1217
+ RaftableSet (psprintf ("node-mask-%d" ,MtmNodeId ),& Mtm -> connectivityMask ,sizeof Mtm -> connectivityMask , false);
1217
1218
}
1218
1219
1219
- /*
1220
- * Paxos function stubs (until them are miplemented)
1221
- */
1222
- void * PaxosGet (char const * key ,int * size ,PaxosTimestamp * ts ,bool nowait )
1223
- {
1224
- unsigned enclen ,declen ,len ;
1225
- char * enc ,* dec ;
1226
- Assert (ts == NULL );// not implemented
1227
-
1228
- enc = raftable_get (key );
1229
- if (enc == NULL )
1230
- {
1231
- * size = 0 ;
1232
- return NULL ;
1233
- }
1234
-
1235
- enclen = strlen (enc );
1236
- declen = hex_dec_len (enc ,enclen );
1237
- dec = palloc (declen );
1238
- len = hex_decode (enc ,enclen ,dec );
1239
- pfree (enc );
1240
- Assert (len == declen );
1241
-
1242
- if (size != NULL ) {
1243
- * size = declen ;
1244
- }
1245
- return dec ;
1246
- }
1247
-
1248
- void PaxosSet (char const * key ,void const * value ,int size ,bool nowait )
1249
- {
1250
- unsigned enclen ,declen ,len ;
1251
- char * enc ,* dec ;
1252
-
1253
- enclen = hex_enc_len (value ,size );
1254
- enc = palloc (enclen )+ 1 ;
1255
- len = hex_encode (value ,size ,enc );
1256
- Assert (len == enclen );
1257
- enc [len ]= '\0' ;
1258
-
1259
- raftable_set (key ,enc ,nowait ?1 :INT_MAX );
1260
- pfree (enc );
1261
- }
1262
1220
1263
1221
1264
1222
/*
@@ -1485,6 +1443,19 @@ _PG_init(void)
1485
1443
NULL
1486
1444
);
1487
1445
1446
+ DefineCustomBoolVariable (
1447
+ "multimaster.use_raftable" ,
1448
+ "Use raftable plugin for internode communication" ,
1449
+ NULL ,
1450
+ & MtmUseRaftable ,
1451
+ false,
1452
+ PGC_BACKEND ,
1453
+ 0 ,
1454
+ NULL ,
1455
+ NULL ,
1456
+ NULL
1457
+ );
1458
+
1488
1459
DefineCustomIntVariable (
1489
1460
"multimaster.workers" ,
1490
1461
"Number of multimaster executor workers per node" ,
@@ -1775,6 +1746,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1775
1746
break ;
1776
1747
}
1777
1748
}
1749
+ if (isRecoverySession ) {
1750
+ MTM_INFO ("%d: PGLOGICAL startup hook\n" ,MyProcPid );
1751
+ sleep (30 );
1752
+ }
1778
1753
MtmLock (LW_EXCLUSIVE );
1779
1754
if (isRecoverySession ) {
1780
1755
elog (WARNING ,"Node %d start recovery of node %d" ,MtmNodeId ,MtmReplicationNodeId );
@@ -1807,7 +1782,7 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
1807
1782
bool res = Mtm -> status != MTM_RECOVERY
1808
1783
&& (args -> origin_id == InvalidRepOriginId
1809
1784
|| MtmIsRecoveredNode (MtmReplicationNodeId ));
1810
- MTM_INFO ("%d: MtmReplicationTxnFilterHook->%d\n" ,MyProcPid ,res );
1785
+ MTM_TRACE ("%d: MtmReplicationTxnFilterHook->%d\n" ,MyProcPid ,res );
1811
1786
return res ;
1812
1787
}
1813
1788
@@ -2376,16 +2351,16 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
2376
2351
2377
2352
ByteBufferAlloc (& buf );
2378
2353
EnumerateLocks (MtmSerializeLock ,& buf );
2379
- PaxosSet (psprintf ("lock-graph-%d" ,MtmNodeId ),buf .data ,buf .used , true);
2354
+ RaftableSet (psprintf ("lock-graph-%d" ,MtmNodeId ),buf .data ,buf .used , true);
2380
2355
MtmGraphInit (& graph );
2381
2356
MtmGraphAdd (& graph , (GlobalTransactionId * )buf .data ,buf .used /sizeof (GlobalTransactionId ));
2382
2357
ByteBufferFree (& buf );
2383
2358
for (i = 0 ;i < MtmNodes ;i ++ ) {
2384
2359
if (i + 1 != MtmNodeId && !BIT_CHECK (Mtm -> disabledNodeMask ,i )) {
2385
2360
int size ;
2386
- void * data = PaxosGet (psprintf ("lock-graph-%d" ,i + 1 ),& size ,NULL , true);
2361
+ void * data = RaftableGet (psprintf ("lock-graph-%d" ,i + 1 ),& size ,NULL , true);
2387
2362
if (data == NULL ) {
2388
- return true;/*Just temporary hack until no Paxos */
2363
+ return true;/*If using Raftable is disabled */
2389
2364
}else {
2390
2365
MtmGraphAdd (& graph , (GlobalTransactionId * )data ,size /sizeof (GlobalTransactionId ));
2391
2366
}