Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6849188

Browse files
committed
decouple raftable
1 parenta7ec13e commit6849188

File tree

4 files changed

+44
-192
lines changed

4 files changed

+44
-192
lines changed

‎Cluster.pm

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ sub new
4040
foreachmy$i (1..$nodenum)
4141
{
4242
my$host ="127.0.0.1";
43-
my ($pgport,$raftport) = allocate_ports($host, 2);
43+
my ($pgport,$arbiter_port) = allocate_ports($host, 2);
4444
my$node = new PostgresNode("node$i",$host,$pgport);
4545
$node->{id} =$i;
46-
$node->{raftport} =$raftport;
46+
$node->{arbiter_port} =$arbiter_port;
4747
push(@$nodes,$node);
4848
}
4949

@@ -71,16 +71,16 @@ sub configure
7171
{
7272
my ($self) =@_;
7373
my$nodes =$self->{nodes};
74+
my$nnodes =scalar @{$nodes };
7475

75-
my$connstr =join(',',map {"${\$_->connstr('postgres') }" }@$nodes);
76-
my$raftpeers =join(',',map {join(':',$_->{id},$_->host,$_->{raftport}) }@$nodes);
76+
my$connstr =join(',',map {"${\$_->connstr('postgres') } arbiter_port=${\$_->{arbiter_port} }" }@$nodes);
7777

7878
foreachmy$node (@$nodes)
7979
{
8080
my$id =$node->{id};
8181
my$host =$node->host;
8282
my$pgport =$node->port;
83-
my$raftport =$node->{raftport};
83+
my$arbiter_port =$node->{arbiter_port};
8484

8585
$node->append_conf("postgresql.conf",qq(
8686
log_statement = none
@@ -94,20 +94,22 @@ sub configure
9494
fsync = off
9595
max_wal_senders = 10
9696
wal_sender_timeout = 0
97-
default_transaction_isolation = 'repeatable read'
97+
default_transaction_isolation = 'repeatable read'
9898
max_replication_slots = 10
99-
shared_preload_libraries = 'raftable,multimaster'
99+
shared_preload_libraries = 'multimaster'
100+
101+
multimaster.arbiter_port =$arbiter_port
100102
multimaster.workers = 10
101103
multimaster.queue_size = 10485760 # 10mb
102104
multimaster.node_id =$id
103105
multimaster.conn_strings = '$connstr'
104-
multimaster.use_raftable = false
105106
multimaster.heartbeat_recv_timeout = 1000
106107
multimaster.heartbeat_send_timeout = 250
107-
multimaster.max_nodes =3
108+
multimaster.max_nodes =$nnodes
108109
multimaster.ignore_tables_without_pk = true
109-
multimaster.twopc_min_timeout = 2000
110-
log_line_prefix = '%t: '
110+
multimaster.twopc_min_timeout = 50000
111+
multimaster.min_2pc_timeout = 50000
112+
log_line_prefix = '%t: '
111113
));
112114

113115
$node->append_conf("pg_hba.conf",qq(

‎Makefile

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.oraftable.oarbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o
2+
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o
33

44
ifndefRAFTABLE_PATH
55
RAFTABLE_PATH = ../raftable
66
endif
77

88
overrideCPPFLAGS += -I$(RAFTABLE_PATH) -I$(RAFTABLE_PATH)/raft/include
99

10-
EXTRA_INSTALL = contrib/raftable contrib/mmts
11-
1210
EXTENSION = multimaster
1311
DATA = multimaster--1.0.sql
1412

‎multimaster.c

Lines changed: 30 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@
7272

7373
#include"multimaster.h"
7474
#include"ddd.h"
75-
#include"raftable_wrapper.h"
76-
#include"raftable.h"
77-
#include"worker.h"
7875

7976
typedefstruct {
8077
TransactionIdxid;/* local transaction ID */
@@ -215,7 +212,6 @@ int MtmNodes;
215212
intMtmNodeId;
216213
intMtmReplicationNodeId;
217214
intMtmArbiterPort;
218-
intMtmRaftablePort;
219215
intMtmConnectTimeout;
220216
intMtmReconnectTimeout;
221217
intMtmNodeDisableDelay;
@@ -225,7 +221,6 @@ int MtmHeartbeatSendTimeout;
225221
intMtmHeartbeatRecvTimeout;
226222
intMtmMin2PCTimeout;
227223
intMtmMax2PCRatio;
228-
boolMtmUseRaftable;
229224
boolMtmUseDtm;
230225
boolMtmPreserveCommitOrder;
231226
boolMtmVolksWagenMode;
@@ -1745,7 +1740,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
17451740

17461741
for (i=0;i<n;i++) {
17471742
if (i+1!=MtmNodeId) {
1748-
void*data=RaftableGet(psprintf("node-mask-%d",i+1),NULL,NULL,nowait);
1743+
void*data=&Mtm->nodes[i].connectivityMask;
17491744
if (data==NULL) {
17501745
return false;
17511746
}
@@ -1901,17 +1896,6 @@ void MtmOnNodeDisconnect(int nodeId)
19011896
MTM_LOG1("Disconnect node %d connectivity mask %llx",nodeId, (long long)Mtm->connectivityMask);
19021897
MtmUnlock();
19031898

1904-
if (!RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false))
1905-
{
1906-
elog(WARNING,"Disable node which is in minority according to RAFT");
1907-
MtmLock(LW_EXCLUSIVE);
1908-
if (Mtm->status==MTM_ONLINE) {
1909-
MtmSwitchClusterMode(MTM_IN_MINORITY);
1910-
}
1911-
MtmUnlock();
1912-
return;
1913-
}
1914-
19151899
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
19161900
MtmRefreshClusterStatus(false);
19171901
}
@@ -1922,9 +1906,6 @@ void MtmOnNodeConnect(int nodeId)
19221906
BIT_CLEAR(Mtm->connectivityMask,nodeId-1);
19231907
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
19241908
MtmUnlock();
1925-
1926-
MTM_LOG1("Reconnect node %d, connectivityMask=%llx",nodeId, (long long)Mtm->connectivityMask);
1927-
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
19281909
}
19291910

19301911

@@ -2034,21 +2015,6 @@ static void MtmLoadLocalTables(void)
20342015
heap_close(rel,RowExclusiveLock);
20352016
}
20362017
}
2037-
2038-
staticvoidMtmRaftableInitialize()
2039-
{
2040-
inti;
2041-
2042-
for (i=0;i<MtmNodes;i++)
2043-
{
2044-
intport=MtmConnections[i].raftablePort;
2045-
if (port==0) {
2046-
port=MtmRaftablePort+i;
2047-
}
2048-
raftable_peer(i,MtmConnections[i].hostName,port);
2049-
}
2050-
raftable_start(MtmNodeId-1);
2051-
}
20522018

20532019
staticvoidMtmCheckControlFile(void)
20542020
{
@@ -2197,19 +2163,10 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
21972163
memcpy(conn->hostName,host,hostLen);
21982164
conn->hostName[hostLen]='\0';
21992165

2200-
port=strstr(connStr,"raftport=");
2166+
port=strstr(connStr,"arbiter_port=");
22012167
if (port!=NULL) {
2202-
if (sscanf(port+9,"%d",&conn->raftablePort)!=1) {
2203-
elog(ERROR,"Invalid raftable port: %s",port+9);
2204-
}
2205-
}else {
2206-
conn->raftablePort=0;
2207-
}
2208-
2209-
port=strstr(connStr,"arbiterport=");
2210-
if (port!=NULL) {
2211-
if (sscanf(port+12,"%d",&conn->arbiterPort)!=1) {
2212-
elog(ERROR,"Invalid arbiter port: %s",port+12);
2168+
if (sscanf(port+13,"%d",&conn->arbiterPort)!=1) {
2169+
elog(ERROR,"Invalid arbiter port: %s",port+13);
22132170
}
22142171
}else {
22152172
conn->arbiterPort=MULTIMASTER_DEFAULT_ARBITER_PORT;
@@ -2538,19 +2495,6 @@ _PG_init(void)
25382495
NULL
25392496
);
25402497

2541-
DefineCustomBoolVariable(
2542-
"multimaster.use_raftable",
2543-
"Use raftable plugin for internode communication",
2544-
NULL,
2545-
&MtmUseRaftable,
2546-
true,
2547-
PGC_BACKEND,
2548-
0,
2549-
NULL,
2550-
NULL,
2551-
NULL
2552-
);
2553-
25542498
DefineCustomBoolVariable(
25552499
"multimaster.ignore_tables_without_pk",
25562500
"Do not replicate tables withpout primary key",
@@ -2708,21 +2652,6 @@ _PG_init(void)
27082652
NULL
27092653
);
27102654

2711-
DefineCustomIntVariable(
2712-
"multimaster.raftable_port",
2713-
"Base value for assigning raftable ports",
2714-
NULL,
2715-
&MtmRaftablePort,
2716-
6543,
2717-
0,
2718-
INT_MAX,
2719-
PGC_BACKEND,
2720-
0,
2721-
NULL,
2722-
NULL,
2723-
NULL
2724-
);
2725-
27262655
DefineCustomStringVariable(
27272656
"multimaster.conn_strings",
27282657
"Multimaster node connection strings separated by commas, i.e. 'replication=database dbname=postgres host=localhost port=5001,replication=database dbname=postgres host=localhost port=5002'",
@@ -2813,9 +2742,6 @@ _PG_init(void)
28132742

28142743
BgwPoolStart(MtmWorkers,MtmPoolConstructor);
28152744

2816-
if (MtmUseRaftable) {
2817-
MtmRaftableInitialize();
2818-
}
28192745
MtmArbiterInitialize();
28202746

28212747
/*
@@ -3516,8 +3442,7 @@ PGconn *PQconnectdb_safe(const char *conninfo)
35163442
{
35173443
PGconn*conn;
35183444
char*safe_connstr=pstrdup(conninfo);
3519-
erase_option_from_connstr("raftport",safe_connstr);
3520-
erase_option_from_connstr("arbiterport",safe_connstr);
3445+
erase_option_from_connstr("arbiter_port",safe_connstr);
35213446

35223447
conn=PQconnectdb(safe_connstr);
35233448

@@ -3626,11 +3551,17 @@ Datum mtm_dump_lock_graph(PG_FUNCTION_ARGS)
36263551
inti;
36273552
for (i=0;i<Mtm->nAllNodes;i++)
36283553
{
3629-
size_tsize;
3630-
char*data=RaftableGet(psprintf("lock-graph-%d",i+1),&size,NULL, false);
3631-
if (data) {
3632-
GlobalTransactionId*gtid= (GlobalTransactionId*)data;
3633-
GlobalTransactionId*last= (GlobalTransactionId*)(data+size);
3554+
size_tlockGraphSize;
3555+
char*lockGraphData;
3556+
MtmLockNode(i+MtmMaxNodes,LW_SHARED);
3557+
lockGraphSize=Mtm->nodes[i].lockGraphUsed;
3558+
lockGraphData=palloc(lockGraphSize);
3559+
memcpy(lockGraphData,Mtm->nodes[i].lockGraphData,lockGraphSize);
3560+
MtmUnlockNode(i+MtmMaxNodes);
3561+
3562+
if (lockGraphData) {
3563+
GlobalTransactionId*gtid= (GlobalTransactionId*)lockGraphData;
3564+
GlobalTransactionId*last= (GlobalTransactionId*) (lockGraphData+lockGraphSize);
36343565
appendStringInfo(s,"node-%d lock graph: ",i+1);
36353566
while (gtid!=last) {
36363567
GlobalTransactionId*src=gtid++;
@@ -4543,19 +4474,28 @@ MtmDetectGlobalDeadLockFortXid(TransactionId xid)
45434474

45444475
ByteBufferAlloc(&buf);
45454476
EnumerateLocks(MtmSerializeLock,&buf);
4546-
RaftableSet(psprintf("lock-graph-%d",MtmNodeId),buf.data,buf.used, false);
4477+
4478+
Assert(replorigin_session_origin==InvalidRepOriginId);
4479+
XLogFlush(LogLogicalMessage("L",buf.data,buf.used, false));
4480+
45474481
MtmSleep(MSEC_TO_USEC(DeadlockTimeout));
45484482
MtmGraphInit(&graph);
45494483
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data,buf.used/sizeof(GlobalTransactionId));
45504484
ByteBufferFree(&buf);
45514485
for (i=0;i<Mtm->nAllNodes;i++) {
45524486
if (i+1!=MtmNodeId&& !BIT_CHECK(Mtm->disabledNodeMask,i)) {
4553-
size_tsize;
4554-
void*data=RaftableGet(psprintf("lock-graph-%d",i+1),&size,NULL, false);
4555-
if (data==NULL) {
4487+
size_tlockGraphSize;
4488+
void*lockGraphData;
4489+
MtmLockNode(i+MtmMaxNodes,LW_SHARED);
4490+
lockGraphSize=Mtm->nodes[i].lockGraphUsed;
4491+
lockGraphData=palloc(lockGraphSize);
4492+
memcpy(lockGraphData,Mtm->nodes[i].lockGraphData,lockGraphSize);
4493+
MtmUnlockNode(i+MtmMaxNodes);
4494+
4495+
if (lockGraphData==NULL) {
45564496
return true;/* If using Raftable is disabled */
45574497
}else {
4558-
MtmGraphAdd(&graph, (GlobalTransactionId*)data,size/sizeof(GlobalTransactionId));
4498+
MtmGraphAdd(&graph, (GlobalTransactionId*)lockGraphData,lockGraphSize/sizeof(GlobalTransactionId));
45594499
}
45604500
}
45614501
}

‎raftable.c

Lines changed: 0 additions & 88 deletions
This file was deleted.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp