Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2b6fbf9

Browse files
committed
Avoid writes in critical section
1 parent3ebf193 commit2b6fbf9

File tree

2 files changed

+35
-20
lines changed

2 files changed

+35
-20
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@
7373

7474
#include"multimaster.h"
7575

76-
#defineMAX_ROUTES 16
77-
#defineBUFFER_SIZE 1024
78-
#defineHANDSHAKE_MAGIC 0xCAFEDEED
76+
#defineMAX_ROUTES16
77+
#defineINIT_BUFFER_SIZE 1024
78+
#defineHANDSHAKE_MAGIC0xCAFEDEED
7979

8080
typedefstruct
8181
{
@@ -97,7 +97,8 @@ typedef struct
9797
typedefstruct
9898
{
9999
intused;
100-
MtmArbiterMessagedata[BUFFER_SIZE];
100+
intsize;
101+
MtmArbiterMessage*data;
101102
}MtmBuffer;
102103

103104
staticint*sockets;
@@ -448,11 +449,14 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
448449
}
449450

450451
/* Some node considered that I am dead, so switch to recovery mode */
452+
MtmLock(LW_EXCLUSIVE);
451453
if (BIT_CHECK(resp.disabledNodeMask,MtmNodeId-1)) {
452454
elog(WARNING,"Node %d thinks that I was dead",resp.node);
453455
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
454456
MtmSwitchClusterMode(MTM_RECOVERY);
455457
}
458+
MtmUnlock();
459+
456460
returnsd;
457461
}
458462

@@ -489,7 +493,9 @@ static bool MtmSendToNode(int node, void const* buf, int size)
489493
while (true) {
490494
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
491495
elog(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
496+
MtmLock(LW_EXCLUSIVE);
492497
BIT_CLEAR(Mtm->reconnectMask,node);
498+
MtmUnlock();
493499
close(sockets[node]);
494500
sockets[node]=-1;
495501
}
@@ -504,7 +510,9 @@ static bool MtmSendToNode(int node, void const* buf, int size)
504510
MtmOnNodeDisconnect(node+1);
505511
return false;
506512
}
513+
MtmLock(LW_EXCLUSIVE);
507514
BIT_CLEAR(Mtm->reconnectMask,node);
515+
MtmUnlock();
508516
MTM_LOG3("Arbiter restablished connection with node %d",node+1);
509517
}else {
510518
return true;
@@ -550,7 +558,6 @@ static void MtmAcceptOneConnection()
550558
close(fd);
551559
}else {
552560
MTM_LOG1("Arbiter established connection with node %d",req.hdr.node);
553-
BIT_CLEAR(Mtm->connectivityMask,req.hdr.node-1);
554561
MtmRegisterSocket(fd,req.hdr.node-1);
555562
sockets[req.hdr.node-1]=fd;
556563
MtmOnNodeConnect(req.hdr.node);
@@ -596,12 +603,9 @@ static void MtmAcceptIncomingConnections()
596603
staticvoidMtmAppendBuffer(MtmBuffer*txBuffer,TransactionIdxid,intnode,MtmTransState*ts)
597604
{
598605
MtmBuffer*buf=&txBuffer[node];
599-
if (buf->used==BUFFER_SIZE) {
600-
if (!MtmSendToNode(node,buf->data,buf->used*sizeof(MtmArbiterMessage))) {
601-
buf->used=0;
602-
return;
603-
}
604-
buf->used=0;
606+
if (buf->used==buf->size) {
607+
buf->size=buf->size ?buf->size*2 :INIT_BUFFER_SIZE;
608+
buf->data=repalloc(buf->data,buf->size*sizeof(MtmArbiterMessage));
605609
}
606610
buf->data[buf->used].dxid=xid;
607611

@@ -638,7 +642,7 @@ static void MtmTransSender(Datum arg)
638642
intnNodes=MtmMaxNodes;
639643
inti;
640644

641-
MtmBuffer*txBuffer= (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
645+
MtmBuffer*txBuffer= (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
642646

643647
InitializeTimeouts();
644648

@@ -653,10 +657,6 @@ static void MtmTransSender(Datum arg)
653657

654658
MtmOpenConnections();
655659

656-
for (i=0;i<nNodes;i++) {
657-
txBuffer[i].used=0;
658-
}
659-
660660
while (!stop) {
661661
MtmTransState*ts;
662662
PGSemaphoreLock(&Mtm->votingSemaphore);
@@ -721,7 +721,7 @@ static void MtmTransReceiver(Datum arg)
721721
intnNodes=MtmMaxNodes;
722722
intnResponses;
723723
inti,j,n,rc;
724-
MtmBuffer*rxBuffer= (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
724+
MtmBuffer*rxBuffer= (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
725725
timestamp_tlastHeartbeatCheck=MtmGetSystemTime();
726726
timestamp_tnow;
727727

@@ -742,7 +742,8 @@ static void MtmTransReceiver(Datum arg)
742742
MtmAcceptIncomingConnections();
743743

744744
for (i=0;i<nNodes;i++) {
745-
rxBuffer[i].used=0;
745+
rxBuffer[i].size=INIT_BUFFER_SIZE;
746+
rxBuffer[i].data=palloc(INIT_BUFFER_SIZE*sizeof(MtmArbiterMessage));
746747
}
747748

748749
while (!stop) {
@@ -786,7 +787,7 @@ static void MtmTransReceiver(Datum arg)
786787
continue;
787788
}
788789

789-
rc=MtmReadFromNode(i, (char*)rxBuffer[i].data+rxBuffer[i].used,BUFFER_SIZE-rxBuffer[i].used);
790+
rc=MtmReadFromNode(i, (char*)rxBuffer[i].data+rxBuffer[i].used,rxBuffer[i].size-rxBuffer[i].used);
790791
if (rc <=0) {
791792
continue;
792793
}

‎contrib/mmts/multimaster.c‎

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1433,8 +1433,11 @@ void MtmOnNodeDisconnect(int nodeId)
14331433
/* Avoid false detection of node failure and prevent node status blinking */
14341434
return;
14351435
}
1436+
MtmLock(LW_EXCLUSIVE);
14361437
BIT_SET(Mtm->connectivityMask,nodeId-1);
14371438
BIT_SET(Mtm->reconnectMask,nodeId-1);
1439+
MtmUnlock();
1440+
14381441
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
14391442

14401443
MtmSleep(MSEC_TO_USEC(MtmRaftPollDelay));
@@ -1461,7 +1464,10 @@ void MtmOnNodeDisconnect(int nodeId)
14611464

14621465
voidMtmOnNodeConnect(intnodeId)
14631466
{
1467+
MtmLock(LW_EXCLUSIVE);
14641468
BIT_CLEAR(Mtm->connectivityMask,nodeId-1);
1469+
MtmUnlock();
1470+
14651471
MTM_LOG1("Reconnect node %d",nodeId);
14661472
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
14671473
}
@@ -2195,6 +2201,8 @@ void MtmRecoverNode(int nodeId)
21952201

21962202
voidMtmDropNode(intnodeId,booldropSlot)
21972203
{
2204+
MtmLock(LW_EXCLUSIVE);
2205+
21982206
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
21992207
{
22002208
if (nodeId <=0||nodeId>Mtm->nLiveNodes)
@@ -2212,6 +2220,8 @@ void MtmDropNode(int nodeId, bool dropSlot)
22122220
ReplicationSlotDrop(psprintf(MULTIMASTER_SLOT_PATTERN,nodeId));
22132221
}
22142222
}
2223+
2224+
MtmUnlock();
22152225
}
22162226
staticvoid
22172227
MtmOnProcExit(intcode,Datumarg)
@@ -2402,7 +2412,9 @@ mtm_add_node(PG_FUNCTION_ARGS)
24022412
}
24032413
else
24042414
{
2405-
intnodeId=Mtm->nAllNodes;
2415+
intnodeId;
2416+
MtmLock(LW_EXCLUSIVE);
2417+
nodeId=Mtm->nAllNodes;
24062418
elog(NOTICE,"Add node %d: '%s'",nodeId+1,connStr);
24072419
MtmUpdateNodeConnectionInfo(&Mtm->nodes[nodeId].con,connStr);
24082420
Mtm->nodes[nodeId].transDelay=0;
@@ -2412,6 +2424,8 @@ mtm_add_node(PG_FUNCTION_ARGS)
24122424

24132425
BIT_SET(Mtm->disabledNodeMask,nodeId);
24142426
Mtm->nAllNodes+=1;
2427+
MtmUnlock();
2428+
24152429
MtmStartReceiver(nodeId+1, true);
24162430
}
24172431
PG_RETURN_VOID();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp