Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfa309d4

Browse files
knizhnikkelvich
authored andcommitted
Avoid writes in critical section
1 parent5f35060 commitfa309d4

File tree

2 files changed

+35
-20
lines changed

2 files changed

+35
-20
lines changed

‎arbiter.c

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@
7474

7575
#include"multimaster.h"
7676

77-
#defineMAX_ROUTES 16
78-
#defineBUFFER_SIZE 1024
79-
#defineHANDSHAKE_MAGIC 0xCAFEDEED
77+
#defineMAX_ROUTES16
78+
#defineINIT_BUFFER_SIZE 1024
79+
#defineHANDSHAKE_MAGIC0xCAFEDEED
8080

8181
typedefstruct
8282
{
@@ -98,7 +98,8 @@ typedef struct
9898
typedefstruct
9999
{
100100
intused;
101-
MtmArbiterMessagedata[BUFFER_SIZE];
101+
intsize;
102+
MtmArbiterMessage*data;
102103
}MtmBuffer;
103104

104105
staticint*sockets;
@@ -450,11 +451,14 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
450451
}
451452

452453
/* Some node considered that I am dead, so switch to recovery mode */
454+
MtmLock(LW_EXCLUSIVE);
453455
if (BIT_CHECK(resp.disabledNodeMask,MtmNodeId-1)) {
454456
elog(WARNING,"Node %d thinks that I was dead",resp.node);
455457
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
456458
MtmSwitchClusterMode(MTM_RECOVERY);
457459
}
460+
MtmUnlock();
461+
458462
returnsd;
459463
}
460464

@@ -491,7 +495,9 @@ static bool MtmSendToNode(int node, void const* buf, int size)
491495
while (true) {
492496
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
493497
elog(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
498+
MtmLock(LW_EXCLUSIVE);
494499
BIT_CLEAR(Mtm->reconnectMask,node);
500+
MtmUnlock();
495501
close(sockets[node]);
496502
sockets[node]=-1;
497503
}
@@ -506,7 +512,9 @@ static bool MtmSendToNode(int node, void const* buf, int size)
506512
MtmOnNodeDisconnect(node+1);
507513
return false;
508514
}
515+
MtmLock(LW_EXCLUSIVE);
509516
BIT_CLEAR(Mtm->reconnectMask,node);
517+
MtmUnlock();
510518
MTM_LOG3("Arbiter restablished connection with node %d",node+1);
511519
}else {
512520
return true;
@@ -552,7 +560,6 @@ static void MtmAcceptOneConnection()
552560
close(fd);
553561
}else {
554562
MTM_LOG1("Arbiter established connection with node %d",req.hdr.node);
555-
BIT_CLEAR(Mtm->connectivityMask,req.hdr.node-1);
556563
MtmRegisterSocket(fd,req.hdr.node-1);
557564
sockets[req.hdr.node-1]=fd;
558565
MtmOnNodeConnect(req.hdr.node);
@@ -598,12 +605,9 @@ static void MtmAcceptIncomingConnections()
598605
staticvoidMtmAppendBuffer(MtmBuffer*txBuffer,TransactionIdxid,intnode,MtmTransState*ts)
599606
{
600607
MtmBuffer*buf=&txBuffer[node];
601-
if (buf->used==BUFFER_SIZE) {
602-
if (!MtmSendToNode(node,buf->data,buf->used*sizeof(MtmArbiterMessage))) {
603-
buf->used=0;
604-
return;
605-
}
606-
buf->used=0;
608+
if (buf->used==buf->size) {
609+
buf->size=buf->size ?buf->size*2 :INIT_BUFFER_SIZE;
610+
buf->data=repalloc(buf->data,buf->size*sizeof(MtmArbiterMessage));
607611
}
608612
buf->data[buf->used].dxid=xid;
609613

@@ -640,7 +644,7 @@ static void MtmTransSender(Datum arg)
640644
intnNodes=MtmMaxNodes;
641645
inti;
642646

643-
MtmBuffer*txBuffer= (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
647+
MtmBuffer*txBuffer= (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
644648

645649
InitializeTimeouts();
646650

@@ -655,10 +659,6 @@ static void MtmTransSender(Datum arg)
655659

656660
MtmOpenConnections();
657661

658-
for (i=0;i<nNodes;i++) {
659-
txBuffer[i].used=0;
660-
}
661-
662662
while (!stop) {
663663
MtmTransState*ts;
664664
PGSemaphoreLock(&Mtm->votingSemaphore);
@@ -723,7 +723,7 @@ static void MtmTransReceiver(Datum arg)
723723
intnNodes=MtmMaxNodes;
724724
intnResponses;
725725
inti,j,n,rc;
726-
MtmBuffer*rxBuffer= (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
726+
MtmBuffer*rxBuffer= (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
727727
timestamp_tlastHeartbeatCheck=MtmGetSystemTime();
728728
timestamp_tnow;
729729

@@ -744,7 +744,8 @@ static void MtmTransReceiver(Datum arg)
744744
MtmAcceptIncomingConnections();
745745

746746
for (i=0;i<nNodes;i++) {
747-
rxBuffer[i].used=0;
747+
rxBuffer[i].size=INIT_BUFFER_SIZE;
748+
rxBuffer[i].data=palloc(INIT_BUFFER_SIZE*sizeof(MtmArbiterMessage));
748749
}
749750

750751
while (!stop) {
@@ -788,7 +789,7 @@ static void MtmTransReceiver(Datum arg)
788789
continue;
789790
}
790791

791-
rc=MtmReadFromNode(i, (char*)rxBuffer[i].data+rxBuffer[i].used,BUFFER_SIZE-rxBuffer[i].used);
792+
rc=MtmReadFromNode(i, (char*)rxBuffer[i].data+rxBuffer[i].used,rxBuffer[i].size-rxBuffer[i].used);
792793
if (rc <=0) {
793794
continue;
794795
}

‎multimaster.c

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1432,8 +1432,11 @@ void MtmOnNodeDisconnect(int nodeId)
14321432
/* Avoid false detection of node failure and prevent node status blinking */
14331433
return;
14341434
}
1435+
MtmLock(LW_EXCLUSIVE);
14351436
BIT_SET(Mtm->connectivityMask,nodeId-1);
14361437
BIT_SET(Mtm->reconnectMask,nodeId-1);
1438+
MtmUnlock();
1439+
14371440
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
14381441

14391442
MtmSleep(MSEC_TO_USEC(MtmRaftPollDelay));
@@ -1460,7 +1463,10 @@ void MtmOnNodeDisconnect(int nodeId)
14601463

14611464
voidMtmOnNodeConnect(intnodeId)
14621465
{
1466+
MtmLock(LW_EXCLUSIVE);
14631467
BIT_CLEAR(Mtm->connectivityMask,nodeId-1);
1468+
MtmUnlock();
1469+
14641470
MTM_LOG1("Reconnect node %d",nodeId);
14651471
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
14661472
}
@@ -2194,6 +2200,8 @@ void MtmRecoverNode(int nodeId)
21942200

21952201
voidMtmDropNode(intnodeId,booldropSlot)
21962202
{
2203+
MtmLock(LW_EXCLUSIVE);
2204+
21972205
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
21982206
{
21992207
if (nodeId <=0||nodeId>Mtm->nLiveNodes)
@@ -2211,6 +2219,8 @@ void MtmDropNode(int nodeId, bool dropSlot)
22112219
ReplicationSlotDrop(psprintf(MULTIMASTER_SLOT_PATTERN,nodeId));
22122220
}
22132221
}
2222+
2223+
MtmUnlock();
22142224
}
22152225
staticvoid
22162226
MtmOnProcExit(intcode,Datumarg)
@@ -2401,7 +2411,9 @@ mtm_add_node(PG_FUNCTION_ARGS)
24012411
}
24022412
else
24032413
{
2404-
intnodeId=Mtm->nAllNodes;
2414+
intnodeId;
2415+
MtmLock(LW_EXCLUSIVE);
2416+
nodeId=Mtm->nAllNodes;
24052417
elog(NOTICE,"Add node %d: '%s'",nodeId+1,connStr);
24062418
MtmUpdateNodeConnectionInfo(&Mtm->nodes[nodeId].con,connStr);
24072419
Mtm->nodes[nodeId].transDelay=0;
@@ -2411,6 +2423,8 @@ mtm_add_node(PG_FUNCTION_ARGS)
24112423

24122424
BIT_SET(Mtm->disabledNodeMask,nodeId);
24132425
Mtm->nAllNodes+=1;
2426+
MtmUnlock();
2427+
24142428
MtmStartReceiver(nodeId+1, true);
24152429
}
24162430
PG_RETURN_VOID();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp