Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 5c44b4e

Browse files
committed
2 parents 98f9a57 + f3c0e4e, commit 5c44b4e

File tree

7 files changed

+167
-54
lines changed

7 files changed

+167
-54
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 16 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -243,30 +243,34 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
243243
if (!MtmResolveHostByName(host,addrs,&n_addrs)) {
244244
elog(ERROR,"Arbiter failed to resolve host '%s' by name",host);
245245
}
246-
Retry:
247-
sd=socket(AF_INET,SOCK_STREAM,0);
248-
if (sd<0) {
249-
elog(ERROR,"Arbiter failed to create socket: %d",errno);
250-
}
246+
247+
Retry:
248+
251249
while (1) {
252250
intrc=-1;
251+
252+
sd=socket(AF_INET,SOCK_STREAM,0);
253+
if (sd<0) {
254+
elog(ERROR,"Arbiter failed to create socket: %d",errno);
255+
}
253256
for (i=0;i<n_addrs;++i) {
254257
memcpy(&sock_inet.sin_addr,&addrs[i],sizeofsock_inet.sin_addr);
255258
do {
256259
rc=connect(sd, (structsockaddr*)&sock_inet,sizeof(sock_inet));
257260
}while (rc<0&&errno==EINTR);
258-
261+
259262
if (rc >=0||errno==EINPROGRESS) {
260263
break;
261264
}
262265
}
263266
if (rc<0) {
264267
if ((errno!=ENOENT&&errno!=ECONNREFUSED&&errno!=EINPROGRESS)||max_attempts==0) {
265-
elog(WARNING,"Arbiter failed to connect to %s:%d: %d",host,port,errno);
268+
elog(WARNING,"Arbiter failed to connect to %s:%d:error=%d",host,port,errno);
266269
return-1;
267270
}else {
268271
max_attempts-=1;
269-
MtmSleep(MtmConnectTimeout);
272+
elog(WARNING,"Arbiter trying to connect to %s:%d: error=%d",host,port,errno);
273+
MtmSleep(5*MtmConnectTimeout);
270274
}
271275
continue;
272276
}else {
@@ -282,7 +286,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
282286
req.hdr.sxid=ShmemVariableCache->nextXid;
283287
req.hdr.csn=MtmGetCurrentTime();
284288
req.hdr.disabledNodeMask=Mtm->disabledNodeMask;
285-
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].connStr);
289+
strcpy(req.connStr,Mtm->nodes[MtmNodeId-1].con.connStr);
286290
if (!MtmWriteSocket(sd,&req,sizeofreq)) {
287291
elog(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
288292
close(sd);
@@ -321,7 +325,7 @@ static void MtmOpenConnections()
321325

322326
for (i=0;i<nNodes;i++) {
323327
if (i+1!=MtmNodeId) {
324-
sockets[i]=MtmConnectSocket(Mtm->nodes[i].hostName,MtmArbiterPort+i+1,MtmConnectAttempts);
328+
sockets[i]=MtmConnectSocket(Mtm->nodes[i].con.hostName,MtmArbiterPort+i+1,MtmConnectAttempts);
325329
if (sockets[i]<0) {
326330
MtmOnNodeDisconnect(i+1);
327331
}
@@ -345,7 +349,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
345349
if (sockets[node] >=0) {
346350
close(sockets[node]);
347351
}
348-
sockets[node]=MtmConnectSocket(Mtm->nodes[node].hostName,MtmArbiterPort+node+1,MtmReconnectAttempts);
352+
sockets[node]=MtmConnectSocket(Mtm->nodes[node].con.hostName,MtmArbiterPort+node+1,MtmReconnectAttempts);
349353
if (sockets[node]<0) {
350354
MtmOnNodeDisconnect(node+1);
351355
return false;
@@ -385,7 +389,7 @@ static void MtmAcceptOneConnection()
385389
resp.dxid=HANDSHAKE_MAGIC;
386390
resp.sxid=ShmemVariableCache->nextXid;
387391
resp.csn=MtmGetCurrentTime();
388-
MtmUpdateNodeConnStr(req.hdr.node,req.connStr);
392+
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con,req.connStr);
389393
if (!MtmWriteSocket(fd,&resp,sizeofresp)) {
390394
elog(WARNING,"Arbiter failed to write response for handshake message to node %d",resp.node);
391395
close(fd);

‎contrib/mmts/multimaster.c‎

Lines changed: 125 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -166,10 +166,12 @@ int MtmConnectAttempts;
166166
intMtmConnectTimeout;
167167
intMtmKeepaliveTimeout;
168168
intMtmReconnectAttempts;
169+
MtmConnectionInfo*MtmConnections;
169170

170171
staticchar*MtmConnStrs;
171172
staticintMtmQueueSize;
172173
staticintMtmWorkers;
174+
staticintMtmVacuumDelay;
173175
staticintMtmMinRecoveryLag;
174176
staticintMtmMaxRecoveryLag;
175177

@@ -402,26 +404,90 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
402404
* We collect the oldest CSNs from all nodes and choose the minimum among them.
403405
* If no such XID can be located, then return previously observed oldest XID
404406
*/
407+
#if0
405408
staticTransactionId
406409
MtmAdjustOldestXid(TransactionIdxid)
407410
{
408411
if (TransactionIdIsValid(xid)) {
409412
MtmTransState*ts,*prev=NULL;
410-
413+
csn_toldestSnapshot=0;
414+
inti;
415+
411416
MtmLock(LW_EXCLUSIVE);
412-
ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
413-
if (ts!=NULL&&ts->status==TRANSACTION_STATUS_COMMITTED) {/* committed transactions have same CSNs at all nodes */
414-
csn_toldestSnapshot;
415-
inti;
417+
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
418+
if (TransactionIdPrecedes(ts->xid,xid)
419+
&&ts->status==TRANSACTION_STATUS_COMMITTED
420+
&&ts->csn>oldestSnapshot)
421+
{
422+
oldestSnapshot=ts->csn;
423+
}
424+
}
425+
Mtm->nodes[MtmNodeId-1].oldestSnapshot=oldestSnapshot;
426+
for (i=0;i<MtmNodes;i++) {
427+
if (!BIT_CHECK(Mtm->disabledNodeMask,i)
428+
&&Mtm->nodes[i].oldestSnapshot<oldestSnapshot)
429+
{
430+
oldestSnapshot=Mtm->nodes[i].oldestSnapshot;
431+
}
432+
}
433+
oldestSnapshot-=MtmVacuumDelay*USEC;
434+
for (ts=Mtm->transListHead;
435+
ts!=NULL
436+
&&ts->csn<oldestSnapshot
437+
&&TransactionIdPrecedes(ts->xid,xid)
438+
&& (ts->status==TRANSACTION_STATUS_COMMITTED||
439+
ts->status==TRANSACTION_STATUS_ABORTED);
440+
ts=ts->next)
441+
{
442+
if (ts->status==TRANSACTION_STATUS_COMMITTED) {
443+
prev=ts;
444+
}
445+
}
446+
if (prev!=NULL) {
447+
for (ts=Mtm->transListHead;ts!=prev;ts=ts->next) {
448+
/* Remove information about too old transactions */
449+
Assert(ts->status!=TRANSACTION_STATUS_UNKNOWN);
450+
hash_search(MtmXid2State,&ts->xid,HASH_REMOVE,NULL);
451+
}
452+
Mtm->transListHead=prev;
453+
Mtm->oldestXid=xid=prev->xid;
454+
}elseif (TransactionIdPrecedes(Mtm->oldestXid,xid)) {
455+
xid=Mtm->oldestXid;
456+
}
457+
MtmUnlock();
458+
}
459+
returnxid;
460+
}
461+
#else
462+
staticTransactionId
463+
MtmAdjustOldestXid(TransactionIdxid)
464+
{
465+
if (TransactionIdIsValid(xid)) {
466+
MtmTransState*ts,*prev=NULL;
467+
inti;
416468

417-
Mtm->nodes[MtmNodeId-1].oldestSnapshot=oldestSnapshot=ts->csn;
469+
MtmLock(LW_EXCLUSIVE);
470+
ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
471+
if (ts!=NULL&&ts->status==TRANSACTION_STATUS_COMMITTED) {
472+
csn_toldestSnapshot=ts->csn;
473+
Mtm->nodes[MtmNodeId-1].oldestSnapshot=oldestSnapshot;
418474
for (i=0;i<MtmNodes;i++) {
419-
if (Mtm->nodes[i].oldestSnapshot<oldestSnapshot) {
475+
if (!BIT_CHECK(Mtm->disabledNodeMask,i)
476+
&&Mtm->nodes[i].oldestSnapshot<oldestSnapshot)
477+
{
420478
oldestSnapshot=Mtm->nodes[i].oldestSnapshot;
421479
}
422480
}
423-
for (ts=Mtm->transListHead;ts!=NULL&&ts->csn<oldestSnapshot;prev=ts,ts=ts->next) {
424-
Assert(ts->status==TRANSACTION_STATUS_COMMITTED||ts->status==TRANSACTION_STATUS_ABORTED||ts->status==TRANSACTION_STATUS_IN_PROGRESS);
481+
oldestSnapshot-=MtmVacuumDelay*USEC;
482+
483+
for (ts=Mtm->transListHead;
484+
ts!=NULL
485+
&&ts->csn<oldestSnapshot
486+
&&TransactionIdPrecedes(ts->xid,xid)
487+
&& (ts->status==TRANSACTION_STATUS_COMMITTED||
488+
ts->status==TRANSACTION_STATUS_ABORTED);
489+
prev=ts,ts=ts->next)
490+
{
425491
if (prev!=NULL) {
426492
/* Remove information about too old transactions */
427493
hash_search(MtmXid2State,&prev->xid,HASH_REMOVE,NULL);
@@ -431,14 +497,14 @@ MtmAdjustOldestXid(TransactionId xid)
431497
if (prev!=NULL) {
432498
Mtm->transListHead=prev;
433499
Mtm->oldestXid=xid=prev->xid;
434-
}else {
500+
}elseif (TransactionIdPrecedes(Mtm->oldestXid,xid)){
435501
xid=Mtm->oldestXid;
436502
}
437503
MtmUnlock();
438504
}
439505
returnxid;
440506
}
441-
507+
#endif
442508
/*
443509
* -------------------------------------------
444510
* Transaction list manipulation
@@ -989,7 +1055,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
9891055
for (i=0;i<n;i++) {
9901056
if (i+1!=MtmNodeId) {
9911057
void*data=PaxosGet(psprintf("node-mask-%d",i+1),NULL,NULL,nowait);
992-
matrix[i]=*(nodemask_t*)data;
1058+
matrix[i]=data ?*(nodemask_t*)data :0;
9931059
}else {
9941060
matrix[i]=Mtm->connectivityMask;
9951061
}
@@ -1153,6 +1219,7 @@ static void MtmInitialize()
11531219
for (i=0;i<MtmNodes;i++) {
11541220
Mtm->nodes[i].oldestSnapshot=0;
11551221
Mtm->nodes[i].transDelay=0;
1222+
Mtm->nodes[i].con=MtmConnections[i];
11561223
}
11571224
PGSemaphoreCreate(&Mtm->votingSemaphore);
11581225
PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -1178,17 +1245,17 @@ MtmShmemStartup(void)
11781245
MtmInitialize();
11791246
}
11801247

1181-
voidMtmUpdateNodeConnStr(intnodeId,charconst*connStr)
1248+
voidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr)
11821249
{
11831250
charconst*host;
11841251
charconst*end;
11851252
inthostLen;
11861253

11871254
if (strlen(connStr) >=MULTIMASTER_MAX_CONN_STR_SIZE) {
1188-
elog(ERROR,"Too long (%d) connection string '%s' for node %d, limit is %d",
1189-
(int)strlen(connStr),connStr,nodeId,MULTIMASTER_MAX_CONN_STR_SIZE-1);
1255+
elog(ERROR,"Too long (%d) connection string '%s': limit is %d",
1256+
(int)strlen(connStr),connStr,MULTIMASTER_MAX_CONN_STR_SIZE-1);
11901257
}
1191-
strcpy(Mtm->nodes[nodeId-1].connStr,connStr);
1258+
strcpy(conn->connStr,connStr);
11921259

11931260
host=strstr(connStr,"host=");
11941261
if (host==NULL) {
@@ -1198,30 +1265,46 @@ void MtmUpdateNodeConnStr(int nodeId, char const* connStr)
11981265
for (end=host;*end!=' '&&*end!='\0';end++);
11991266
hostLen=end-host;
12001267
if (hostLen >=MULTIMASTER_MAX_HOST_NAME_SIZE) {
1201-
elog(ERROR,"Too long (%d) host name '%.*s' for node %d, limit is %d",
1202-
hostLen,hostLen,host,nodeId,MULTIMASTER_MAX_HOST_NAME_SIZE-1);
1268+
elog(ERROR,"Too long (%d) host name '%.*s': limit is %d",
1269+
hostLen,hostLen,host,MULTIMASTER_MAX_HOST_NAME_SIZE-1);
12031270
}
1204-
memcpy(Mtm->nodes[nodeId-1].hostName,host,hostLen);
1205-
Mtm->nodes[nodeId-1].hostName[hostLen]='\0';
1271+
memcpy(conn->hostName,host,hostLen);
1272+
conn->hostName[hostLen]='\0';
12061273
}
12071274

12081275
staticvoidMtmSplitConnStrs(void)
12091276
{
12101277
inti;
1211-
char*copy=strdup(MtmConnStrs);
1278+
char*copy=pstrdup(MtmConnStrs);
12121279
char*connStr=copy;
12131280
char*connStrEnd=connStr+strlen(connStr);
12141281

1282+
for (i=0;connStr<connStrEnd;i++) {
1283+
char*p=strchr(connStr,',');
1284+
if (p==NULL) {
1285+
p=connStrEnd;
1286+
}
1287+
connStr=p+1;
1288+
}
1289+
if (i>MAX_NODES) {
1290+
elog(ERROR,"Multimaster with more than %d nodes is not currently supported",MAX_NODES);
1291+
}
1292+
if (i<2) {
1293+
elog(ERROR,"Multimaster should have at least two nodes");
1294+
}
1295+
MtmNodes=i;
1296+
MtmConnections= (MtmConnectionInfo*)palloc(i*sizeof(MtmConnectionInfo));
1297+
connStr=copy;
1298+
12151299
for (i=0;connStr<connStrEnd;i++) {
12161300
char*p=strchr(connStr,',');
12171301
if (p==NULL) {
12181302
p=connStrEnd;
12191303
}
1220-
if (i==MAX_NODES) {
1221-
elog(ERROR,"Multimaster with more than %d nodes is not currently supported",MAX_NODES);
1222-
}
12231304
*p='\0';
1224-
MtmUpdateNodeConnStr(i+1,connStr);
1305+
1306+
MtmUpdateNodeConnectionInfo(&MtmConnections[i],connStr);
1307+
12251308
if (i+1==MtmNodeId) {
12261309
char*dbName=strstr(connStr,"dbname=");
12271310
char*end;
@@ -1232,20 +1315,13 @@ static void MtmSplitConnStrs(void)
12321315
dbName+=7;
12331316
for (end=dbName;*end!=' '&&*end!='\0';end++);
12341317
len=end-dbName;
1235-
MtmDatabaseName= (char*)malloc(len+1);
1318+
MtmDatabaseName= (char*)palloc(len+1);
12361319
memcpy(MtmDatabaseName,dbName,len);
12371320
MtmDatabaseName[len]='\0';
12381321
}
12391322
connStr=p+1;
12401323
}
1241-
free(copy);
1242-
if (i<2) {
1243-
elog(ERROR,"Multimaster should have at least two nodes");
1244-
}
1245-
MtmNodes=i;
1246-
if (MtmNodeId>MtmNodes) {
1247-
elog(ERROR,"Invalid node id %d for specified nubmer of nodes %d",MtmNodeId,MtmNodes);
1248-
}
1324+
pfree(copy);
12491325
}
12501326

12511327
void
@@ -1309,6 +1385,21 @@ _PG_init(void)
13091385
NULL
13101386
);
13111387

1388+
DefineCustomIntVariable(
1389+
"multimaster.vacuum_delay",
1390+
"Minimal age of records which can be vacuumed (seconds)",
1391+
NULL,
1392+
&MtmVacuumDelay,
1393+
1,
1394+
1,
1395+
INT_MAX,
1396+
PGC_BACKEND,
1397+
0,
1398+
NULL,
1399+
NULL,
1400+
NULL
1401+
);
1402+
13121403
DefineCustomIntVariable(
13131404
"multimaster.queue_size",
13141405
"Multimaster queue size",

‎contrib/mmts/multimaster.h‎

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -81,10 +81,16 @@ typedef enum
8181

8282
typedefstruct
8383
{
84+
charhostName[MULTIMASTER_MAX_HOST_NAME_SIZE];
85+
charconnStr[MULTIMASTER_MAX_CONN_STR_SIZE];
86+
}MtmConnectionInfo;
87+
88+
89+
typedefstruct
90+
{
91+
MtmConnectionInfocon;
8492
time_ttransDelay;
8593
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
86-
charhostName[MULTIMASTER_MAX_HOST_NAME_SIZE];
87-
charconnStr[MULTIMASTER_MAX_CONN_STR_SIZE];
8894
}MtmNodeInfo;
8995

9096
typedefstructMtmTransState
@@ -152,6 +158,8 @@ extern int MtmReconnectAttempts;
152158
externintMtmKeepaliveTimeout;
153159
externHTAB*MtmXid2State;
154160

161+
externMtmConnectionInfo*MtmConnections;
162+
155163
externvoidMtmArbiterInitialize(void);
156164
externvoidMtmStartReceivers(void);
157165
externcsn_tMtmTransactionSnapshot(TransactionIdxid);
@@ -183,6 +191,6 @@ extern XidStatus MtmGetGlobalTransactionStatus(char const* gid);
183191
externboolMtmIsRecoveredNode(intnodeId);
184192
externvoidMtmRefreshClusterStatus(boolnowait);
185193
externvoidMtmSwitchClusterMode(MtmNodeStatusmode);
186-
externvoidMtmUpdateNodeConnStr(intnodeId,charconst*connStr);
194+
externvoidMtmUpdateNodeConnectionInfo(MtmConnectionInfo*conn,charconst*connStr);
187195

188196
#endif

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -578,8 +578,8 @@ void MtmStartReceivers(void)
578578

579579
for (i=0;i<MtmNodes;i++) {
580580
if (i+1!=MtmNodeId) {
581-
ReceiverArgs*ctx= (ReceiverArgs*)malloc(sizeof(ReceiverArgs));
582-
ctx->receiver_conn_string=psprintf("replication=database %s",Mtm->nodes[i].connStr);
581+
ReceiverArgs*ctx= (ReceiverArgs*)palloc(sizeof(ReceiverArgs));
582+
ctx->receiver_conn_string=psprintf("replication=database %s",MtmConnections[i].connStr);
583583
sprintf(ctx->receiver_slot,MULTIMASTER_SLOT_PATTERN,MtmNodeId);
584584
ctx->local_node=MtmNodeId;
585585
ctx->remote_node=i+1;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp