Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 2afd9cc

Browse files
knizhnik, kelvich
authored and committed
Recovery fixes
1 parent 5bd20f8 commit 2afd9cc

File tree

6 files changed

+87
-49
lines changed

6 files changed

+87
-49
lines changed

‎arbiter.c

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,13 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
247247
while (size!=0) {
248248
intrc=MtmWaitSocket(sd, true,MtmHeartbeatSendTimeout);
249249
if (rc==1) {
250-
intn=send(sd,src,size,0);
251-
if (n<0) {
252-
Assert(errno!=EINTR);/* should not happen in non-blocking call */
250+
while ((rc=send(sd,src,size,0))<0&&errno==EINTR);
251+
if (rc<0) {
253252
busy_socket=-1;
254253
return false;
255254
}
256-
size-=n;
257-
src+=n;
255+
size-=rc;
256+
src+=rc;
258257
}elseif (rc<0) {
259258
busy_socket=-1;
260259
return false;
@@ -266,15 +265,12 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
266265

267266
staticintMtmReadSocket(intsd,void*buf,intbuf_size)
268267
{
269-
intrc=recv(sd,buf,buf_size,0);
268+
intrc;
269+
while ((rc=recv(sd,buf,buf_size,0))<0&&errno==EINTR);
270270
if (rc<0&&errno==EAGAIN) {
271271
rc=MtmWaitSocket(sd, false,MtmHeartbeatSendTimeout);
272272
if (rc==1) {
273-
rc=recv(sd,buf,buf_size,0);
274-
if (rc<0) {
275-
Assert(errno!=EINTR);/* should not happen in non-blocking call */
276-
return-1;
277-
}
273+
while ((rc=recv(sd,buf,buf_size,0))<0&&errno==EINTR);
278274
}else {
279275
return0;
280276
}
@@ -370,12 +366,13 @@ static void MtmSendHeartbeat()
370366
for (i=0;i<Mtm->nAllNodes;i++)
371367
{
372368
if (i+1!=MtmNodeId&&sockets[i]!=busy_socket
373-
&& ((sockets[i] >=0&& !BIT_CHECK(Mtm->disabledNodeMask,i))||BIT_CHECK(Mtm->reconnectMask,i)))
369+
&& (Mtm->status!=MTM_ONLINE
370+
|| (sockets[i] >=0&& !BIT_CHECK(Mtm->disabledNodeMask,i)&& !BIT_CHECK(Mtm->reconnectMask,i))))
374371
{
375372
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
376373
elog(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
377374
}else {
378-
MTM_LOG1("Send heartbeat to node %d with timestamp %ld",i+1,now);
375+
MTM_LOG2("Send heartbeat to node %d with timestamp %ld",i+1,now);
379376
}
380377
}
381378
}
@@ -593,8 +590,9 @@ static void MtmAcceptOneConnection()
593590
}elseif (req.hdr.code!=MSG_HANDSHAKE&&req.hdr.dxid!=HANDSHAKE_MAGIC) {
594591
elog(WARNING,"Arbiter get unexpected handshake message %d",req.hdr.code);
595592
close(fd);
596-
}else{
597-
Assert(req.hdr.node>0&&req.hdr.node <=Mtm->nAllNodes&&req.hdr.node!=MtmNodeId);
593+
}else {
594+
intnode=req.hdr.node-1;
595+
Assert(node >=0&&node<Mtm->nAllNodes&&node+1!=MtmNodeId);
598596

599597
MtmLock(LW_EXCLUSIVE);
600598
MtmCheckResponse(&req.hdr);
@@ -606,15 +604,18 @@ static void MtmAcceptOneConnection()
606604
resp.sxid=ShmemVariableCache->nextXid;
607605
resp.csn=MtmGetCurrentTime();
608606
resp.node=MtmNodeId;
609-
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con,req.connStr);
607+
MtmUpdateNodeConnectionInfo(&Mtm->nodes[node].con,req.connStr);
610608
if (!MtmWriteSocket(fd,&resp,sizeofresp)) {
611-
elog(WARNING,"Arbiter failed to write response for handshake message to node %d",resp.node);
609+
elog(WARNING,"Arbiter failed to write response for handshake message to node %d",node+1);
612610
close(fd);
613611
}else {
614-
MTM_LOG1("Arbiter established connection with node %d",req.hdr.node);
615-
MtmRegisterSocket(fd,req.hdr.node-1);
616-
sockets[req.hdr.node-1]=fd;
617-
MtmOnNodeConnect(req.hdr.node);
612+
MTM_LOG1("Arbiter established connection with node %d",node+1);
613+
if (sockets[node] >=0) {
614+
MtmUnregisterSocket(sockets[node]);
615+
}
616+
sockets[node]=fd;
617+
MtmRegisterSocket(fd,node);
618+
MtmOnNodeConnect(node+1);
618619
}
619620
}
620621
}
@@ -889,7 +890,7 @@ static void MtmTransReceiver(Datum arg)
889890
Mtm->nodes[msg->node-1].lastHeartbeat=MtmGetSystemTime();
890891

891892
if (msg->code==MSG_HEARTBEAT) {
892-
MTM_LOG1("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
893+
MTM_LOG2("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
893894
msg->node,msg->csn,USEC_TO_MSEC(MtmGetSystemTime()-msg->csn));
894895
continue;
895896
}
@@ -1002,21 +1003,23 @@ static void MtmTransReceiver(Datum arg)
10021003
}
10031004
}
10041005
}
1005-
now=MtmGetSystemTime();
1006-
if (now>lastHeartbeatCheck+MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
1007-
if (!MtmWatchdog(stopPolling)) {
1008-
for (i=0;i<nNodes;i++) {
1009-
if (Mtm->nodes[i].lastHeartbeat!=0&&sockets[i] >=0) {
1010-
MTM_LOG1("Last hearbeat from node %d received %ld microseconds ago",i+1,now-Mtm->nodes[i].lastHeartbeat);
1006+
if (Mtm->status!=MTM_RECOVERY) {
1007+
now=MtmGetSystemTime();
1008+
if (now>lastHeartbeatCheck+MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
1009+
if (!MtmWatchdog(stopPolling)) {
1010+
for (i=0;i<nNodes;i++) {
1011+
if (Mtm->nodes[i].lastHeartbeat!=0&&sockets[i] >=0) {
1012+
MTM_LOG1("Last hearbeat from node %d received %ld microseconds ago",i+1,now-Mtm->nodes[i].lastHeartbeat);
1013+
}
10111014
}
1015+
MTM_LOG1("epoll started %ld and finished %ld microseconds ago",now-startPolling,now-stopPolling);
10121016
}
1013-
MTM_LOG1("epoll started %ld and finished %ld microseconds ago",now-startPolling,now-stopPolling);
1017+
lastHeartbeatCheck=now;
1018+
}
1019+
if (n==0&&Mtm->disabledNodeMask!=0) {
1020+
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1021+
MtmRefreshClusterStatus(false);
10141022
}
1015-
lastHeartbeatCheck=now;
1016-
}
1017-
if (n==0&&Mtm->disabledNodeMask!=0) {
1018-
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1019-
MtmRefreshClusterStatus(false);
10201023
}
10211024
}
10221025
proc_exit(1);/* force restart of this bgworker */

‎multimaster.c

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,6 @@ MtmCreateTransState(MtmCurrentTrans* x)
742742
/* I am coordinator of transaction */
743743
ts->gtid.xid=x->xid;
744744
ts->gtid.node=MtmNodeId;
745-
//ts->gid[0] = '\0';
746745
strcpy(ts->gid,x->gid);
747746
}
748747
returnts;
@@ -1186,26 +1185,33 @@ static void MtmDisableNode(int nodeId)
11861185
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
11871186
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
11881187
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
1189-
Mtm->nLiveNodes-=1;
1190-
}
1188+
if (nodeId!=MtmNodeId) {
1189+
Mtm->nLiveNodes-=1;
1190+
}
1191+
elog(WARNING,"Disable node %d at xlog position %lx",nodeId,GetXLogInsertRecPtr());
1192+
}
11911193

11921194
staticvoidMtmEnableNode(intnodeId)
11931195
{
11941196
BIT_CLEAR(Mtm->disabledNodeMask,nodeId-1);
11951197
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
11961198
Mtm->nodes[nodeId-1].lastStatusChangeTime=MtmGetSystemTime();
11971199
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
1198-
Mtm->nLiveNodes+=1;
1200+
if (nodeId!=MtmNodeId) {
1201+
Mtm->nLiveNodes+=1;
1202+
}
1203+
elog(WARNING,"Enable node %d at xlog position %lx",nodeId,GetXLogInsertRecPtr());
11991204
}
12001205

12011206
voidMtmRecoveryCompleted(void)
12021207
{
1203-
MTM_LOG1("Recovery of node %d is completed",MtmNodeId);
1208+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, reconnect mask=%ld, live nodes=%d",
1209+
MtmNodeId,Mtm->disabledNodeMask,Mtm->reconnectMask,Mtm->nLiveNodes);
12041210
MtmLock(LW_EXCLUSIVE);
12051211
Mtm->recoverySlot=0;
1206-
BIT_CLEAR(Mtm->disabledNodeMask,MtmNodeId-1);
12071212
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime=MtmGetSystemTime();
1208-
/* Mode will be changed to online once all locagical reciever are connected */
1213+
BIT_CLEAR(Mtm->disabledNodeMask,MtmNodeId-1);
1214+
/* Mode will be changed to online once all logical receivers are connected */
12091215
MtmSwitchClusterMode(MTM_CONNECTED);
12101216
MtmUnlock();
12111217
}
@@ -1464,16 +1470,16 @@ bool MtmRefreshClusterStatus(bool nowait)
14641470
MtmEnableNode(i+1);
14651471
}
14661472
}
1467-
#endif
14681473
Mtm->reconnectMask |=clique&Mtm->disabledNodeMask;/* new enabled nodes mask */
1474+
#endif
14691475

14701476
if (disabled) {
14711477
MtmCheckQuorum();
14721478
}
14731479
/* Interrupt voting for active transaction and abort them */
14741480
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
14751481
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1476-
ts->gid,ts->gtid.node,ts->xid,ts->status,ts->gtid.xid);
1482+
ts->gid,ts->gtid.node,ts->xid,ts->status,ts->gtid.xid);
14771483
if (MtmIsCoordinator(ts)) {
14781484
if (!ts->votingCompleted&&disabled!=0&&ts->status!=TRANSACTION_STATUS_ABORTED) {
14791485
MtmAbortTransaction(ts);
@@ -1728,6 +1734,7 @@ static void MtmInitialize()
17281734
Mtm->transCount=0;
17291735
Mtm->gcCount=0;
17301736
Mtm->nConfigChanges=0;
1737+
Mtm->recoveryCount=0;
17311738
Mtm->localTablesHashLoaded= false;
17321739
Mtm->inject2PCError=0;
17331740
for (i=0;i<MtmNodes;i++) {
@@ -2271,6 +2278,9 @@ void MtmReceiverStarted(int nodeId)
22712278
MtmLock(LW_EXCLUSIVE);
22722279
if (!BIT_CHECK(Mtm->pglogicalNodeMask,nodeId-1)) {
22732280
BIT_SET(Mtm->pglogicalNodeMask,nodeId-1);
2281+
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
2282+
MtmEnableNode(nodeId);
2283+
}
22742284
if (++Mtm->nReceivers==Mtm->nLiveNodes-1) {
22752285
if (Mtm->status==MTM_CONNECTED) {
22762286
MtmSwitchClusterMode(MTM_ONLINE);
@@ -2296,6 +2306,9 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
22962306
/* Choose for recovery first available slot */
22972307
MTM_LOG1("Start recovery from node %d",nodeId);
22982308
Mtm->recoverySlot=nodeId;
2309+
Mtm->nReceivers=0;
2310+
Mtm->recoveryCount+=1;
2311+
Mtm->pglogicalNodeMask=0;
22992312
FinishAllPreparedTransactions(false);
23002313
returnSLOT_OPEN_EXISTED;
23012314
}

‎multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ typedef struct
174174
LWLockPadded*locks;/* multimaster lock tranche */
175175
TransactionIdoldestXid;/* XID of oldest transaction visible by any active transaction (local or global) */
176176
nodemask_tdisabledNodeMask;/* bitmask of disabled nodes */
177-
nodemask_tconnectivityMask;/* bitmask ofdicconnected nodes */
177+
nodemask_tconnectivityMask;/* bitmask of disconnected nodes */
178178
nodemask_tpglogicalNodeMask;/* bitmask of started pglogic receivers */
179179
nodemask_twalSenderLockerMask;/* Mask of WAL-senders IDs locking the cluster */
180180
nodemask_tnodeLockerMask;/* Mask of node IDs which WAL-senders are locking the cluster */
@@ -188,6 +188,7 @@ typedef struct
188188
intnLockers;/* Number of lockers */
189189
intnActiveTransactions;/* Number of active 2PC transactions */
190190
intnConfigChanges;/* Number of cluster configuration changes */
191+
intrecoveryCount;/* Number of completed recoveries */
191192
int64timeShift;/* Local time correction */
192193
csn_tcsn;/* Last obtained timestamp: used to provide unique ascending CSNs based on system time */
193194
csn_tlastCsn;/* CSN of last committed transaction */

‎pglogical_apply.c

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ static void
492492
MtmEndSession(void)
493493
{
494494
if (replorigin_session_origin!=InvalidRepOriginId) {
495-
MTM_LOG3("%d: Begin reset replorigin session: %d",MyProcPid,replorigin_session_origin);
495+
MTM_LOG2("%d: Begin reset replorigin session for node %d: %d, progress %lx",MyProcPid,MtmReplicationNodeId,replorigin_session_origin,replorigin_session_get_progress(false));
496496
replorigin_session_origin=InvalidRepOriginId;
497497
replorigin_session_reset();
498498
if (unlock) {
@@ -568,7 +568,7 @@ process_remote_commit(StringInfo in)
568568
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
569569
csn=pq_getmsgint64(in);
570570
gid=pq_getmsgstring(in);
571-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s",csn,gid);
571+
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld",csn,gid,end_lsn);
572572
StartTransactionCommand();
573573
MtmBeginSession();
574574
MtmSetCurrentTransactionCSN(csn);
@@ -585,6 +585,7 @@ process_remote_commit(StringInfo in)
585585
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED)==TRANSACTION_STATUS_UNKNOWN) {
586586
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",gid);
587587
StartTransactionCommand();
588+
MtmBeginSession();
588589
MtmSetCurrentTransactionGID(gid);
589590
FinishPreparedTransaction(gid, false);
590591
CommitTransactionCommand();
@@ -594,6 +595,12 @@ process_remote_commit(StringInfo in)
594595
default:
595596
Assert(false);
596597
}
598+
#if0/* Do not need to advance slot position here: it will be done by transaction commit */
599+
if (replorigin_session_origin!=InvalidRepOriginId) {
600+
replorigin_advance(replorigin_session_origin,end_lsn,
601+
XactLastCommitEnd, false, false);
602+
}
603+
#endif
597604
MtmEndSession(true);
598605
MtmUpdateLsnMapping(MtmReplicationNodeId,end_lsn);
599606
if (flags&PGLOGICAL_CAUGHT_UP) {
@@ -936,6 +943,11 @@ void MtmExecutor(int id, void* work, size_t size)
936943
while (true) {
937944
charaction=pq_getmsgbyte(&s);
938945
MTM_LOG3("%d: REMOTE process action %c",MyProcPid,action);
946+
#if0
947+
if (Mtm->status==MTM_RECOVERY) {
948+
MTM_LOG1("Replay action %c[%x]",action,s.data[s.cursor]);
949+
}
950+
#endif
939951
switch (action) {
940952
/* BEGIN */
941953
case'B':

‎pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,8 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
424424
PGLogicalProtoAPI*
425425
pglogical_init_api(PGLogicalProtoTypetyp)
426426
{
427-
PGLogicalProtoAPI*res=palloc0(sizeof(PGLogicalProtoAPI));
427+
PGLogicalProtoAPI*res=malloc(sizeof(PGLogicalProtoAPI));
428+
MemSet(res,0,sizeof(PGLogicalProtoAPI));
428429
sscanf(MyReplicationSlot->data.name.data,MULTIMASTER_SLOT_PATTERN,&MtmReplicationNodeId);
429430
MTM_LOG1("%d: PRGLOGICAL init API for slot %s node %d",MyProcPid,MyReplicationSlot->data.name.data,MtmReplicationNodeId);
430431
res->write_rel=pglogical_write_rel;

‎pglogical_receiver.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,15 @@ pglogical_receiver_main(Datum main_arg)
244244
*/
245245
while (!got_sigterm)
246246
{
247+
intcount;
248+
247249
/*
248250
* Determine when and how we should open replication slot.
249251
* During recovery we need to open only one replication slot from which node should receive all transactions.
250252
* Slots at other nodes should be removed
251253
*/
252254
mode=MtmReceiverSlotMode(nodeId);
255+
count=Mtm->recoveryCount;
253256

254257
/* Establish connection to remote server */
255258
conn=PQconnectdb(connString);
@@ -303,7 +306,7 @@ pglogical_receiver_main(Datum main_arg)
303306
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
304307
}else {
305308
originStartPos=replorigin_get_progress(originId, false);
306-
MTM_LOG1("Restart logical receiver at position %lx from node %d",originStartPos,nodeId);
309+
MTM_LOG1("Restart logical receiver at position %lxwith origin=%dfrom node %d",originStartPos,originId,nodeId);
307310
}
308311
CommitTransactionCommand();
309312

@@ -359,7 +362,12 @@ pglogical_receiver_main(Datum main_arg)
359362

360363
if (Mtm->status==MTM_OFFLINE|| (Mtm->status==MTM_RECOVERY&&Mtm->recoverySlot!=nodeId))
361364
{
362-
ereport(LOG, (errmsg("%s: suspending WAL receiver because node was switched to %s mode",worker_proc,MtmNodeStatusMnem[Mtm->status])));
365+
ereport(LOG, (errmsg("%s: restart WAL receiver because node was switched to %s mode",worker_proc,MtmNodeStatusMnem[Mtm->status])));
366+
break;
367+
}
368+
if (count!=Mtm->recoveryCount) {
369+
370+
ereport(LOG, (errmsg("%s: restart WAL receiver because node was recovered",worker_proc)));
363371
break;
364372
}
365373

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp