Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6fd8584

Browse files
knizhnikkelvich
authored andcommitted
Fix configuration problem
1 parent5fd6066 commit6fd8584

File tree

4 files changed

+58
-34
lines changed

4 files changed

+58
-34
lines changed

‎arbiter.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
282282
req.hdr.sxid=ShmemVariableCache->nextXid;
283283
req.hdr.csn=MtmGetCurrentTime();
284284
req.hdr.disabledNodeMask=Mtm->disabledNodeMask;
285-
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].connStr);
285+
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
286286
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
287287
elog(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
288288
close(sd);
@@ -321,7 +321,7 @@ static void MtmOpenConnections()
321321

322322
for (i=0;i<nNodes;i++) {
323323
if (i+1!=MtmNodeId) {
324-
sockets[i]=MtmConnectSocket(Mtm->nodes[i].hostName,MtmArbiterPort+i+1,MtmConnectAttempts);
324+
sockets[i]=MtmConnectSocket(Mtm->nodes[i].con.hostName,MtmArbiterPort+i+1,MtmConnectAttempts);
325325
if (sockets[i]<0) {
326326
MtmOnNodeDisconnect(i+1);
327327
}
@@ -345,7 +345,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
345345
if (sockets[node] >=0) {
346346
close(sockets[node]);
347347
}
348-
sockets[node]=MtmConnectSocket(Mtm->nodes[node].hostName,MtmArbiterPort+node+1,MtmReconnectAttempts);
348+
sockets[node]=MtmConnectSocket(Mtm->nodes[node].con.hostName,MtmArbiterPort+node+1,MtmReconnectAttempts);
349349
if (sockets[node]<0) {
350350
MtmOnNodeDisconnect(node+1);
351351
return false;
@@ -385,7 +385,7 @@ static void MtmAcceptOneConnection()
385385
resp.dxid=HANDSHAKE_MAGIC;
386386
resp.sxid=ShmemVariableCache->nextXid;
387387
resp.csn=MtmGetCurrentTime();
388-
MtmUpdateNodeConnStr(req.hdr.node,req.connStr);
388+
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con,req.connStr);
389389
if (!MtmWriteSocket(fd,&resp,sizeofresp)) {
390390
elog(WARNING,"Arbiter failed to write response for handshake message to node %d",resp.node);
391391
close(fd);

‎multimaster.c

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ int MtmConnectAttempts;
166166
intMtmConnectTimeout;
167167
intMtmKeepaliveTimeout;
168168
intMtmReconnectAttempts;
169+
MtmConnectionInfo*MtmConnections;
169170

170171
staticchar*MtmConnStrs;
171172
staticintMtmQueueSize;
@@ -420,8 +421,13 @@ MtmAdjustOldestXid(TransactionId xid)
420421
oldestSnapshot=Mtm->nodes[i].oldestSnapshot;
421422
}
422423
}
423-
for (ts=Mtm->transListHead;ts!=NULL&&ts->csn<oldestSnapshot;prev=ts,ts=ts->next) {
424-
Assert(ts->status==TRANSACTION_STATUS_COMMITTED||ts->status==TRANSACTION_STATUS_ABORTED||ts->status==TRANSACTION_STATUS_IN_PROGRESS);
424+
for (ts=Mtm->transListHead;
425+
ts!=NULL
426+
&&ts->csn<oldestSnapshot
427+
&& (ts->status==TRANSACTION_STATUS_COMMITTED||ts->status==TRANSACTION_STATUS_ABORTED)
428+
&&TransactionIdPrecedes(ts->xid,xid);
429+
prev=ts,ts=ts->next)
430+
{
425431
if (prev!=NULL) {
426432
/* Remove information about too old transactions */
427433
hash_search(MtmXid2State,&prev->xid,HASH_REMOVE,NULL);
@@ -989,7 +995,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
989995
for (i=0;i<n;i++) {
990996
if (i+1!=MtmNodeId) {
991997
void*data=PaxosGet(psprintf("node-mask-%d",i+1),NULL,NULL,nowait);
992-
matrix[i]=*(nodemask_t*)data;
998+
matrix[i]=data ?*(nodemask_t*)data :0;
993999
}else {
9941000
matrix[i]=Mtm->connectivityMask;
9951001
}
@@ -1153,6 +1159,7 @@ static void MtmInitialize()
11531159
for (i=0;i<MtmNodes;i++) {
11541160
Mtm->nodes[i].oldestSnapshot=0;
11551161
Mtm->nodes[i].transDelay=0;
1162+
Mtm->nodes[i].con=MtmConnections[i];
11561163
}
11571164
PGSemaphoreCreate(&Mtm->votingSemaphore);
11581165
PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -1178,17 +1185,17 @@ MtmShmemStartup(void)
11781185
MtmInitialize();
11791186
}
11801187

1181-
voidMtmUpdateNodeConnStr(intnodeId,charconst*connStr)
1188+
voidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr)
11821189
{
11831190
charconst*host;
11841191
charconst*end;
11851192
inthostLen;
11861193

11871194
if (strlen(connStr) >=MULTIMASTER_MAX_CONN_STR_SIZE) {
1188-
elog(ERROR,"Too long (%d) connection string '%s' for node %d, limit is %d",
1189-
(int)strlen(connStr),connStr,nodeId,MULTIMASTER_MAX_CONN_STR_SIZE-1);
1195+
elog(ERROR,"Too long (%d) connection string '%s': limit is %d",
1196+
(int)strlen(connStr),connStr,MULTIMASTER_MAX_CONN_STR_SIZE-1);
11901197
}
1191-
strcpy(Mtm->nodes[nodeId-1].connStr,connStr);
1198+
strcpy(conn->connStr,connStr);
11921199

11931200
host=strstr(connStr,"host=");
11941201
if (host==NULL) {
@@ -1198,30 +1205,46 @@ void MtmUpdateNodeConnStr(int nodeId, char const* connStr)
11981205
for (end=host;*end!=' '&&*end!='\0';end++);
11991206
hostLen=end-host;
12001207
if (hostLen >=MULTIMASTER_MAX_HOST_NAME_SIZE) {
1201-
elog(ERROR,"Too long (%d) host name '%.*s' for node %d, limit is %d",
1202-
hostLen,hostLen,host,nodeId,MULTIMASTER_MAX_HOST_NAME_SIZE-1);
1208+
elog(ERROR,"Too long (%d) host name '%.*s': limit is %d",
1209+
hostLen,hostLen,host,MULTIMASTER_MAX_HOST_NAME_SIZE-1);
12031210
}
1204-
memcpy(Mtm->nodes[nodeId-1].hostName,host,hostLen);
1205-
Mtm->nodes[nodeId-1].hostName[hostLen]='\0';
1211+
memcpy(conn->hostName,host,hostLen);
1212+
conn->hostName[hostLen]='\0';
12061213
}
12071214

12081215
staticvoidMtmSplitConnStrs(void)
12091216
{
12101217
inti;
1211-
char*copy=strdup(MtmConnStrs);
1218+
char*copy=pstrdup(MtmConnStrs);
12121219
char*connStr=copy;
12131220
char*connStrEnd=connStr+strlen(connStr);
12141221

1222+
for (i=0;connStr<connStrEnd;i++) {
1223+
char*p=strchr(connStr,',');
1224+
if (p==NULL) {
1225+
p=connStrEnd;
1226+
}
1227+
connStr=p+1;
1228+
}
1229+
if (i>MAX_NODES) {
1230+
elog(ERROR,"Multimaster with more than %d nodes is not currently supported",MAX_NODES);
1231+
}
1232+
if (i<2) {
1233+
elog(ERROR,"Multimaster should have at least two nodes");
1234+
}
1235+
MtmNodes=i;
1236+
MtmConnections= (MtmConnectionInfo*)palloc(i*sizeof(MtmConnectionInfo));
1237+
connStr=copy;
1238+
12151239
for (i=0;connStr<connStrEnd;i++) {
12161240
char*p=strchr(connStr,',');
12171241
if (p==NULL) {
12181242
p=connStrEnd;
12191243
}
1220-
if (i==MAX_NODES) {
1221-
elog(ERROR,"Multimaster with more than %d nodes is not currently supported",MAX_NODES);
1222-
}
12231244
*p='\0';
1224-
MtmUpdateNodeConnStr(i+1,connStr);
1245+
1246+
MtmUpdateNodeConnectionInfo(&MtmConnections[i],connStr);
1247+
12251248
if (i+1==MtmNodeId) {
12261249
char*dbName=strstr(connStr,"dbname=");
12271250
char*end;
@@ -1232,20 +1255,13 @@ static void MtmSplitConnStrs(void)
12321255
dbName+=7;
12331256
for (end=dbName;*end!=' '&&*end!='\0';end++);
12341257
len=end-dbName;
1235-
MtmDatabaseName= (char*)malloc(len+1);
1258+
MtmDatabaseName= (char*)palloc(len+1);
12361259
memcpy(MtmDatabaseName,dbName,len);
12371260
MtmDatabaseName[len]='\0';
12381261
}
12391262
connStr=p+1;
12401263
}
1241-
free(copy);
1242-
if (i<2) {
1243-
elog(ERROR,"Multimaster should have at least two nodes");
1244-
}
1245-
MtmNodes=i;
1246-
if (MtmNodeId>MtmNodes) {
1247-
elog(ERROR,"Invalid node id %d for specified nubmer of nodes %d",MtmNodeId,MtmNodes);
1248-
}
1264+
pfree(copy);
12491265
}
12501266

12511267
void

‎multimaster.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,16 @@ typedef enum
8181

8282
typedefstruct
8383
{
84+
charhostName[MULTIMASTER_MAX_HOST_NAME_SIZE];
85+
charconnStr[MULTIMASTER_MAX_CONN_STR_SIZE];
86+
}MtmConnectionInfo;
87+
88+
89+
typedefstruct
90+
{
91+
MtmConnectionInfocon;
8492
time_ttransDelay;
8593
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
86-
charhostName[MULTIMASTER_MAX_HOST_NAME_SIZE];
87-
charconnStr[MULTIMASTER_MAX_CONN_STR_SIZE];
8894
}MtmNodeInfo;
8995

9096
typedefstructMtmTransState
@@ -152,6 +158,8 @@ extern int MtmReconnectAttempts;
152158
externintMtmKeepaliveTimeout;
153159
externHTAB*MtmXid2State;
154160

161+
externMtmConnectionInfo*MtmConnections;
162+
155163
externvoidMtmArbiterInitialize(void);
156164
externvoidMtmStartReceivers(void);
157165
externcsn_tMtmTransactionSnapshot(TransactionIdxid);
@@ -183,6 +191,6 @@ extern XidStatus MtmGetGlobalTransactionStatus(char const* gid);
183191
externboolMtmIsRecoveredNode(intnodeId);
184192
externvoidMtmRefreshClusterStatus(boolnowait);
185193
externvoidMtmSwitchClusterMode(MtmNodeStatusmode);
186-
externvoidMtmUpdateNodeConnStr(intnodeId,charconst*connStr);
194+
externvoidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr);
187195

188196
#endif

‎pglogical_receiver.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -578,8 +578,8 @@ void MtmStartReceivers(void)
578578

579579
for (i=0;i<MtmNodes;i++) {
580580
if (i+1!=MtmNodeId) {
581-
ReceiverArgs*ctx= (ReceiverArgs*)malloc(sizeof(ReceiverArgs));
582-
ctx->receiver_conn_string=psprintf("replication=database %s",Mtm->nodes[i].connStr);
581+
ReceiverArgs*ctx= (ReceiverArgs*)palloc(sizeof(ReceiverArgs));
582+
ctx->receiver_conn_string=psprintf("replication=database %s",MtmConnections[i].connStr);
583583
sprintf(ctx->receiver_slot,MULTIMASTER_SLOT_PATTERN,MtmNodeId);
584584
ctx->local_node=MtmNodeId;
585585
ctx->remote_node=i+1;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp