Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9dff142

Browse files
knizhnikkelvich
authored andcommitted
Eliminate dupplicated messages
1 parent8eeb49f commit9dff142

File tree

5 files changed

+31
-10
lines changed

5 files changed

+31
-10
lines changed

‎Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ USER postgres
3131
ENV CFLAGS -O0
3232
WORKDIR /pg
3333

34-
ENV REBUILD1
34+
ENV REBUILD2
3535

3636
RUN cd /pg && \
3737
git clone https://github.com/postgrespro/postgres_cluster.git --depth 1 && \

‎arbiter.c

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,13 @@
8181
typedefstruct
8282
{
8383
MtmMessageCodecode;/* Message code: MSG_READY, MSG_PREPARE, MSG_COMMIT, MSG_ABORT */
84-
intnode;/* Sender node ID */
84+
intnode;/* Sender node ID */
8585
TransactionIddxid;/* Transaction ID at destination node */
8686
TransactionIdsxid;/* Transaction ID at sender node */
8787
csn_tcsn;/* Local CSN in case of sending data from replica to master, global CSN master->replica */
8888
nodemask_tdisabledNodeMask;/* Bitmask of disabled nodes at the sender of message */
8989
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
90+
uint64seqno;/* Message sequence number (used to eliminate duplicated messages) */
9091
}MtmArbiterMessage;
9192

9293
typedefstruct
@@ -112,6 +113,7 @@ static int busy_socket;
112113
staticvoidMtmTransSender(Datumarg);
113114
staticvoidMtmTransReceiver(Datumarg);
114115
staticvoidMtmSendHeartbeat(void);
116+
staticboolMtmSendToNode(intnode,voidconst*buf,intsize);
115117

116118

117119
staticcharconst*constmessageText[]=
@@ -248,6 +250,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
248250
if (rc==1) {
249251
intn=send(sd,src,size,0);
250252
if (n<0) {
253+
Assert(errno!=EINTR);/* should not happen in non-blocking call */
251254
busy_socket=-1;
252255
return false;
253256
}
@@ -266,6 +269,7 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
266269
{
267270
intrc=recv(sd,buf,buf_size,0);
268271
if (rc <=0) {
272+
Assert(errno!=EINTR);/* should not happen in non-blocking call */
269273
return-1;
270274
}
271275
returnrc;
@@ -346,9 +350,8 @@ static void MtmSendHeartbeat()
346350
{
347351
if (sockets[i] >=0&&sockets[i]!=busy_socket&& !BIT_CHECK(Mtm->disabledNodeMask|Mtm->reconnectMask,i))
348352
{
349-
size_trc=send(sockets[i],&msg,sizeof(msg),0);
350-
if ((size_t)rc!=sizeof(msg)) {
351-
elog(LOG,"Failed to send heartbeat to node %d: %d",i+1,errno);
353+
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
354+
elog(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
352355
}
353356
}
354357
}
@@ -629,6 +632,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
629632
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
630633
messageText[ts->cmd],ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
631634
Assert(ts->cmd!=MSG_INVALID);
635+
buf->data[buf->used].seqno=++Mtm->nodes[node].sendSeqNo;
632636
buf->data[buf->used].code=ts->cmd;
633637
buf->data[buf->used].sxid=ts->xid;
634638
buf->data[buf->used].csn=ts->csn;
@@ -845,10 +849,17 @@ static void MtmTransReceiver(Datum arg)
845849
elog(WARNING,"Ignore message from dead node %d\n",msg->node);
846850
continue;
847851
}
852+
if (msg->seqno <=Mtm->nodes[msg->node-1].recvSeqNo) {
853+
elog(WARNING,"Ignore duplicated message %ld from node %d",msg->seqno,msg->node);
854+
continue;
855+
}
856+
Mtm->nodes[msg->node-1].recvSeqNo=msg->seqno;
848857

849858
ts= (MtmTransState*)hash_search(MtmXid2State,&msg->dxid,HASH_FIND,NULL);
850-
Assert(ts!=NULL);
851-
859+
if (ts==NULL) {
860+
elog(WARNING,"Ignore response for unexisted transaction %d from node %d",msg->dxid,msg->node);
861+
continue;
862+
}
852863
if (BIT_CHECK(msg->disabledNodeMask,MtmNodeId-1)&&Mtm->status!=MTM_RECOVERY) {
853864
elog(PANIC,"Node %d thinks that I was dead: perform hara-kiri not to be a zombie",msg->node);
854865
}

‎multimaster.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,10 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
814814
MtmTransState*ts;
815815
MTM_TXTRACE(x,"PostPrepareTransaction Start");
816816

817+
if (!x->isDistributed) {
818+
return;
819+
}
820+
817821
if (Mtm->inject2PCError==2) {
818822
Mtm->inject2PCError=0;
819823
elog(ERROR,"ERROR INJECTION for transaction %d (%s)",x->xid,x->gid);
@@ -1662,6 +1666,8 @@ static void MtmInitialize()
16621666
Mtm->nodes[i].con=MtmConnections[i];
16631667
Mtm->nodes[i].flushPos=0;
16641668
Mtm->nodes[i].lastHeartbeat=0;
1669+
Mtm->nodes[i].sendSeqNo=0;
1670+
Mtm->nodes[i].recvSeqNo=0;
16651671
}
16661672
PGSemaphoreCreate(&Mtm->votingSemaphore);
16671673
PGSemaphoreReset(&Mtm->votingSemaphore);

‎multimaster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,9 @@ typedef struct
136136
intsenderPid;
137137
intreceiverPid;
138138
XLogRecPtrflushPos;
139-
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
139+
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
140+
uint64sendSeqNo;
141+
uint64recvSeqNo;
140142
}MtmNodeInfo;
141143

142144
typedefstructMtmTransState

‎pglogical_receiver.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include"spill.h"
4141

4242
#defineERRCODE_DUPLICATE_OBJECT_STR "42710"
43+
#defineRECEIVER_SUSPEND_TIMEOUT (10*USECS_PER_SEC)
4344

4445
/* Signal handling */
4546
staticvolatilesig_atomic_tgot_sigterm= false;
@@ -355,8 +356,9 @@ pglogical_receiver_main(Datum main_arg)
355356
proc_exit(1);
356357

357358
if (Mtm->status==MTM_OFFLINE|| (Mtm->status==MTM_RECOVERY&&Mtm->recoverySlot!=nodeId)) {
358-
ereport(LOG, (errmsg("%s: terminating WAL receiver because node was switched to %s mode",worker_proc,MtmNodeStatusMnem[Mtm->status])));
359-
proc_exit(0);
359+
ereport(LOG, (errmsg("%s: suspending WAL receiver because node was switched to %s mode",worker_proc,MtmNodeStatusMnem[Mtm->status])));
360+
MtmSleep(RECEIVER_SUSPEND_TIMEOUT);
361+
continue;
360362
}
361363

362364

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp